feat: 为 Claude Console 账户添加并发控制机制

实现了完整的 Claude Console 账户并发任务数控制功能,防止单账户过载,提升服务稳定性。

  **核心功能**

  - 🔒 **原子性并发控制**: 基于 Redis Sorted Set 实现的抢占式并发槽位管理,防止竞态条件
  - 🔄 **自动租约刷新**: 流式请求每 5 分钟自动刷新租约,防止长连接租约过期
  - 🚨 **智能降级处理**: 并发满额时自动清理粘性会话并重试其他账户(最多 1 次)
  - 🎯 **专用错误码**: 引入 `CONSOLE_ACCOUNT_CONCURRENCY_FULL` 错误码,区分并发限制和其他错误
  - 📊 **批量性能优化**: 调度器使用 Promise.all 并行查询账户并发数,减少 Redis 往返

  **后端实现**

  1. **Redis 并发控制方法** (src/models/redis.js)
     - `incrConsoleAccountConcurrency()`: 增加并发计数(带租约)
     - `decrConsoleAccountConcurrency()`: 释放并发槽位
     - `refreshConsoleAccountConcurrencyLease()`: 刷新租约(流式请求)
     - `getConsoleAccountConcurrency()`: 查询当前并发数

  2. **账户服务增强** (src/services/claudeConsoleAccountService.js)
     - 添加 `maxConcurrentTasks` 字段(默认 0 表示无限制)
     - 获取账户时自动查询实时并发数 (`activeTaskCount`)
     - 支持更新并发限制配置

  3. **转发服务并发保护** (src/services/claudeConsoleRelayService.js)
     - 请求前原子性抢占槽位,超限则立即回滚并抛出专用错误
     - 流式请求启动定时器每 5 分钟刷新租约
     - `finally` 块确保槽位释放(即使发生异常)
     - 为每个请求分配唯一 `requestId` 用于并发追踪

  4. **统一调度器优化** (src/services/unifiedClaudeScheduler.js)
     - 获取可用账户时批量查询并发数(Promise.all 并行)
     - 预检查并发限制,避免选择已满的账户
     - 检查分组成员时也验证并发状态
     - 所有账户并发满额时抛出专用错误码

  5. **API 路由降级处理** (src/routes/api.js)
     - 捕获 `CONSOLE_ACCOUNT_CONCURRENCY_FULL` 错误
     - 自动清理粘性会话映射并重试(最多 1 次)
     - 重试失败返回 503 错误和友好提示
     - count_tokens 端点也支持并发满额重试

  6. **管理端点验证** (src/routes/admin.js)
     - 创建/更新账户时验证 `maxConcurrentTasks` 为非负整数
     - 支持前端传入并发限制配置

  **前端实现**

  1. **表单字段** (web/admin-spa/src/components/accounts/AccountForm.vue)
     - 添加"最大并发任务数"输入框(创建和编辑模式)
     - 支持占位符提示"0 表示不限制"
     - 表单数据自动映射到后端 API

  2. **实时监控** (web/admin-spa/src/views/AccountsView.vue)
     - 账户列表显示并发状态进度条和百分比
     - 颜色编码:绿色(<80%)、黄色(80%-100%)、红色(100%)
     - 显示"X / Y"格式的并发数(如"2 / 5")
     - 未配置限制时显示"并发无限制"徽章
This commit is contained in:
sususu98
2025-10-21 13:43:57 +08:00
parent b61a3103e9
commit 1458d609ca
8 changed files with 706 additions and 119 deletions

View File

