From 1458d609cacac0c8b508e8f4c8c6667d6b8b17df Mon Sep 17 00:00:00 2001
From: sususu98
Date: Tue, 21 Oct 2025 13:43:57 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=BA=20Claude=20Console=20?=
 =?UTF-8?q?=E8=B4=A6=E6=88=B7=E6=B7=BB=E5=8A=A0=E5=B9=B6=E5=8F=91=E6=8E=A7?=
 =?UTF-8?q?=E5=88=B6=E6=9C=BA=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

实现了完整的 Claude Console 账户并发任务数控制功能,防止单账户过载,提升服务稳定性。

**核心功能**

- 🔒 **原子性并发控制**: 基于 Redis Sorted Set 实现的抢占式并发槽位管理,防止竞态条件
- 🔄 **自动租约刷新**: 流式请求每 5 分钟自动刷新租约,防止长连接租约过期
- 🚨 **智能降级处理**: 并发满额时自动清理粘性会话并重试其他账户(最多 1 次)
- 🎯 **专用错误码**: 引入 `CONSOLE_ACCOUNT_CONCURRENCY_FULL` 错误码,区分并发限制和其他错误
- 📊 **批量性能优化**: 调度器使用 Promise.all 并行查询账户并发数,减少 Redis 往返

**后端实现**

1. **Redis 并发控制方法** (src/models/redis.js)
   - `incrConsoleAccountConcurrency()`: 增加并发计数(带租约)
   - `decrConsoleAccountConcurrency()`: 释放并发槽位
   - `refreshConsoleAccountConcurrencyLease()`: 刷新租约(流式请求)
   - `getConsoleAccountConcurrency()`: 查询当前并发数
2. **账户服务增强** (src/services/claudeConsoleAccountService.js)
   - 添加 `maxConcurrentTasks` 字段(默认 0 表示无限制)
   - 获取账户时自动查询实时并发数 (`activeTaskCount`)
   - 支持更新并发限制配置
3. **转发服务并发保护** (src/services/claudeConsoleRelayService.js)
   - 请求前原子性抢占槽位,超限则立即回滚并抛出专用错误
   - 流式请求启动定时器每 5 分钟刷新租约
   - `finally` 块确保槽位释放(即使发生异常)
   - 为每个请求分配唯一 `requestId` 用于并发追踪
4. **统一调度器优化** (src/services/unifiedClaudeScheduler.js)
   - 获取可用账户时批量查询并发数(Promise.all 并行)
   - 预检查并发限制,避免选择已满的账户
   - 检查分组成员时也验证并发状态
   - 所有账户并发满额时抛出专用错误码
5. **API 路由降级处理** (src/routes/api.js)
   - 捕获 `CONSOLE_ACCOUNT_CONCURRENCY_FULL` 错误
   - 自动清理粘性会话映射并重试(最多 1 次)
   - 重试失败返回 503 错误和友好提示
   - count_tokens 端点也支持并发满额重试
6. **管理端点验证** (src/routes/admin.js)
   - 创建/更新账户时验证 `maxConcurrentTasks` 为非负整数
   - 支持前端传入并发限制配置

**前端实现**

1. **表单字段** (web/admin-spa/src/components/accounts/AccountForm.vue)
   - 添加"最大并发任务数"输入框(创建和编辑模式)
   - 支持占位符提示"0 表示不限制"
   - 表单数据自动映射到后端 API
