Compare commits

...

3 Commits

Author SHA1 Message Date
github-actions[bot]
59ce0f091c chore: sync VERSION file with release v1.1.239 [skip ci] 2025-12-24 11:56:05 +00:00
shaw
67c20fa30e feat: 为 claude-official 账户添加 403 错误重试机制
针对 OAuth 和 Setup Token 类型的 Claude 账户,遇到 403 错误时:
- 休息 2 秒后进行重试
- 最多重试 2 次(总共最多 3 次请求)
- 重试后仍是 403 才标记账户为 blocked

同时支持流式和非流式请求,并修复了流式请求中的竞态条件问题。
2025-12-24 19:54:25 +08:00
shaw
671451253f fix: 修复并发清理任务 WRONGTYPE 错误
问题:
- 并发清理定时任务在遇到非 zset 类型的遗留键时报 WRONGTYPE 错误
- 错误键如 concurrency:wait:*, concurrency:user:*, concurrency:account:* 等

修复:
- app.js: 使用原子 Lua 脚本先检查键类型再执行清理,消除竞态条件
- redis.js: 为 6 个并发管理函数添加类型检查
  - getAllConcurrencyStatus(): 跳过 queue 键 + 类型检查
  - getConcurrencyStatus(): 类型检查,非 zset 返回 invalidType
  - forceClearConcurrency(): 类型检查,任意类型都删除
  - forceClearAllConcurrency(): 跳过 queue 键 + 类型检查
  - cleanupExpiredConcurrency(): 跳过 queue 键 + 类型检查

- 遗留键会被自动识别并删除,同时记录日志
2025-12-24 17:51:19 +08:00
4 changed files with 236 additions and 32 deletions

View File

@@ -1 +1 @@
1.1.238 1.1.239

View File

