Merge pull request #758 from IanShaw027/upstream-pr-temp-unavailable [skip ci]

feat: 添加上游不稳定错误检测与账户临时不可用机制
This commit is contained in:
Wesley Liddick
2025-12-05 21:44:39 -05:00
committed by GitHub
3 changed files with 239 additions and 32 deletions

View File

@@ -1948,7 +1948,13 @@ class ClaudeRelayService {
}
// 🛠️ 统一的错误处理方法
async _handleServerError(accountId, statusCode, _sessionHash = null, context = '') {
async _handleServerError(
accountId,
statusCode,
sessionHash = null,
context = '',
accountType = 'claude-official'
) {
try {
await claudeAccountService.recordServerError(accountId, statusCode)
const errorCount = await claudeAccountService.getServerErrorCount(accountId)
@@ -1962,6 +1968,18 @@ class ClaudeRelayService {
`⏱️ ${prefix}${isTimeout ? 'Timeout' : 'Server'} error for account ${accountId}, error count: ${errorCount}/${threshold}`
)
// 标记账户为临时不可用5分钟
try {
await unifiedClaudeScheduler.markAccountTemporarilyUnavailable(
accountId,
accountType,
sessionHash,
300
)
} catch (markError) {
logger.error(`❌ Failed to mark account temporarily unavailable: ${accountId}`, markError)
}
if (errorCount > threshold) {
const errorTypeLabel = isTimeout ? 'timeout' : '5xx'
// ⚠️ 只记录5xx/504告警不再自动停止调度避免上游抖动导致误停

View File

@@ -177,6 +177,16 @@ class UnifiedClaudeScheduler {
// 普通专属账户
const boundAccount = await redis.getClaudeAccount(apiKeyData.claudeAccountId)
if (boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error') {
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
boundAccount.id,
'claude-official'
)
if (isTempUnavailable) {
logger.warn(
`⏱️ Bound Claude OAuth account ${boundAccount.id} is temporarily unavailable, falling back to pool`
)
} else {
const isRateLimited = await claudeAccountService.isAccountRateLimited(boundAccount.id)
if (isRateLimited) {
const rateInfo = await claudeAccountService.getAccountRateLimitInfo(boundAccount.id)
@@ -203,6 +213,7 @@ class UnifiedClaudeScheduler {
accountType: 'claude-official'
}
}
}
} else {
logger.warn(
`⚠️ Bound Claude OAuth account ${apiKeyData.claudeAccountId} is not available (isActive: ${boundAccount?.isActive}, status: ${boundAccount?.status}), falling back to pool`
@@ -221,6 +232,16 @@ class UnifiedClaudeScheduler {
boundConsoleAccount.status === 'active' &&
this._isSchedulable(boundConsoleAccount.schedulable)
) {
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
boundConsoleAccount.id,
'claude-console'
)
if (isTempUnavailable) {
logger.warn(
`⏱️ Bound Claude Console account ${boundConsoleAccount.id} is temporarily unavailable, falling back to pool`
)
} else {
logger.info(
`🎯 Using bound dedicated Claude Console account: ${boundConsoleAccount.name} (${apiKeyData.claudeConsoleAccountId}) for API key ${apiKeyData.name}`
)
@@ -228,6 +249,7 @@ class UnifiedClaudeScheduler {
accountId: apiKeyData.claudeConsoleAccountId,
accountType: 'claude-console'
}
}
} else {
logger.warn(
`⚠️ Bound Claude Console account ${apiKeyData.claudeConsoleAccountId} is not available (isActive: ${boundConsoleAccount?.isActive}, status: ${boundConsoleAccount?.status}, schedulable: ${boundConsoleAccount?.schedulable}), falling back to pool`
@@ -245,6 +267,16 @@ class UnifiedClaudeScheduler {
boundBedrockAccountResult.data.isActive === true &&
this._isSchedulable(boundBedrockAccountResult.data.schedulable)
) {
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
apiKeyData.bedrockAccountId,
'bedrock'
)
if (isTempUnavailable) {
logger.warn(
`⏱️ Bound Bedrock account ${apiKeyData.bedrockAccountId} is temporarily unavailable, falling back to pool`
)
} else {
logger.info(
`🎯 Using bound dedicated Bedrock account: ${boundBedrockAccountResult.data.name} (${apiKeyData.bedrockAccountId}) for API key ${apiKeyData.name}`
)
@@ -252,6 +284,7 @@ class UnifiedClaudeScheduler {
accountId: apiKeyData.bedrockAccountId,
accountType: 'bedrock'
}
}
} else {
logger.warn(
`⚠️ Bound Bedrock account ${apiKeyData.bedrockAccountId} is not available (isActive: ${boundBedrockAccountResult?.data?.isActive}, schedulable: ${boundBedrockAccountResult?.data?.schedulable}), falling back to pool`
@@ -496,6 +529,18 @@ class UnifiedClaudeScheduler {
continue
}
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
account.id,
'claude-official'
)
if (isTempUnavailable) {
logger.debug(
`⏭️ Skipping Claude Official account ${account.name} - temporarily unavailable`
)
continue
}
// 检查是否被限流
const isRateLimited = await claudeAccountService.isAccountRateLimited(account.id)
if (isRateLimited) {
@@ -584,6 +629,18 @@ class UnifiedClaudeScheduler {
// 继续处理该账号
}
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
currentAccount.id,
'claude-console'
)
if (isTempUnavailable) {
logger.debug(
`⏭️ Skipping Claude Console account ${currentAccount.name} - temporarily unavailable`
)
continue
}
// 检查是否被限流
const isRateLimited = await claudeConsoleAccountService.isAccountRateLimited(
currentAccount.id
@@ -682,7 +739,15 @@ class UnifiedClaudeScheduler {
account.accountType === 'shared' &&
this._isSchedulable(account.schedulable)
) {
// 检查是否可调度
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(
account.id,
'bedrock'
)
if (isTempUnavailable) {
logger.debug(`⏭️ Skipping Bedrock account ${account.name} - temporarily unavailable`)
continue
}
availableAccounts.push({
...account,
@@ -731,6 +796,13 @@ class UnifiedClaudeScheduler {
continue
}
// 检查是否临时不可用
const isTempUnavailable = await this.isAccountTemporarilyUnavailable(account.id, 'ccr')
if (isTempUnavailable) {
logger.debug(`⏭️ Skipping CCR account ${account.name} - temporarily unavailable`)
continue
}
// 检查是否被限流
const isRateLimited = await ccrAccountService.isAccountRateLimited(account.id)
const isQuotaExceeded = await ccrAccountService.isAccountQuotaExceeded(account.id)
@@ -1099,6 +1171,42 @@ class UnifiedClaudeScheduler {
}
}
// ⏱️ 标记账户为临时不可用状态用于5xx等临时故障默认5分钟后自动恢复
async markAccountTemporarilyUnavailable(
accountId,
accountType,
sessionHash = null,
ttlSeconds = 300
) {
try {
const client = redis.getClientSafe()
const key = `temp_unavailable:${accountType}:${accountId}`
await client.setex(key, ttlSeconds, '1')
if (sessionHash) {
await this._deleteSessionMapping(sessionHash)
}
logger.warn(
`⏱️ Account ${accountId} (${accountType}) marked temporarily unavailable for ${ttlSeconds}s`
)
return { success: true }
} catch (error) {
logger.error(`❌ Failed to mark account temporarily unavailable: ${accountId}`, error)
return { success: false }
}
}
// 🔍 检查账户是否临时不可用
async isAccountTemporarilyUnavailable(accountId, accountType) {
try {
const client = redis.getClientSafe()
const key = `temp_unavailable:${accountType}:${accountId}`
return (await client.exists(key)) === 1
} catch (error) {
logger.error(`❌ Failed to check temp unavailable status: ${accountId}`, error)
return false
}
}
// 🚫 标记账户为限流状态
async markAccountRateLimited(
accountId,

View File

@@ -0,0 +1,81 @@
const logger = require('./logger')
/**
 * Splits a comma-separated setting into a list of normalized tokens.
 * Each token is trimmed and lower-cased; empty tokens are dropped.
 *
 * @param {string|undefined} envValue - Raw comma-separated value (e.g. from process.env).
 * @returns {string[]} Normalized tokens, or an empty array when the value is missing/empty.
 */
function parseList(envValue) {
  const tokens = []
  if (!envValue) {
    return tokens
  }
  for (const rawPiece of envValue.split(',')) {
    const token = rawPiece.trim().toLowerCase()
    if (token) {
      tokens.push(token)
    }
  }
  return tokens
}
// Additional error type/code identifiers treated as unstable. Read once at module load from the
// comma-separated UNSTABLE_ERROR_TYPES environment variable (tokens are lower-cased by parseList).
const unstableTypes = new Set(parseList(process.env.UNSTABLE_ERROR_TYPES))
// Lower-cased substrings that mark an error message as unstable, read from the
// comma-separated UNSTABLE_ERROR_KEYWORDS environment variable.
const unstableKeywords = parseList(process.env.UNSTABLE_ERROR_KEYWORDS)
// HTTP status codes that are treated as unstable upstream responses.
const unstableStatusCodes = new Set([408, 499, 502, 503, 504, 522])
/**
 * Reduces an upstream error body to a flat `{ type, code, message }` record.
 *
 * Accepted shapes:
 *  - falsy value                     -> `{}`
 *  - string                          -> parsed as JSON and normalized; if it is not JSON the
 *                                       raw text becomes `message`
 *  - `{ error: { ... } }`            -> fields are read from the nested error object
 *  - `{ type, code, message }`       -> fields are read from the top level
 *  - `{ error: "<string>" }`         -> the string is used as a fallback for any of
 *                                       type/code/message that the top level does not supply
 *
 * `type` and `code` fall back to each other so a caller can compare against either one.
 *
 * @param {*} payload - Raw error body (object, JSON string, plain string, or nothing).
 * @returns {{type?: *, code?: *, message?: *}} Normalized fields; individual fields may be
 *   undefined when the payload does not carry them.
 */
function normalizeErrorPayload(payload) {
  if (!payload) {
    return {}
  }

  if (typeof payload === 'string') {
    try {
      return normalizeErrorPayload(JSON.parse(payload))
    } catch (e) {
      // Not JSON: keep the raw text so keyword matching can still inspect it.
      return { message: payload }
    }
  }

  const nested = payload.error
  if (nested && typeof nested === 'object') {
    return {
      type: nested.type || nested.error || nested.code,
      code: nested.code || nested.error || nested.type,
      message: nested.message || nested.msg || payload.message || nested.error
    }
  }

  // Flat shape. Some upstreams put the whole error into a string `error` field; use it only as
  // a last resort so payloads that already carry type/code/message normalize exactly as before.
  const errorText = typeof nested === 'string' ? nested : undefined
  return {
    type: payload.type || payload.code || errorText,
    code: payload.code || payload.type || errorText,
    message: payload.message || errorText || ''
  }
}
/**
 * Decides whether an upstream failure should be treated as an unstable (transient) error.
 *
 * The checks run in this order and the first match wins:
 *  1. the status is a finite number that is >= 500 or listed in `unstableStatusCodes`;
 *  2. the normalized error type or code equals `server_error`;
 *  3. the normalized error type or code is listed in `unstableTypes`;
 *  4. the normalized message contains any entry of `unstableKeywords`.
 * All text comparisons are case-insensitive (the payload side is lower-cased here).
 *
 * @param {number|string} statusCode - HTTP status of the upstream response; coerced with Number().
 * @param {*} payload - Raw error body, passed to normalizeErrorPayload.
 * @returns {boolean} True when the error is classified as unstable.
 */
function isUnstableUpstreamError(statusCode, payload) {
  const status = Number(statusCode)
  if (Number.isFinite(status) && (status >= 500 || unstableStatusCodes.has(status))) {
    return true
  }

  const normalized = normalizeErrorPayload(payload)
  const [typeText, codeText, messageText] = [
    normalized.type,
    normalized.code,
    normalized.message
  ].map((field) => (field || '').toString().toLowerCase())

  const identifiers = [typeText, codeText]
  if (identifiers.includes('server_error')) {
    return true
  }
  if (identifiers.some((identifier) => unstableTypes.has(identifier))) {
    return true
  }

  // With no configured keywords this is simply false.
  return unstableKeywords.some((keyword) => messageText.includes(keyword))
}
/**
 * Emits a warning-level log line noting that an unstable upstream error was detected for an
 * account and that the account is being marked temporarily unavailable.
 * This function only logs; it does not change any account state itself.
 *
 * @param {string} accountLabel - Human-readable account identifier to include in the message.
 * @param {number|string} statusCode - Upstream status code to include in the message.
 * @returns {void}
 */
function logUnstable(accountLabel, statusCode) {
  const notice = `Detected unstable upstream error (${statusCode}) for account ${accountLabel}, marking temporarily unavailable`
  logger.warn(notice)
}
// Public API of this module: the unstable-error classifier and its companion log helper.
module.exports = {
isUnstableUpstreamError,
logUnstable
}