From 0d64d4065429b359c7278668431836bf43f9967a Mon Sep 17 00:00:00 2001 From: IanShaw027 <131567472+IanShaw027@users.noreply.github.com> Date: Fri, 5 Dec 2025 01:36:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=B8=8A=E6=B8=B8?= =?UTF-8?q?=E4=B8=8D=E7=A8=B3=E5=AE=9A=E9=94=99=E8=AF=AF=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E4=B8=8E=E8=B4=A6=E6=88=B7=E4=B8=B4=E6=97=B6=E4=B8=8D=E5=8F=AF?= =?UTF-8?q?=E7=94=A8=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 背景 当上游 API(如 Anthropic、AWS Bedrock 等)出现临时故障时,服务会持续向故障 账户发送请求,导致用户体验下降。需要自动检测上游不稳定状态并临时排除故障账户。 ## 改动内容 ### 新增 unstableUpstreamHelper.js - 检测多种上游不稳定错误模式 - 支持环境变量扩展检测规则 ### 修改 unifiedClaudeScheduler.js - 新增 markAccountTemporarilyUnavailable() 方法:标记账户临时不可用 - 新增 isAccountTemporarilyUnavailable() 方法:检查账户是否临时不可用 - 专属账户检查:claude-official、claude-console、bedrock 临时不可用时自动回退到池 - 池账户选择:跳过临时不可用的账户 ### 修改 claudeRelayService.js - _handleServerError() 方法增加临时不可用标记逻辑 - 5xx 错误时自动标记账户临时不可用(5分钟 TTL) ## 检测的状态码 | 分类 | 状态码 | 说明 | |------|--------|------| | 服务器错误 | 500-599 | 内部错误、服务不可用等 | | 超时类 | 408 | 请求超时 | | 连接类 | 499 | 客户端关闭请求 (Nginx) | | 网关类 | 502, 503, 504 | 网关错误、服务不可用、网关超时 | | CDN类 | 522 | Cloudflare 连接超时 | | 语义类 | error.type = "server_error" | API 级别服务器错误 | ## 环境变量配置 - UNSTABLE_ERROR_TYPES: 额外的错误类型(逗号分隔) - UNSTABLE_ERROR_KEYWORDS: 错误消息关键词(逗号分隔) ## Redis 键 - temp_unavailable:{accountType}:{accountId} - TTL 300秒 --- src/services/claudeRelayService.js | 20 ++- src/services/unifiedClaudeScheduler.js | 168 ++++++++++++++++++++----- src/utils/unstableUpstreamHelper.js | 77 ++++++++++++ 3 files changed, 233 insertions(+), 32 deletions(-) create mode 100644 src/utils/unstableUpstreamHelper.js diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index 9feeae0d..166d575b 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -1948,7 +1948,13 @@ class ClaudeRelayService { } // 🛠️ 统一的错误处理方法 - async _handleServerError(accountId, statusCode, _sessionHash = null, context = '') { + async _handleServerError( + accountId, + statusCode, + sessionHash = null, + context = '', + accountType = 'claude-official' + ) { try { await claudeAccountService.recordServerError(accountId, statusCode) const errorCount = await claudeAccountService.getServerErrorCount(accountId) @@ -1962,6 +1968,18 @@ class ClaudeRelayService { `⏱️ ${prefix}${isTimeout ? 'Timeout' : 'Server'} error for account ${accountId}, error count: ${errorCount}/${threshold}` ) + // 标记账户为临时不可用(5分钟) + try { + await unifiedClaudeScheduler.markAccountTemporarilyUnavailable( + accountId, + accountType, + sessionHash, + 300 + ) + } catch (markError) { + logger.error(`❌ Failed to mark account temporarily unavailable: ${accountId}`, markError) + } + if (errorCount > threshold) { const errorTypeLabel = isTimeout ? 'timeout' : '5xx' // ⚠️ 只记录5xx/504告警,不再自动停止调度,避免上游抖动导致误停 diff --git a/src/services/unifiedClaudeScheduler.js b/src/services/unifiedClaudeScheduler.js index e68d607e..73def6a8 100644 --- a/src/services/unifiedClaudeScheduler.js +++ b/src/services/unifiedClaudeScheduler.js @@ -177,30 +177,41 @@ class UnifiedClaudeScheduler { // 普通专属账户 const boundAccount = await redis.getClaudeAccount(apiKeyData.claudeAccountId) if (boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error') { - const isRateLimited = await claudeAccountService.isAccountRateLimited(boundAccount.id) - if (isRateLimited) { - const rateInfo = await claudeAccountService.getAccountRateLimitInfo(boundAccount.id) - const error = new Error('Dedicated Claude account is rate limited') - error.code = 'CLAUDE_DEDICATED_RATE_LIMITED' - error.accountId = boundAccount.id - error.rateLimitEndAt = rateInfo?.rateLimitEndAt || boundAccount.rateLimitEndAt || null - throw error - } - - if (!this._isSchedulable(boundAccount.schedulable)) { + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + boundAccount.id, + 'claude-official' + ) + if (isTempUnavailable) { logger.warn( - `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not schedulable (schedulable: ${boundAccount?.schedulable}), falling back to pool` + `⏱️ Bound Claude OAuth account ${boundAccount.id} is temporarily unavailable, falling back to pool` ) } else { - if (isOpusRequest) { - await claudeAccountService.clearExpiredOpusRateLimit(boundAccount.id) + const isRateLimited = await claudeAccountService.isAccountRateLimited(boundAccount.id) + if (isRateLimited) { + const rateInfo = await claudeAccountService.getAccountRateLimitInfo(boundAccount.id) + const error = new Error('Dedicated Claude account is rate limited') + error.code = 'CLAUDE_DEDICATED_RATE_LIMITED' + error.accountId = boundAccount.id + error.rateLimitEndAt = rateInfo?.rateLimitEndAt || boundAccount.rateLimitEndAt || null + throw error } - logger.info( - `🎯 Using bound dedicated Claude OAuth account: ${boundAccount.name} (${apiKeyData.claudeAccountId}) for API key ${apiKeyData.name}` - ) - return { - accountId: apiKeyData.claudeAccountId, - accountType: 'claude-official' + + if (!this._isSchedulable(boundAccount.schedulable)) { + logger.warn( + `⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not schedulable (schedulable: ${boundAccount?.schedulable}), falling back to pool` + ) + } else { + if (isOpusRequest) { + await claudeAccountService.clearExpiredOpusRateLimit(boundAccount.id) + } + logger.info( + `🎯 Using bound dedicated Claude OAuth account: ${boundAccount.name} (${apiKeyData.claudeAccountId}) for API key ${apiKeyData.name}` + ) + return { + accountId: apiKeyData.claudeAccountId, + accountType: 'claude-official' + } } } } else { @@ -221,12 +232,23 @@ class UnifiedClaudeScheduler { boundConsoleAccount.status === 'active' && this._isSchedulable(boundConsoleAccount.schedulable) ) { - logger.info( - `🎯 Using bound dedicated Claude Console account: ${boundConsoleAccount.name} (${apiKeyData.claudeConsoleAccountId}) for API key ${apiKeyData.name}` + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + boundConsoleAccount.id, + 'claude-console' ) - return { - accountId: apiKeyData.claudeConsoleAccountId, - accountType: 'claude-console' + if (isTempUnavailable) { + logger.warn( + `⏱️ Bound Claude Console account ${boundConsoleAccount.id} is temporarily unavailable, falling back to pool` + ) + } else { + logger.info( + `🎯 Using bound dedicated Claude Console account: ${boundConsoleAccount.name} (${apiKeyData.claudeConsoleAccountId}) for API key ${apiKeyData.name}` + ) + return { + accountId: apiKeyData.claudeConsoleAccountId, + accountType: 'claude-console' + } } } else { logger.warn( @@ -245,12 +267,23 @@ class UnifiedClaudeScheduler { boundBedrockAccountResult.data.isActive === true && this._isSchedulable(boundBedrockAccountResult.data.schedulable) ) { - logger.info( - `🎯 Using bound dedicated Bedrock account: ${boundBedrockAccountResult.data.name} (${apiKeyData.bedrockAccountId}) for API key ${apiKeyData.name}` + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + apiKeyData.bedrockAccountId, + 'bedrock' ) - return { - accountId: apiKeyData.bedrockAccountId, - accountType: 'bedrock' + if (isTempUnavailable) { + logger.warn( + `⏱️ Bound Bedrock account ${apiKeyData.bedrockAccountId} is temporarily unavailable, falling back to pool` + ) + } else { + logger.info( + `🎯 Using bound dedicated Bedrock account: ${boundBedrockAccountResult.data.name} (${apiKeyData.bedrockAccountId}) for API key ${apiKeyData.name}` + ) + return { + accountId: apiKeyData.bedrockAccountId, + accountType: 'bedrock' + } } } else { logger.warn( @@ -496,6 +529,18 @@ class UnifiedClaudeScheduler { continue } + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + account.id, + 'claude-official' + ) + if (isTempUnavailable) { + logger.debug( + `⏭️ Skipping Claude Official account ${account.name} - temporarily unavailable` + ) + continue + } + // 检查是否被限流 const isRateLimited = await claudeAccountService.isAccountRateLimited(account.id) if (isRateLimited) { @@ -584,6 +629,18 @@ class UnifiedClaudeScheduler { // 继续处理该账号 } + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + currentAccount.id, + 'claude-console' + ) + if (isTempUnavailable) { + logger.debug( + `⏭️ Skipping Claude Console account ${currentAccount.name} - temporarily unavailable` + ) + continue + } + // 检查是否被限流 const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited( currentAccount.id @@ -682,7 +739,15 @@ class UnifiedClaudeScheduler { account.accountType === 'shared' && this._isSchedulable(account.schedulable) ) { - // 检查是否可调度 + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable( + account.id, + 'bedrock' + ) + if (isTempUnavailable) { + logger.debug(`⏭️ Skipping Bedrock account ${account.name} - temporarily unavailable`) + continue + } availableAccounts.push({ ...account, @@ -731,6 +796,13 @@ class UnifiedClaudeScheduler { continue } + // 检查是否临时不可用 + const isTempUnavailable = await this.isAccountTemporarilyUnavailable(account.id, 'ccr') + if (isTempUnavailable) { + logger.debug(`⏭️ Skipping CCR account ${account.name} - temporarily unavailable`) + continue + } + // 检查是否被限流 const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id) const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id) @@ -1099,6 +1171,40 @@ class UnifiedClaudeScheduler { } } + // ⏱️ 标记账户为临时不可用状态(用于5xx等临时故障,默认5分钟后自动恢复) + async markAccountTemporarilyUnavailable( + accountId, + accountType, + sessionHash = null, + ttlSeconds = 300 + ) { + try { + const client = redis.getClientSafe() + const key = `temp_unavailable:${accountType}:${accountId}` + await client.setex(key, ttlSeconds, '1') + if (sessionHash) await this._deleteSessionMapping(sessionHash) + logger.warn( + `⏱️ Account ${accountId} (${accountType}) marked temporarily unavailable for ${ttlSeconds}s` + ) + return { success: true } + } catch (error) { + logger.error(`❌ Failed to mark account temporarily unavailable: ${accountId}`, error) + return { success: false } + } + } + + // 🔍 检查账户是否临时不可用 + async isAccountTemporarilyUnavailable(accountId, accountType) { + try { + const client = redis.getClientSafe() + const key = `temp_unavailable:${accountType}:${accountId}` + return (await client.exists(key)) === 1 + } catch (error) { + logger.error(`❌ Failed to check temp unavailable status: ${accountId}`, error) + return false + } + } + // 🚫 标记账户为限流状态 async markAccountRateLimited( accountId, diff --git a/src/utils/unstableUpstreamHelper.js b/src/utils/unstableUpstreamHelper.js new file mode 100644 index 00000000..c233fc3c --- /dev/null +++ b/src/utils/unstableUpstreamHelper.js @@ -0,0 +1,77 @@ +const logger = require('./logger') + +function parseList(envValue) { + if (!envValue) return [] + return envValue + .split(',') + .map((s) => s.trim().toLowerCase()) + .filter(Boolean) +} + +const unstableTypes = new Set(parseList(process.env.UNSTABLE_ERROR_TYPES)) +const unstableKeywords = parseList(process.env.UNSTABLE_ERROR_KEYWORDS) +const unstableStatusCodes = new Set([408, 499, 502, 503, 504, 522]) + +function normalizeErrorPayload(payload) { + if (!payload) return {} + + if (typeof payload === 'string') { + try { + return normalizeErrorPayload(JSON.parse(payload)) + } catch (e) { + return { message: payload } + } + } + + if (payload.error && typeof payload.error === 'object') { + return { + type: payload.error.type || payload.error.error || payload.error.code, + code: payload.error.code || payload.error.error || payload.error.type, + message: payload.error.message || payload.error.msg || payload.message || payload.error.error + } + } + + return { + type: payload.type || payload.code, + code: payload.code || payload.type, + message: payload.message || '' + } +} + +function isUnstableUpstreamError(statusCode, payload) { + const normalizedStatus = Number(statusCode) + if (Number.isFinite(normalizedStatus) && normalizedStatus >= 500) { + return true + } + if (Number.isFinite(normalizedStatus) && unstableStatusCodes.has(normalizedStatus)) { + return true + } + + const { type, code, message } = normalizeErrorPayload(payload) + const lowerType = (type || '').toString().toLowerCase() + const lowerCode = (code || '').toString().toLowerCase() + const lowerMessage = (message || '').toString().toLowerCase() + + if (lowerType === 'server_error' || lowerCode === 'server_error') { + return true + } + if (unstableTypes.has(lowerType) || unstableTypes.has(lowerCode)) { + return true + } + if (unstableKeywords.length > 0) { + return unstableKeywords.some((kw) => lowerMessage.includes(kw)) + } + + return false +} + +function logUnstable(accountLabel, statusCode) { + logger.warn( + `Detected unstable upstream error (${statusCode}) for account ${accountLabel}, marking temporarily unavailable` + ) +} + +module.exports = { + isUnstableUpstreamError, + logUnstable +}