@@ -581,10 +581,11 @@ class Application {
const now = Date.now() const now = Date.now()
let totalCleaned = 0 let totalCleaned = 0
let legacyCleaned = 0
// 使用 Lua 脚本批量清理所有过期项 // 使用 Lua 脚本批量清理所有过期项
for (const key of keys) { for (const key of keys) {
// 跳过非 Sorted Set 类型的键(这些键有各自的清理逻辑) // 跳过已知非 Sorted Set 类型的键(这些键有各自的清理逻辑)
// - concurrency:queue:stats:* 是 Hash 类型 // - concurrency:queue:stats:* 是 Hash 类型
// - concurrency:queue:wait_times:* 是 List 类型 // - concurrency:queue:wait_times:* 是 List 类型
// - concurrency:queue:* (不含stats/wait_times) 是 String 类型 // - concurrency:queue:* (不含stats/wait_times) 是 String 类型
@@ -599,11 +600,21 @@ class Application {
} }
try { try {
const cleaned = await redis.client.eval( // 使用原子 Lua 脚本:先检查类型,再执行清理
// 返回值0 = 正常清理无删除1 = 清理后删除空键,-1 = 遗留键已删除
const result = await redis.client.eval(
` `
local key = KEYS[1] local key = KEYS[1]
local now = tonumber(ARGV[1]) local now = tonumber(ARGV[1])
-- 先检查键类型,只对 Sorted Set 执行清理
local keyType = redis.call('TYPE', key)
if keyType.ok ~= 'zset' then
-- 非 ZSET 类型的遗留键,直接删除
redis.call('DEL', key)
return -1
end
-- 清理过期项 -- 清理过期项
redis.call('ZREMRANGEBYSCORE', key, '-inf', now) redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
@@ -622,8 +633,10 @@ class Application {
key, key,
now now
) )
if (cleaned === 1) { if (result === 1) {
totalCleaned++ totalCleaned++
} else if (result === -1) {
legacyCleaned++
} }
} catch (error) { } catch (error) {
logger.error(`❌ Failed to clean concurrency key ${key}:`, error) logger.error(`❌ Failed to clean concurrency key ${key}:`, error)
@@ -633,6 +646,9 @@ class Application {
if (totalCleaned > 0) { if (totalCleaned > 0) {
logger.info(`🔢 Concurrency cleanup: cleaned ${totalCleaned} expired keys`) logger.info(`🔢 Concurrency cleanup: cleaned ${totalCleaned} expired keys`)
} }
if (legacyCleaned > 0) {
logger.warn(`🧹 Concurrency cleanup: removed ${legacyCleaned} legacy keys (wrong type)`)
}
} catch (error) { } catch (error) {
logger.error('❌ Concurrency cleanup task failed:', error) logger.error('❌ Concurrency cleanup task failed:', error)
} }

View File

@@ -2140,6 +2140,27 @@ class RedisClient {
const results = [] const results = []
for (const key of keys) { for (const key of keys) {
// 跳过已知非 Sorted Set 类型的键
// - concurrency:queue:stats:* 是 Hash 类型
// - concurrency:queue:wait_times:* 是 List 类型
// - concurrency:queue:* (不含stats/wait_times) 是 String 类型
if (
key.startsWith('concurrency:queue:stats:') ||
key.startsWith('concurrency:queue:wait_times:') ||
(key.startsWith('concurrency:queue:') &&
!key.includes(':stats:') &&
!key.includes(':wait_times:'))
) {
continue
}
// 检查键类型,只处理 Sorted Set
const keyType = await client.type(key)
if (keyType !== 'zset') {
logger.debug(`🔢 getAllConcurrencyStatus skipped non-zset key: ${key} (type: ${keyType})`)
continue
}
// 提取 apiKeyId去掉 concurrency: 前缀) // 提取 apiKeyId去掉 concurrency: 前缀)
const apiKeyId = key.replace('concurrency:', '') const apiKeyId = key.replace('concurrency:', '')
@@ -2202,6 +2223,23 @@ class RedisClient {
} }
} }
// 检查键类型,只处理 Sorted Set
const keyType = await client.type(key)
if (keyType !== 'zset') {
logger.warn(
`⚠️ getConcurrencyStatus: key ${key} has unexpected type: ${keyType}, expected zset`
)
return {
apiKeyId,
key,
activeCount: 0,
expiredCount: 0,
activeRequests: [],
exists: true,
invalidType: keyType
}
}
// 获取所有成员和分数 // 获取所有成员和分数
const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES') const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES')
@@ -2251,20 +2289,36 @@ class RedisClient {
const client = this.getClientSafe() const client = this.getClientSafe()
const key = `concurrency:${apiKeyId}` const key = `concurrency:${apiKeyId}`
// 获取清理前的状态 // 检查键类型
const beforeCount = await client.zcard(key) const keyType = await client.type(key)
// 删除整个 key let beforeCount = 0
let isLegacy = false
if (keyType === 'zset') {
// 正常的 zset 键,获取条目数
beforeCount = await client.zcard(key)
} else if (keyType !== 'none') {
// 非 zset 且非空的遗留键
isLegacy = true
logger.warn(
`⚠️ forceClearConcurrency: key ${key} has unexpected type: ${keyType}, will be deleted`
)
}
// 删除键(无论什么类型)
await client.del(key) await client.del(key)
logger.warn( logger.warn(
`🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries` `🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries${isLegacy ? ' (legacy key)' : ''}`
) )
return { return {
apiKeyId, apiKeyId,
key, key,
clearedCount: beforeCount, clearedCount: beforeCount,
type: keyType,
legacy: isLegacy,
success: true success: true
} }
} catch (error) { } catch (error) {
@@ -2283,25 +2337,47 @@ class RedisClient {
const keys = await client.keys('concurrency:*') const keys = await client.keys('concurrency:*')
let totalCleared = 0 let totalCleared = 0
let legacyCleared = 0
const clearedKeys = [] const clearedKeys = []
for (const key of keys) { for (const key of keys) {
const count = await client.zcard(key) // 跳过 queue 相关的键(它们有各自的清理逻辑)
await client.del(key) if (key.startsWith('concurrency:queue:')) {
totalCleared += count continue
clearedKeys.push({ }
key,
clearedCount: count // 检查键类型
}) const keyType = await client.type(key)
if (keyType === 'zset') {
const count = await client.zcard(key)
await client.del(key)
totalCleared += count
clearedKeys.push({
key,
clearedCount: count,
type: 'zset'
})
} else {
// 非 zset 类型的遗留键,直接删除
await client.del(key)
legacyCleared++
clearedKeys.push({
key,
clearedCount: 0,
type: keyType,
legacy: true
})
}
} }
logger.warn( logger.warn(
`🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries` `🧹 Force cleared all concurrency: ${clearedKeys.length} keys, ${totalCleared} entries, ${legacyCleared} legacy keys`
) )
return { return {
keysCleared: keys.length, keysCleared: clearedKeys.length,
totalEntriesCleared: totalCleared, totalEntriesCleared: totalCleared,
legacyKeysCleared: legacyCleared,
clearedKeys, clearedKeys,
success: true success: true
} }
@@ -2329,9 +2405,30 @@ class RedisClient {
} }
let totalCleaned = 0 let totalCleaned = 0
let legacyCleaned = 0
const cleanedKeys = [] const cleanedKeys = []
for (const key of keys) { for (const key of keys) {
// 跳过 queue 相关的键(它们有各自的清理逻辑)
if (key.startsWith('concurrency:queue:')) {
continue
}
// 检查键类型
const keyType = await client.type(key)
if (keyType !== 'zset') {
// 非 zset 类型的遗留键,直接删除
await client.del(key)
legacyCleaned++
cleanedKeys.push({
key,
cleanedCount: 0,
type: keyType,
legacy: true
})
continue
}
// 只清理过期的条目 // 只清理过期的条目
const cleaned = await client.zremrangebyscore(key, '-inf', now) const cleaned = await client.zremrangebyscore(key, '-inf', now)
if (cleaned > 0) { if (cleaned > 0) {
@@ -2350,13 +2447,14 @@ class RedisClient {
} }
logger.info( logger.info(
`🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys` `🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys, ${legacyCleaned} legacy keys removed`
) )
return { return {
keysProcessed: keys.length, keysProcessed: keys.length,
keysCleaned: cleanedKeys.length, keysCleaned: cleanedKeys.length,
totalEntriesCleaned: totalCleaned, totalEntriesCleaned: totalCleaned,
legacyKeysRemoved: legacyCleaned,
cleanedKeys, cleanedKeys,
success: true success: true
} }

View File

@@ -333,17 +333,46 @@ class ClaudeRelayService {
} }
// 发送请求到Claude API传入回调以获取请求对象 // 发送请求到Claude API传入回调以获取请求对象
const response = await this._makeClaudeRequest( // 🔄 403 重试机制:仅对 claude-official 类型账户OAuth 或 Setup Token
processedBody, const maxRetries = this._shouldRetryOn403(accountType) ? 2 : 0
accessToken, let retryCount = 0
proxyAgent, let response
clientHeaders, let shouldRetry = false
accountId,
(req) => { do {
upstreamRequest = req response = await this._makeClaudeRequest(
}, processedBody,
options accessToken,
) proxyAgent,
clientHeaders,
accountId,
(req) => {
upstreamRequest = req
},
options
)
// 检查是否需要重试 403
shouldRetry = response.statusCode === 403 && retryCount < maxRetries
if (shouldRetry) {
retryCount++
logger.warn(
`🔄 403 error for account ${accountId}, retry ${retryCount}/${maxRetries} after 2s`
)
await this._sleep(2000)
}
} while (shouldRetry)
// 如果进行了重试,记录最终结果
if (retryCount > 0) {
if (response.statusCode === 403) {
logger.error(`🚫 403 error persists for account ${accountId} after ${retryCount} retries`)
} else {
logger.info(
`✅ 403 retry successful for account ${accountId} on attempt ${retryCount}, got status ${response.statusCode}`
)
}
}
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻 // 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
@@ -408,9 +437,10 @@ class ClaudeRelayService {
} }
} }
// 检查是否为403状态码禁止访问 // 检查是否为403状态码禁止访问
// 注意如果进行了重试retryCount > 0这里的 403 是重试后最终的结果
else if (response.statusCode === 403) { else if (response.statusCode === 403) {
logger.error( logger.error(
`🚫 Forbidden error (403) detected for account ${accountId}, marking as blocked` `🚫 Forbidden error (403) detected for account ${accountId}${retryCount > 0 ? ` after ${retryCount} retries` : ''}, marking as blocked`
) )
await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash)
} }
@@ -1517,8 +1547,10 @@ class ClaudeRelayService {
streamTransformer = null, streamTransformer = null,
requestOptions = {}, requestOptions = {},
isDedicatedOfficialAccount = false, isDedicatedOfficialAccount = false,
onResponseStart = null // 📬 新增:收到响应头时的回调,用于提前释放队列锁 onResponseStart = null, // 📬 新增:收到响应头时的回调,用于提前释放队列锁
retryCount = 0 // 🔄 403 重试计数器
) { ) {
const maxRetries = 2 // 最大重试次数
// 获取账户信息用于统一 User-Agent // 获取账户信息用于统一 User-Agent
const account = await claudeAccountService.getAccount(accountId) const account = await claudeAccountService.getAccount(accountId)
@@ -1631,6 +1663,51 @@ class ClaudeRelayService {
} }
} }
// 🔄 403 重试机制(必须在设置 res.on('data')/res.on('end') 之前处理)
// 否则重试时旧响应的 on('end') 会与新请求产生竞态条件
if (res.statusCode === 403) {
const canRetry =
this._shouldRetryOn403(accountType) &&
retryCount < maxRetries &&
!responseStream.headersSent
if (canRetry) {
logger.warn(
`🔄 [Stream] 403 error for account ${accountId}, retry ${retryCount + 1}/${maxRetries} after 2s`
)
// 消费当前响应并销毁请求
res.resume()
req.destroy()
// 等待 2 秒后递归重试
await this._sleep(2000)
try {
// 递归调用自身进行重试
const retryResult = await this._makeClaudeStreamRequestWithUsageCapture(
body,
accessToken,
proxyAgent,
clientHeaders,
responseStream,
usageCallback,
accountId,
accountType,
sessionHash,
streamTransformer,
requestOptions,
isDedicatedOfficialAccount,
onResponseStart,
retryCount + 1
)
resolve(retryResult)
} catch (retryError) {
reject(retryError)
}
return // 重要:提前返回,不设置后续的错误处理器
}
}
// 将错误处理逻辑封装在一个异步函数中 // 将错误处理逻辑封装在一个异步函数中
const handleErrorResponse = async () => { const handleErrorResponse = async () => {
if (res.statusCode === 401) { if (res.statusCode === 401) {
@@ -1654,8 +1731,10 @@ class ClaudeRelayService {
) )
} }
} else if (res.statusCode === 403) { } else if (res.statusCode === 403) {
// 403 处理:走到这里说明重试已用尽或不适用重试,直接标记 blocked
// 注意:重试逻辑已在 handleErrorResponse 外部提前处理
logger.error( logger.error(
`🚫 [Stream] Forbidden error (403) detected for account ${accountId}, marking as blocked` `🚫 [Stream] Forbidden error (403) detected for account ${accountId}${retryCount > 0 ? ` after ${retryCount} retries` : ''}, marking as blocked`
) )
await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash) await unifiedClaudeScheduler.markAccountBlocked(accountId, accountType, sessionHash)
} else if (res.statusCode === 529) { } else if (res.statusCode === 529) {
@@ -2693,6 +2772,17 @@ class ClaudeRelayService {
} }
} }
} }
// 🔄 判断账户是否应该在 403 错误时进行重试
// 仅 claude-official 类型账户OAuth 或 Setup Token 授权)需要重试
_shouldRetryOn403(accountType) {
return accountType === 'claude-official'
}
// ⏱️ 等待指定毫秒数
_sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
} }
module.exports = new ClaudeRelayService() module.exports = new ClaudeRelayService()