@@ -66,7 +66,8 @@ class ClaudeConsoleAccountService {
accountType = 'shared', // 'dedicated' or 'shared'
schedulable = true, // 是否可被调度
dailyQuota = 0, // 每日额度限制美元0表示不限制
quotaResetTime = '00:00' // 额度重置时间HH:mm格式
quotaResetTime = '00:00', // 额度重置时间HH:mm格式
maxConcurrentTasks = 0 // 最大并发任务数0表示无限制
} = options
// 验证必填字段
@@ -113,7 +114,8 @@ class ClaudeConsoleAccountService {
// 使用与统计一致的时区日期,避免边界问题
lastResetDate: redis.getDateStringInTimezone(), // 最后重置日期(按配置时区)
quotaResetTime, // 额度重置时间
quotaStoppedAt: '' // 因额度停用的时间
quotaStoppedAt: '', // 因额度停用的时间
maxConcurrentTasks: maxConcurrentTasks.toString() // 最大并发任务数0表示无限制
}
const client = redis.getClientSafe()
@@ -149,7 +151,9 @@ class ClaudeConsoleAccountService {
dailyUsage: 0,
lastResetDate: accountData.lastResetDate,
quotaResetTime,
quotaStoppedAt: null
quotaStoppedAt: null,
maxConcurrentTasks, // 新增:返回并发限制配置
activeTaskCount: 0 // 新增新建账户当前并发数为0
}
}
@@ -172,6 +176,9 @@ class ClaudeConsoleAccountService {
// 获取限流状态信息
const rateLimitInfo = this._getRateLimitInfo(accountData)
// 获取实时并发计数
const activeTaskCount = await redis.getConsoleAccountConcurrency(accountData.id)
accounts.push({
id: accountData.id,
platform: accountData.platform,
@@ -202,7 +209,11 @@ class ClaudeConsoleAccountService {
dailyUsage: parseFloat(accountData.dailyUsage || '0'),
lastResetDate: accountData.lastResetDate || '',
quotaResetTime: accountData.quotaResetTime || '00:00',
quotaStoppedAt: accountData.quotaStoppedAt || null
quotaStoppedAt: accountData.quotaStoppedAt || null,
// 并发控制相关
maxConcurrentTasks: parseInt(accountData.maxConcurrentTasks) || 0,
activeTaskCount
})
}
}
@@ -253,6 +264,11 @@ class ClaudeConsoleAccountService {
accountData.proxy = JSON.parse(accountData.proxy)
}
// 解析并发控制字段
accountData.maxConcurrentTasks = parseInt(accountData.maxConcurrentTasks) || 0
// 获取实时并发计数
accountData.activeTaskCount = await redis.getConsoleAccountConcurrency(accountId)
logger.debug(
`[DEBUG] Final account data - name: ${accountData.name}, hasApiUrl: ${!!accountData.apiUrl}, hasApiKey: ${!!accountData.apiKey}, supportedModels: ${JSON.stringify(accountData.supportedModels)}`
)
@@ -347,6 +363,11 @@ class ClaudeConsoleAccountService {
updatedData.quotaStoppedAt = updates.quotaStoppedAt
}
// 并发控制相关字段
if (updates.maxConcurrentTasks !== undefined) {
updatedData.maxConcurrentTasks = updates.maxConcurrentTasks.toString()
}
// ✅ 直接保存 subscriptionExpiresAt如果提供
// Claude Console 没有 token 刷新逻辑,不会覆盖此字段
if (updates.subscriptionExpiresAt !== undefined) {

View File

@@ -1,5 +1,7 @@
const axios = require('axios')
const { v4: uuidv4 } = require('uuid')
const claudeConsoleAccountService = require('./claudeConsoleAccountService')
const redis = require('../models/redis')
const logger = require('../utils/logger')
const config = require('../../config/config')
const {
@@ -25,6 +27,8 @@ class ClaudeConsoleRelayService {
) {
let abortController = null
let account = null
const requestId = uuidv4() // 用于并发追踪
let concurrencyAcquired = false
try {
// 获取账户信息
@@ -34,8 +38,37 @@ class ClaudeConsoleRelayService {
}
logger.info(
`📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
`📤 Processing Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}`
)
// 🔒 并发控制:原子性抢占槽位
if (account.maxConcurrentTasks > 0) {
// 先抢占,再检查 - 避免竞态条件
const newConcurrency = Number(
await redis.incrConsoleAccountConcurrency(accountId, requestId, 600)
)
concurrencyAcquired = true
// 检查是否超过限制
if (newConcurrency > account.maxConcurrentTasks) {
// 超限,立即回滚
await redis.decrConsoleAccountConcurrency(accountId, requestId)
concurrencyAcquired = false
logger.warn(
`⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (request: ${requestId}, rolled back)`
)
const error = new Error('Console account concurrency limit reached')
error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL'
error.accountId = accountId
throw error
}
logger.debug(
`🔓 Acquired concurrency slot for account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}`
)
}
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
logger.debug(`🔍 Account supportedModels: ${JSON.stringify(account.supportedModels)}`)
logger.debug(`🔑 Account has apiKey: ${!!account.apiKey}`)
@@ -297,6 +330,21 @@ class ClaudeConsoleRelayService {
// 不再因为模型不支持而block账号
throw error
} finally {
// 🔓 并发控制:释放并发槽位
if (concurrencyAcquired) {
try {
await redis.decrConsoleAccountConcurrency(accountId, requestId)
logger.debug(
`🔓 Released concurrency slot for account ${account?.name || accountId}, request: ${requestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release concurrency slot for account ${accountId}, request: ${requestId}:`,
releaseError.message
)
}
}
}
}
@@ -312,6 +360,10 @@ class ClaudeConsoleRelayService {
options = {}
) {
let account = null
const requestId = uuidv4() // 用于并发追踪
let concurrencyAcquired = false
let leaseRefreshInterval = null // 租约刷新定时器
try {
// 获取账户信息
account = await claudeConsoleAccountService.getAccount(accountId)
@@ -320,8 +372,56 @@ class ClaudeConsoleRelayService {
}
logger.info(
`📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId})`
`📡 Processing streaming Claude Console API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${account.name} (${accountId}), request: ${requestId}`
)
// 🔒 并发控制:原子性抢占槽位
if (account.maxConcurrentTasks > 0) {
// 先抢占,再检查 - 避免竞态条件
const newConcurrency = Number(
await redis.incrConsoleAccountConcurrency(accountId, requestId, 600)
)
concurrencyAcquired = true
// 检查是否超过限制
if (newConcurrency > account.maxConcurrentTasks) {
// 超限,立即回滚
await redis.decrConsoleAccountConcurrency(accountId, requestId)
concurrencyAcquired = false
logger.warn(
`⚠️ Console account ${account.name} (${accountId}) concurrency limit exceeded: ${newConcurrency}/${account.maxConcurrentTasks} (stream request: ${requestId}, rolled back)`
)
const error = new Error('Console account concurrency limit reached')
error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL'
error.accountId = accountId
throw error
}
logger.debug(
`🔓 Acquired concurrency slot for stream account ${account.name} (${accountId}), current: ${newConcurrency}/${account.maxConcurrentTasks}, request: ${requestId}`
)
// 🔄 启动租约刷新定时器每5分钟刷新一次防止长连接租约过期
leaseRefreshInterval = setInterval(
async () => {
try {
await redis.refreshConsoleAccountConcurrencyLease(accountId, requestId, 600)
logger.debug(
`🔄 Refreshed concurrency lease for stream account ${account.name} (${accountId}), request: ${requestId}`
)
} catch (refreshError) {
logger.error(
`❌ Failed to refresh concurrency lease for account ${accountId}, request: ${requestId}:`,
refreshError.message
)
}
},
5 * 60 * 1000
) // 5分钟刷新一次
}
logger.debug(`🌐 Account API URL: ${account.apiUrl}`)
// 处理模型映射
@@ -373,6 +473,29 @@ class ClaudeConsoleRelayService {
error
)
throw error
} finally {
// 🛑 清理租约刷新定时器
if (leaseRefreshInterval) {
clearInterval(leaseRefreshInterval)
logger.debug(
`🛑 Cleared lease refresh interval for stream account ${account?.name || accountId}, request: ${requestId}`
)
}
// 🔓 并发控制:释放并发槽位
if (concurrencyAcquired) {
try {
await redis.decrConsoleAccountConcurrency(accountId, requestId)
logger.debug(
`🔓 Released concurrency slot for stream account ${account?.name || accountId}, request: ${requestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release concurrency slot for stream account ${accountId}, request: ${requestId}:`,
releaseError.message
)
}
}
}
}

View File

@@ -526,6 +526,13 @@ class UnifiedClaudeScheduler {
const consoleAccounts = await claudeConsoleAccountService.getAllAccounts()
logger.info(`📋 Found ${consoleAccounts.length} total Claude Console accounts`)
// 🔢 统计Console账户并发排除情况
let consoleAccountsEligibleCount = 0 // 符合基本条件的账户数
let consoleAccountsExcludedByConcurrency = 0 // 因并发满额被排除的账户数
// 🚀 收集需要并发检查的账户ID列表批量查询优化
const accountsNeedingConcurrencyCheck = []
for (const account of consoleAccounts) {
// 主动检查封禁状态并尝试恢复(在过滤之前执行,确保可以恢复被封禁的账户)
const wasBlocked = await claudeConsoleAccountService.isAccountBlocked(account.id)
@@ -585,17 +592,25 @@ class UnifiedClaudeScheduler {
currentAccount.id
)
// 🔢 记录符合基本条件的账户(通过了前面所有检查,但可能因并发被排除)
if (!isRateLimited && !isQuotaExceeded) {
availableAccounts.push({
...currentAccount,
accountId: currentAccount.id,
accountType: 'claude-console',
priority: parseInt(currentAccount.priority) || 50,
lastUsedAt: currentAccount.lastUsedAt || '0'
})
logger.info(
`✅ Added Claude Console account to available pool: ${currentAccount.name} (priority: ${currentAccount.priority})`
)
consoleAccountsEligibleCount++
// 🚀 将符合条件且需要并发检查的账户加入批量查询列表
if (currentAccount.maxConcurrentTasks > 0) {
accountsNeedingConcurrencyCheck.push(currentAccount)
} else {
// 未配置并发限制的账户直接加入可用池
availableAccounts.push({
...currentAccount,
accountId: currentAccount.id,
accountType: 'claude-console',
priority: parseInt(currentAccount.priority) || 50,
lastUsedAt: currentAccount.lastUsedAt || '0'
})
logger.info(
`✅ Added Claude Console account to available pool: ${currentAccount.name} (priority: ${currentAccount.priority}, no concurrency limit)`
)
}
} else {
if (isRateLimited) {
logger.warn(`⚠️ Claude Console account ${currentAccount.name} is rate limited`)
@@ -611,6 +626,46 @@ class UnifiedClaudeScheduler {
}
}
// 🚀 批量查询所有账户的并发数Promise.all 并行执行)
if (accountsNeedingConcurrencyCheck.length > 0) {
logger.debug(
`🚀 Batch checking concurrency for ${accountsNeedingConcurrencyCheck.length} accounts`
)
const concurrencyCheckPromises = accountsNeedingConcurrencyCheck.map((account) =>
redis.getConsoleAccountConcurrency(account.id).then((currentConcurrency) => ({
account,
currentConcurrency
}))
)
const concurrencyResults = await Promise.all(concurrencyCheckPromises)
// 处理批量查询结果
for (const { account, currentConcurrency } of concurrencyResults) {
const isConcurrencyFull = currentConcurrency >= account.maxConcurrentTasks
if (!isConcurrencyFull) {
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'claude-console',
priority: parseInt(account.priority) || 50,
lastUsedAt: account.lastUsedAt || '0'
})
logger.info(
`✅ Added Claude Console account to available pool: ${account.name} (priority: ${account.priority}, concurrency: ${currentConcurrency}/${account.maxConcurrentTasks})`
)
} else {
// 🔢 因并发满额被排除计数器加1
consoleAccountsExcludedByConcurrency++
logger.warn(
`⚠️ Claude Console account ${account.name} reached concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks}`
)
}
}
}
// 获取Bedrock账户共享池
const bedrockAccountsResult = await bedrockAccountService.getAllAccounts()
if (bedrockAccountsResult.success) {
@@ -710,6 +765,26 @@ class UnifiedClaudeScheduler {
logger.info(
`📊 Total available accounts: ${availableAccounts.length} (Claude: ${availableAccounts.filter((a) => a.accountType === 'claude-official').length}, Console: ${availableAccounts.filter((a) => a.accountType === 'claude-console').length}, Bedrock: ${availableAccounts.filter((a) => a.accountType === 'bedrock').length}, CCR: ${availableAccounts.filter((a) => a.accountType === 'ccr').length})`
)
// 🚨 最终检查只有在没有任何可用账户时才根据Console并发排除情况抛出专用错误码
if (availableAccounts.length === 0) {
// 如果所有Console账户都因并发满额被排除抛出专用错误码503
if (
consoleAccountsEligibleCount > 0 &&
consoleAccountsExcludedByConcurrency === consoleAccountsEligibleCount
) {
logger.error(
`❌ All ${consoleAccountsEligibleCount} eligible Console accounts are at concurrency limit (no other account types available)`
)
const error = new Error(
'All available Claude Console accounts have reached their concurrency limit'
)
error.code = 'CONSOLE_ACCOUNT_CONCURRENCY_FULL'
throw error
}
// 否则走通用的"无可用账户"错误处理(由上层 selectAccountForApiKey 捕获)
}
return availableAccounts
}
@@ -838,6 +913,18 @@ class UnifiedClaudeScheduler {
if (await claudeConsoleAccountService.isAccountOverloaded(accountId)) {
return false
}
// 检查并发限制(预检查,真正的原子抢占在 relayService 中进行)
if (account.maxConcurrentTasks > 0) {
const currentConcurrency = await redis.getConsoleAccountConcurrency(accountId)
if (currentConcurrency >= account.maxConcurrentTasks) {
logger.info(
`🚫 Claude Console account ${accountId} reached concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks} (pre-check)`
)
return false
}
}
return true
} else if (accountType === 'bedrock') {
const accountResult = await bedrockAccountService.getAccount(accountId)
@@ -946,6 +1033,28 @@ class UnifiedClaudeScheduler {
await client.del(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`)
}
/**
* 🧹 公共方法:清理粘性会话映射(用于并发满额时的降级处理)
* @param {string} sessionHash - 会话哈希值
*/
async clearSessionMapping(sessionHash) {
// 防御空会话哈希
if (!sessionHash || typeof sessionHash !== 'string') {
logger.debug('⚠️ Skipping session mapping clear - invalid sessionHash')
return
}
try {
await this._deleteSessionMapping(sessionHash)
logger.info(
`🧹 Cleared sticky session mapping for session: ${sessionHash.substring(0, 8)}...`
)
} catch (error) {
logger.error(`❌ Failed to clear session mapping for ${sessionHash}:`, error)
throw error
}
}
// 🔁 续期统一调度会话映射TTL针对 unified_claude_session_mapping:* 键),遵循会话配置
async _extendSessionMappingTTL(sessionHash) {
try {
@@ -1262,6 +1371,17 @@ class UnifiedClaudeScheduler {
}
}
// 🔒 检查 Claude Console 账户的并发限制
if (accountType === 'claude-console' && account.maxConcurrentTasks > 0) {
const currentConcurrency = await redis.getConsoleAccountConcurrency(account.id)
if (currentConcurrency >= account.maxConcurrentTasks) {
logger.info(
`🚫 Skipping group member ${account.name} (${account.id}) due to concurrency limit: ${currentConcurrency}/${account.maxConcurrentTasks}`
)
continue
}
}
availableAccounts.push({
...account,
accountId: account.id,