2. 
**实时监控** (web/admin-spa/src/views/AccountsView.vue) - 账户列表显示并发状态进度条和百分比 - 颜色编码:绿色(<80%)、黄色(80%-100%)、红色(100%) - 显示"X / Y"格式的并发数(如"2 / 5") - 未配置限制时显示"并发无限制"徽章 --- src/models/redis.js | 32 ++ src/routes/admin.js | 30 +- src/routes/api.js | 292 +++++++++++++----- src/services/claudeConsoleAccountService.js | 29 +- src/services/claudeConsoleRelayService.js | 127 +++++++- src/services/unifiedClaudeScheduler.js | 140 ++++++++- .../src/components/accounts/AccountForm.vue | 57 ++++ web/admin-spa/src/views/AccountsView.vue | 118 +++++-- 8 files changed, 706 insertions(+), 119 deletions(-) diff --git a/src/models/redis.js b/src/models/redis.js index ea917c28..4f222f64 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -1775,6 +1775,38 @@ class RedisClient { } } + // 🏢 Claude Console 账户并发控制(复用现有并发机制) + // 增加 Console 账户并发计数 + async incrConsoleAccountConcurrency(accountId, requestId, leaseSeconds = null) { + if (!requestId) { + throw new Error('Request ID is required for console account concurrency tracking') + } + // 使用特殊的 key 前缀区分 Console 账户并发 + const compositeKey = `console_account:${accountId}` + return await this.incrConcurrency(compositeKey, requestId, leaseSeconds) + } + + // 刷新 Console 账户并发租约 + async refreshConsoleAccountConcurrencyLease(accountId, requestId, leaseSeconds = null) { + if (!requestId) { + return 0 + } + const compositeKey = `console_account:${accountId}` + return await this.refreshConcurrencyLease(compositeKey, requestId, leaseSeconds) + } + + // 减少 Console 账户并发计数 + async decrConsoleAccountConcurrency(accountId, requestId) { + const compositeKey = `console_account:${accountId}` + return await this.decrConcurrency(compositeKey, requestId) + } + + // 获取 Console 账户当前并发数 + async getConsoleAccountConcurrency(accountId) { + const compositeKey = `console_account:${accountId}` + return await this.getConcurrency(compositeKey) + } + // 🔧 Basic Redis operations wrapper methods for convenience async get(key) { const client = this.getClientSafe() diff --git 
a/src/routes/admin.js b/src/routes/admin.js index ae5eecff..ae8395de 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -2757,7 +2757,8 @@ router.post('/claude-console-accounts', authenticateAdmin, async (req, res) => { accountType, groupId, dailyQuota, - quotaResetTime + quotaResetTime, + maxConcurrentTasks } = req.body if (!name || !apiUrl || !apiKey) { @@ -2769,6 +2770,14 @@ router.post('/claude-console-accounts', authenticateAdmin, async (req, res) => { return res.status(400).json({ error: 'Priority must be between 1 and 100' }) } + // 验证maxConcurrentTasks的有效性(非负整数) + if (maxConcurrentTasks !== undefined && maxConcurrentTasks !== null) { + const concurrent = Number(maxConcurrentTasks) + if (!Number.isInteger(concurrent) || concurrent < 0) { + return res.status(400).json({ error: 'maxConcurrentTasks must be a non-negative integer' }) + } + } + // 验证accountType的有效性 if (accountType && !['shared', 'dedicated', 'group'].includes(accountType)) { return res @@ -2794,7 +2803,11 @@ router.post('/claude-console-accounts', authenticateAdmin, async (req, res) => { proxy, accountType: accountType || 'shared', dailyQuota: dailyQuota || 0, - quotaResetTime: quotaResetTime || '00:00' + quotaResetTime: quotaResetTime || '00:00', + maxConcurrentTasks: + maxConcurrentTasks !== undefined && maxConcurrentTasks !== null + ? 
Number(maxConcurrentTasks) + : 0 }) // 如果是分组类型,将账户添加到分组(CCR 归属 Claude 平台分组) @@ -2830,6 +2843,19 @@ router.put('/claude-console-accounts/:accountId', authenticateAdmin, async (req, return res.status(400).json({ error: 'Priority must be between 1 and 100' }) } + // 验证maxConcurrentTasks的有效性(非负整数) + if ( + mappedUpdates.maxConcurrentTasks !== undefined && + mappedUpdates.maxConcurrentTasks !== null + ) { + const concurrent = Number(mappedUpdates.maxConcurrentTasks) + if (!Number.isInteger(concurrent) || concurrent < 0) { + return res.status(400).json({ error: 'maxConcurrentTasks must be a non-negative integer' }) + } + // 转换为数字类型 + mappedUpdates.maxConcurrentTasks = concurrent + } + // 验证accountType的有效性 if ( mappedUpdates.accountType && diff --git a/src/routes/api.js b/src/routes/api.js index bf795216..898293bd 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -56,6 +56,11 @@ async function handleMessagesRequest(req, res) { }) } + // 🔄 并发满额重试标志:最多重试一次(使用req对象存储状态) + if (req._concurrencyRetryAttempted === undefined) { + req._concurrencyRetryAttempted = false + } + // 严格的输入验证 if (!req.body || typeof req.body !== 'object') { return res.status(400).json({ @@ -676,9 +681,75 @@ async function handleMessagesRequest(req, res) { logger.api(`✅ Request completed in ${duration}ms for key: ${req.apiKey.name}`) return undefined } catch (error) { - logger.error('❌ Claude relay error:', error.message, { - code: error.code, - stack: error.stack + let handledError = error + + // 🔄 并发满额降级处理:捕获CONSOLE_ACCOUNT_CONCURRENCY_FULL错误 + if ( + handledError.code === 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' && + !req._concurrencyRetryAttempted + ) { + req._concurrencyRetryAttempted = true + logger.warn( + `⚠️ Console account ${handledError.accountId} concurrency full, attempting fallback to other accounts...` + ) + + // 只有在响应头未发送时才能重试 + if (!res.headersSent) { + try { + // 清理粘性会话映射(如果存在) + const sessionHash = sessionHelper.generateSessionHash(req.body) + await 
unifiedClaudeScheduler.clearSessionMapping(sessionHash) + + logger.info('🔄 Session mapping cleared, retrying handleMessagesRequest...') + + // 递归重试整个请求处理(会选择新账户) + return await handleMessagesRequest(req, res) + } catch (retryError) { + // 重试失败 + if (retryError.code === 'CONSOLE_ACCOUNT_CONCURRENCY_FULL') { + logger.error('❌ All Console accounts reached concurrency limit after retry') + return res.status(503).json({ + error: 'service_unavailable', + message: + 'All available Claude Console accounts have reached their concurrency limit. Please try again later.' + }) + } + // 其他错误继续向下处理 + handledError = retryError + } + } else { + // 响应头已发送,无法重试 + logger.error('❌ Cannot retry concurrency full error - response headers already sent') + if (!res.destroyed && !res.finished) { + res.end() + } + return undefined + } + } + + // 🚫 第二次并发满额错误:已经重试过,直接返回503 + if ( + handledError.code === 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' && + req._concurrencyRetryAttempted + ) { + logger.error('❌ All Console accounts reached concurrency limit (retry already attempted)') + if (!res.headersSent) { + return res.status(503).json({ + error: 'service_unavailable', + message: + 'All available Claude Console accounts have reached their concurrency limit. Please try again later.' 
+ }) + } else { + if (!res.destroyed && !res.finished) { + res.end() + } + return undefined + } + } + + logger.error('❌ Claude relay error:', handledError.message, { + code: handledError.code, + stack: handledError.stack }) // 确保在任何情况下都能返回有效的JSON响应 @@ -687,23 +758,29 @@ async function handleMessagesRequest(req, res) { let statusCode = 500 let errorType = 'Relay service error' - if (error.message.includes('Connection reset') || error.message.includes('socket hang up')) { + if ( + handledError.message.includes('Connection reset') || + handledError.message.includes('socket hang up') + ) { statusCode = 502 errorType = 'Upstream connection error' - } else if (error.message.includes('Connection refused')) { + } else if (handledError.message.includes('Connection refused')) { statusCode = 502 errorType = 'Upstream service unavailable' - } else if (error.message.includes('timeout')) { + } else if (handledError.message.includes('timeout')) { statusCode = 504 errorType = 'Upstream timeout' - } else if (error.message.includes('resolve') || error.message.includes('ENOTFOUND')) { + } else if ( + handledError.message.includes('resolve') || + handledError.message.includes('ENOTFOUND') + ) { statusCode = 502 errorType = 'Upstream hostname resolution failed' } return res.status(statusCode).json({ error: errorType, - message: error.message || 'An unexpected error occurred', + message: handledError.message || 'An unexpected error occurred', timestamp: new Date().toISOString() }) } else { @@ -860,84 +937,85 @@ router.get('/v1/organizations/:org_id/usage', authenticateApiKey, async (req, re // 🔢 Token计数端点 - count_tokens beta API router.post('/v1/messages/count_tokens', authenticateApiKey, async (req, res) => { - try { - // 检查权限 - if ( - req.apiKey.permissions && - req.apiKey.permissions !== 'all' && - req.apiKey.permissions !== 'claude' - ) { - return res.status(403).json({ - error: { - type: 'permission_error', - message: 'This API key does not have permission to access Claude' - } - 
}) - } + // 检查权限 + if ( + req.apiKey.permissions && + req.apiKey.permissions !== 'all' && + req.apiKey.permissions !== 'claude' + ) { + return res.status(403).json({ + error: { + type: 'permission_error', + message: 'This API key does not have permission to access Claude' + } + }) + } - logger.info(`🔢 Processing token count request for key: ${req.apiKey.name}`) + logger.info(`🔢 Processing token count request for key: ${req.apiKey.name}`) - // 生成会话哈希用于sticky会话 - const sessionHash = sessionHelper.generateSessionHash(req.body) + const sessionHash = sessionHelper.generateSessionHash(req.body) + const requestedModel = req.body.model + const maxAttempts = 2 + let attempt = 0 - // 选择可用的Claude账户 - const requestedModel = req.body.model + const processRequest = async () => { const { accountId, accountType } = await unifiedClaudeScheduler.selectAccountForApiKey( req.apiKey, sessionHash, requestedModel ) - let response - if (accountType === 'claude-official') { - // 使用官方Claude账号转发count_tokens请求 - response = await claudeRelayService.relayRequest( - req.body, - req.apiKey, - req, - res, - req.headers, - { - skipUsageRecord: true, // 跳过usage记录,这只是计数请求 - customPath: '/v1/messages/count_tokens' // 指定count_tokens路径 - } - ) - } else if (accountType === 'claude-console') { - // 使用Console Claude账号转发count_tokens请求 - response = await claudeConsoleRelayService.relayRequest( - req.body, - req.apiKey, - req, - res, - req.headers, - accountId, - { - skipUsageRecord: true, // 跳过usage记录,这只是计数请求 - customPath: '/v1/messages/count_tokens' // 指定count_tokens路径 - } - ) - } else if (accountType === 'ccr') { - // CCR不支持count_tokens - return res.status(501).json({ - error: { - type: 'not_supported', - message: 'Token counting is not supported for CCR accounts' - } - }) - } else { - // Bedrock不支持count_tokens - return res.status(501).json({ - error: { - type: 'not_supported', - message: 'Token counting is not supported for Bedrock accounts' + if (accountType === 'ccr') { + throw Object.assign(new 
Error('Token counting is not supported for CCR accounts'), { + httpStatus: 501, + errorPayload: { + error: { + type: 'not_supported', + message: 'Token counting is not supported for CCR accounts' + } } }) } - // 直接返回响应,不记录token使用量 + if (accountType === 'bedrock') { + throw Object.assign(new Error('Token counting is not supported for Bedrock accounts'), { + httpStatus: 501, + errorPayload: { + error: { + type: 'not_supported', + message: 'Token counting is not supported for Bedrock accounts' + } + } + }) + } + + const relayOptions = { + skipUsageRecord: true, + customPath: '/v1/messages/count_tokens' + } + + const response = + accountType === 'claude-official' + ? await claudeRelayService.relayRequest( + req.body, + req.apiKey, + req, + res, + req.headers, + relayOptions + ) + : await claudeConsoleRelayService.relayRequest( + req.body, + req.apiKey, + req, + res, + req.headers, + accountId, + relayOptions + ) + res.status(response.statusCode) - // 设置响应头 const skipHeaders = ['content-encoding', 'transfer-encoding', 'content-length'] Object.keys(response.headers).forEach((key) => { if (!skipHeaders.includes(key.toLowerCase())) { @@ -945,10 +1023,8 @@ router.post('/v1/messages/count_tokens', authenticateApiKey, async (req, res) => } }) - // 尝试解析并返回JSON响应 try { const jsonData = JSON.parse(response.body) - // 对于非 2xx 响应,清理供应商特定信息 if (response.statusCode < 200 || response.statusCode >= 300) { const sanitizedData = sanitizeUpstreamError(jsonData) res.json(sanitizedData) @@ -960,14 +1036,70 @@ router.post('/v1/messages/count_tokens', authenticateApiKey, async (req, res) => } logger.info(`✅ Token count request completed for key: ${req.apiKey.name}`) - } catch (error) { - logger.error('❌ Token count error:', error) - res.status(500).json({ - error: { - type: 'server_error', - message: 'Failed to count tokens' + } + + while (attempt < maxAttempts) { + try { + await processRequest() + return + } catch (error) { + if (error.code === 'CONSOLE_ACCOUNT_CONCURRENCY_FULL') { + 
logger.warn( + `⚠️ Console account concurrency full during count_tokens (attempt ${attempt + 1}/${maxAttempts})` + ) + if (attempt < maxAttempts - 1) { + try { + await unifiedClaudeScheduler.clearSessionMapping(sessionHash) + } catch (clearError) { + logger.error('❌ Failed to clear session mapping for count_tokens retry:', clearError) + if (!res.headersSent) { + return res.status(500).json({ + error: { + type: 'server_error', + message: 'Failed to count tokens' + } + }) + } + if (!res.destroyed && !res.finished) { + res.end() + } + return + } + attempt += 1 + continue + } + if (!res.headersSent) { + return res.status(503).json({ + error: 'service_unavailable', + message: + 'All available Claude Console accounts have reached their concurrency limit. Please try again later.' + }) + } + if (!res.destroyed && !res.finished) { + res.end() + } + return } - }) + + if (error.httpStatus) { + return res.status(error.httpStatus).json(error.errorPayload) + } + + logger.error('❌ Token count error:', error) + if (!res.headersSent) { + return res.status(500).json({ + error: { + type: 'server_error', + message: 'Failed to count tokens' + } + }) + } + + if (!res.destroyed && !res.finished) { + res.end() + } + return + } } }) diff --git a/src/services/claudeConsoleAccountService.js b/src/services/claudeConsoleAccountService.js index 85144554..9121ee55 100644 --- a/src/services/claudeConsoleAccountService.js +++ b/src/services/claudeConsoleAccountService.js @@ -66,7 +66,8 @@ class ClaudeConsoleAccountService { accountType = 'shared', // 'dedicated' or 'shared' schedulable = true, // 是否可被调度 dailyQuota = 0, // 每日额度限制(美元),0表示不限制 - quotaResetTime = '00:00' // 额度重置时间(HH:mm格式) + quotaResetTime = '00:00', // 额度重置时间(HH:mm格式) + maxConcurrentTasks = 0 // 最大并发任务数,0表示无限制 } = options // 验证必填字段 @@ -113,7 +114,8 @@ class ClaudeConsoleAccountService { // 使用与统计一致的时区日期,避免边界问题 lastResetDate: redis.getDateStringInTimezone(), // 最后重置日期(按配置时区) quotaResetTime, // 额度重置时间 - quotaStoppedAt: '' // 因额度停用的时间 + 
quotaStoppedAt: '', // 因额度停用的时间 + maxConcurrentTasks: maxConcurrentTasks.toString() // 最大并发任务数,0表示无限制 } const client = redis.getClientSafe() @@ -149,7 +151,9 @@ class ClaudeConsoleAccountService { dailyUsage: 0, lastResetDate: accountData.lastResetDate, quotaResetTime, - quotaStoppedAt: null + quotaStoppedAt: null, + maxConcurrentTasks, // 新增:返回并发限制配置 + activeTaskCount: 0 // 新增:新建账户当前并发数为0 } } @@ -172,6 +176,9 @@ class ClaudeConsoleAccountService { // 获取限流状态信息 const rateLimitInfo = this._getRateLimitInfo(accountData) + // 获取实时并发计数 + const activeTaskCount = await redis.getConsoleAccountConcurrency(accountData.id) + accounts.push({ id: accountData.id, platform: accountData.platform, @@ -202,7 +209,11 @@ class ClaudeConsoleAccountService { dailyUsage: parseFloat(accountData.dailyUsage || '0'), lastResetDate: accountData.lastResetDate || '', quotaResetTime: accountData.quotaResetTime || '00:00', - quotaStoppedAt: accountData.quotaStoppedAt || null + quotaStoppedAt: accountData.quotaStoppedAt || null, + + // 并发控制相关 + maxConcurrentTasks: parseInt(accountData.maxConcurrentTasks) || 0, + activeTaskCount }) } } @@ -253,6 +264,11 @@ class ClaudeConsoleAccountService { accountData.proxy = JSON.parse(accountData.proxy) } + // 解析并发控制字段 + accountData.maxConcurrentTasks = parseInt(accountData.maxConcurrentTasks) || 0 + // 获取实时并发计数 + accountData.activeTaskCount = await redis.getConsoleAccountConcurrency(accountId) + logger.debug( `[DEBUG] Final account data - name: ${accountData.name}, hasApiUrl: ${!!accountData.apiUrl}, hasApiKey: ${!!accountData.apiKey}, supportedModels: ${JSON.stringify(accountData.supportedModels)}` ) @@ -347,6 +363,11 @@ class ClaudeConsoleAccountService { updatedData.quotaStoppedAt = updates.quotaStoppedAt } + // 并发控制相关字段 + if (updates.maxConcurrentTasks !== undefined) { + updatedData.maxConcurrentTasks = updates.maxConcurrentTasks.toString() + } + // ✅ 直接保存 subscriptionExpiresAt(如果提供) // Claude Console 没有 token 刷新逻辑,不会覆盖此字段 if (updates.subscriptionExpiresAt 
!== undefined) { diff --git a/src/services/claudeConsoleRelayService.js b/src/services/claudeConsoleRelayService.js index 0a63aae3..46c72996 100644 --- a/src/services/claudeConsoleRelayService.js +++ b/src/services/claudeConsoleRelayService.js @@ -1,5 +1,7 @@ const axios = require('axios') +const { v4: uuidv4 } = require('uuid') const claudeConsoleAccountService = require('./claudeConsoleAccountService') +const redis = require('../models/redis') const logger = require('../utils/logger') const config = require('../../config/config') const { @@ -25,6 +27,8 @@ class ClaudeConsoleRelayService { ) { let abortController = null let account = null + const requestId = uuidv4() // 用于并发追踪 + let concurrencyAcquired = false try { // 获取账户信息 @@ -34,8 +38,37 @@ class ClaudeConsoleRelayService { } logger.info( - `📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` + `📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}` ) + + // 🔒 并发控制:原子性抢占槽位 + if (account.maxConcurrentTasks > 0) { + // 先抢占,再检查 - 避免竞态条件 + const newConcurrency = Number( + await redis.incrConsoleAccountConcurrency(accountId, requestId, 600) + ) + concurrencyAcquired = true + + // 检查是否超过限制 + if (newConcurrency > account.maxConcurrentTasks) { + // 超限,立即回滚 + await redis.decrConsoleAccountConcurrency(accountId, requestId) + concurrencyAcquired = false + + logger.warn( + `⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (request: ${requestId}, rolled back)` + ) + + const error = new Error('Console account concurrency limit reached') + error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' + error.accountId = accountId + throw error + } + + logger.debug( + `🔓 Acquired concurrency slot for account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, 
request: ${requestId}` + ) + } logger.debug(`🌐 Account API URL: ${account.apiUrl}`) logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`) logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`) @@ -297,6 +330,21 @@ class ClaudeConsoleRelayService { // 不再因为模型不支持而block账号 throw error + } finally { + // 🔓 并发控制:释放并发槽位 + if (concurrencyAcquired) { + try { + await redis.decrConsoleAccountConcurrency(accountId, requestId) + logger.debug( + `🔓 Released concurrency slot for account ${account?.name || accountId}, request: ${requestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release concurrency slot for account ${accountId}, request: ${requestId}:`, + releaseError.message + ) + } + } } } @@ -312,6 +360,10 @@ class ClaudeConsoleRelayService { options = {} ) { let account = null + const requestId = uuidv4() // 用于并发追踪 + let concurrencyAcquired = false + let leaseRefreshInterval = null // 租约刷新定时器 + try { // 获取账户信息 account = await claudeConsoleAccountService.getAccount(accountId) @@ -320,8 +372,56 @@ class ClaudeConsoleRelayService { } logger.info( - `📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})` + `📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}` ) + + // 🔒 并发控制:原子性抢占槽位 + if (account.maxConcurrentTasks > 0) { + // 先抢占,再检查 - 避免竞态条件 + const newConcurrency = Number( + await redis.incrConsoleAccountConcurrency(accountId, requestId, 600) + ) + concurrencyAcquired = true + + // 检查是否超过限制 + if (newConcurrency > account.maxConcurrentTasks) { + // 超限,立即回滚 + await redis.decrConsoleAccountConcurrency(accountId, requestId) + concurrencyAcquired = false + + logger.warn( + `⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (stream request: ${requestId}, rolled back)` 
+ ) + + const error = new Error('Console account concurrency limit reached') + error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' + error.accountId = accountId + throw error + } + + logger.debug( + `🔓 Acquired concurrency slot for stream account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}` + ) + + // 🔄 启动租约刷新定时器(每5分钟刷新一次,防止长连接租约过期) + leaseRefreshInterval = setInterval( + async () => { + try { + await redis.refreshConsoleAccountConcurrencyLease(accountId, requestId, 600) + logger.debug( + `🔄 Refreshed concurrency lease for stream account ${account.name} (${accountId}), request: ${requestId}` + ) + } catch (refreshError) { + logger.error( + `❌ Failed to refresh concurrency lease for account ${accountId}, request: ${requestId}:`, + refreshError.message + ) + } + }, + 5 * 60 * 1000 + ) // 5分钟刷新一次 + } + logger.debug(`🌐 Account API URL: ${account.apiUrl}`) // 处理模型映射 @@ -373,6 +473,29 @@ class ClaudeConsoleRelayService { error ) throw error + } finally { + // 🛑 清理租约刷新定时器 + if (leaseRefreshInterval) { + clearInterval(leaseRefreshInterval) + logger.debug( + `🛑 Cleared lease refresh interval for stream account ${account?.name || accountId}, request: ${requestId}` + ) + } + + // 🔓 并发控制:释放并发槽位 + if (concurrencyAcquired) { + try { + await redis.decrConsoleAccountConcurrency(accountId, requestId) + logger.debug( + `🔓 Released concurrency slot for stream account ${account?.name || accountId}, request: ${requestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release concurrency slot for stream account ${accountId}, request: ${requestId}:`, + releaseError.message + ) + } + } } } diff --git a/src/services/unifiedClaudeScheduler.js b/src/services/unifiedClaudeScheduler.js index 626a6f7c..e68d607e 100644 --- a/src/services/unifiedClaudeScheduler.js +++ b/src/services/unifiedClaudeScheduler.js @@ -526,6 +526,13 @@ class UnifiedClaudeScheduler { const consoleAccounts = await 
claudeConsoleAccountService.getAllAccounts() logger.info(`📋 Found ${consoleAccounts.length} total Claude Console accounts`) + // 🔢 统计Console账户并发排除情况 + let consoleAccountsEligibleCount = 0 // 符合基本条件的账户数 + let consoleAccountsExcludedByConcurrency = 0 // 因并发满额被排除的账户数 + + // 🚀 收集需要并发检查的账户ID列表(批量查询优化) + const accountsNeedingConcurrencyCheck = [] + for (const account of consoleAccounts) { // 主动检查封禁状态并尝试恢复(在过滤之前执行,确保可以恢复被封禁的账户) const wasBlocked = await claudeConsoleAccountService.isAccountBlocked(account.id) @@ -585,17 +592,25 @@ class UnifiedClaudeScheduler { currentAccount.id ) + // 🔢 记录符合基本条件的账户(通过了前面所有检查,但可能因并发被排除) if (!isRateLimited && !isQuotaExceeded) { - availableAccounts.push({ - ...currentAccount, - accountId: currentAccount.id, - accountType: 'claude-console', - priority: parseInt(currentAccount.priority) || 50, - lastUsedAt: currentAccount.lastUsedAt || '0' - }) - logger.info( - `✅ Added Claude Console account to available pool: ${currentAccount.name} (priority: ${currentAccount.priority})` - ) + consoleAccountsEligibleCount++ + // 🚀 将符合条件且需要并发检查的账户加入批量查询列表 + if (currentAccount.maxConcurrentTasks > 0) { + accountsNeedingConcurrencyCheck.push(currentAccount) + } else { + // 未配置并发限制的账户直接加入可用池 + availableAccounts.push({ + ...currentAccount, + accountId: currentAccount.id, + accountType: 'claude-console', + priority: parseInt(currentAccount.priority) || 50, + lastUsedAt: currentAccount.lastUsedAt || '0' + }) + logger.info( + `✅ Added Claude Console account to available pool: ${currentAccount.name} (priority: ${currentAccount.priority}, no concurrency limit)` + ) + } } else { if (isRateLimited) { logger.warn(`⚠️ Claude Console account ${currentAccount.name} is rate limited`) @@ -611,6 +626,46 @@ class UnifiedClaudeScheduler { } } + // 🚀 批量查询所有账户的并发数(Promise.all 并行执行) + if (accountsNeedingConcurrencyCheck.length > 0) { + logger.debug( + `🚀 Batch checking concurrency for ${accountsNeedingConcurrencyCheck.length} accounts` + ) + + const concurrencyCheckPromises = 
accountsNeedingConcurrencyCheck.map((account) => + redis.getConsoleAccountConcurrency(account.id).then((currentConcurrency) => ({ + account, + currentConcurrency + })) + ) + + const concurrencyResults = await Promise.all(concurrencyCheckPromises) + + // 处理批量查询结果 + for (const { account, currentConcurrency } of concurrencyResults) { + const isConcurrencyFull = currentConcurrency >= account.maxConcurrentTasks + + if (!isConcurrencyFull) { + availableAccounts.push({ + ...account, + accountId: account.id, + accountType: 'claude-console', + priority: parseInt(account.priority) || 50, + lastUsedAt: account.lastUsedAt || '0' + }) + logger.info( + `✅ Added Claude Console account to available pool: ${account.name} (priority: ${account.priority}, concurrency: ${currentConcurrency}/${account.maxConcurrentTasks})` + ) + } else { + // 🔢 因并发满额被排除,计数器加1 + consoleAccountsExcludedByConcurrency++ + logger.warn( + `⚠️ Claude Console account ${account.name} reached concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks}` + ) + } + } + } + // 获取Bedrock账户(共享池) const bedrockAccountsResult = await bedrockAccountService.getAllAccounts() if (bedrockAccountsResult.success) { @@ -710,6 +765,26 @@ class UnifiedClaudeScheduler { logger.info( `📊 Total available accounts: ${availableAccounts.length} (Claude: ${availableAccounts.filter((a) => a.accountType === 'claude-official').length}, Console: ${availableAccounts.filter((a) => a.accountType === 'claude-console').length}, Bedrock: ${availableAccounts.filter((a) => a.accountType === 'bedrock').length}, CCR: ${availableAccounts.filter((a) => a.accountType === 'ccr').length})` ) + + // 🚨 最终检查:只有在没有任何可用账户时,才根据Console并发排除情况抛出专用错误码 + if (availableAccounts.length === 0) { + // 如果所有Console账户都因并发满额被排除,抛出专用错误码(503) + if ( + consoleAccountsEligibleCount > 0 && + consoleAccountsExcludedByConcurrency === consoleAccountsEligibleCount + ) { + logger.error( + `❌ All ${consoleAccountsEligibleCount} eligible Console accounts are at concurrency limit 
(no other account types available)` + ) + const error = new Error( + 'All available Claude Console accounts have reached their concurrency limit' + ) + error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL' + throw error + } + // 否则走通用的"无可用账户"错误处理(由上层 selectAccountForApiKey 捕获) + } + return availableAccounts } @@ -838,6 +913,18 @@ class UnifiedClaudeScheduler { if (await claudeConsoleAccountService.isAccountOverloaded(accountId)) { return false } + + // 检查并发限制(预检查,真正的原子抢占在 relayService 中进行) + if (account.maxConcurrentTasks > 0) { + const currentConcurrency = await redis.getConsoleAccountConcurrency(accountId) + if (currentConcurrency >= account.maxConcurrentTasks) { + logger.info( + `🚫 Claude Console account ${accountId} reached concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks} (pre-check)` + ) + return false + } + } + return true } else if (accountType === 'bedrock') { const accountResult = await bedrockAccountService.getAccount(accountId) @@ -946,6 +1033,28 @@ class UnifiedClaudeScheduler { await client.del(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`) } + /** + * 🧹 公共方法:清理粘性会话映射(用于并发满额时的降级处理) + * @param {string} sessionHash - 会话哈希值 + */ + async clearSessionMapping(sessionHash) { + // 防御空会话哈希 + if (!sessionHash || typeof sessionHash !== 'string') { + logger.debug('⚠️ Skipping session mapping clear - invalid sessionHash') + return + } + + try { + await this._deleteSessionMapping(sessionHash) + logger.info( + `🧹 Cleared sticky session mapping for session: ${sessionHash.substring(0, 8)}...` + ) + } catch (error) { + logger.error(`❌ Failed to clear session mapping for ${sessionHash}:`, error) + throw error + } + } + // 🔁 续期统一调度会话映射TTL(针对 unified_claude_session_mapping:* 键),遵循会话配置 async _extendSessionMappingTTL(sessionHash) { try { @@ -1262,6 +1371,17 @@ class UnifiedClaudeScheduler { } } + // 🔒 检查 Claude Console 账户的并发限制 + if (accountType === 'claude-console' && account.maxConcurrentTasks > 0) { + const currentConcurrency = await 
redis.getConsoleAccountConcurrency(account.id) + if (currentConcurrency >= account.maxConcurrentTasks) { + logger.info( + `🚫 Skipping group member ${account.name} (${account.id}) due to concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks}` + ) + continue + } + } + availableAccounts.push({ ...account, accountId: account.id, diff --git a/web/admin-spa/src/components/accounts/AccountForm.vue b/web/admin-spa/src/components/accounts/AccountForm.vue index 2469d644..4b5e46b7 100644 --- a/web/admin-spa/src/components/accounts/AccountForm.vue +++ b/web/admin-spa/src/components/accounts/AccountForm.vue @@ -1142,6 +1142,23 @@ + +
+ + +

+ 限制该账户的并发请求数量,0 表示不限制 +

+
+
+ +
+ + +

+ 限制该账户的并发请求数量,0 表示不限制 +

+
+
+ + +
+ + +

+ 限制该账户的并发请求数量,0 表示不限制 +

+
@@ -3542,6 +3593,8 @@ const form = ref({ dailyQuota: props.account?.dailyQuota || 0, dailyUsage: props.account?.dailyUsage || 0, quotaResetTime: props.account?.quotaResetTime || '00:00', + // 并发控制字段 + maxConcurrentTasks: props.account?.maxConcurrentTasks || 0, // Bedrock 特定字段 accessKeyId: props.account?.accessKeyId || '', secretAccessKey: props.account?.secretAccessKey || '', @@ -4436,6 +4489,8 @@ const createAccount = async () => { // 额度管理字段 data.dailyQuota = form.value.dailyQuota || 0 data.quotaResetTime = form.value.quotaResetTime || '00:00' + // 并发控制字段 + data.maxConcurrentTasks = form.value.maxConcurrentTasks || 0 } else if (form.value.platform === 'openai-responses') { // OpenAI-Responses 账户特定数据 data.baseApi = form.value.baseApi @@ -4738,6 +4793,8 @@ const updateAccount = async () => { // 额度管理字段 data.dailyQuota = form.value.dailyQuota || 0 data.quotaResetTime = form.value.quotaResetTime || '00:00' + // 并发控制字段 + data.maxConcurrentTasks = form.value.maxConcurrentTasks || 0 } // OpenAI-Responses 特定更新 diff --git a/web/admin-spa/src/views/AccountsView.vue b/web/admin-spa/src/views/AccountsView.vue index 808bf670..9a007542 100644 --- a/web/admin-spa/src/views/AccountsView.vue +++ b/web/admin-spa/src/views/AccountsView.vue @@ -955,43 +955,90 @@
暂无统计
- -
-
+ +
+
+ + +
+ +
- 额度进度 - - {{ getQuotaUsagePercent(account).toFixed(1) }}% + 并发状态 + + {{ getConsoleConcurrencyPercent(account).toFixed(0) }}%
-
+
- ${{ formatCost(account.usage?.daily?.cost || 0) }} / ${{ - Number(account.dailyQuota).toFixed(2) - }} + {{ Number(account.activeTaskCount || 0) }} / + {{ Number(account.maxConcurrentTasks || 0) }}
-
- 剩余 ${{ formatRemainingQuota(account) }} - 重置 {{ account.quotaResetTime || '00:00' }} +
+ 并发无限制
-
- -
@@ -3622,6 +3669,35 @@ const getQuotaBarClass = (percent) => { return 'bg-green-500' } +// 并发使用百分比(Claude Console) +const getConsoleConcurrencyPercent = (account) => { + const max = Number(account?.maxConcurrentTasks || 0) + if (!max || max <= 0) return 0 + const active = Number(account?.activeTaskCount || 0) + return Math.min(100, (active / max) * 100) +} + +// 并发进度条颜色(Claude Console) +const getConcurrencyBarClass = (percent) => { + if (percent >= 100) return 'bg-red-500' + if (percent >= 80) return 'bg-yellow-500' + return 'bg-green-500' +} + +// 并发标签颜色(Claude Console) +const getConcurrencyLabelClass = (account) => { + const max = Number(account?.maxConcurrentTasks || 0) + if (!max || max <= 0) return 'text-gray-500 dark:text-gray-400' + const active = Number(account?.activeTaskCount || 0) + if (active >= max) { + return 'text-red-600 dark:text-red-400' + } + if (active >= max * 0.8) { + return 'text-yellow-600 dark:text-yellow-400' + } + return 'text-gray-700 dark:text-gray-200' +} + // 剩余额度(Claude Console) const formatRemainingQuota = (account) => { const used = Number(account?.usage?.daily?.cost || 0)