feat: 添加用户消息串行队列功能,防止同账户并发请求触发限流

- 新增 userMessageQueueService.js 实现基于 Redis 的队列锁机制
- 在 claudeRelayService、claudeConsoleRelayService、bedrockRelayService、ccrRelayService 中集成队列锁
- 添加 Redis 原子性 Lua 脚本:acquireUserMessageLock、releaseUserMessageLock、refreshUserMessageLock
- 支持锁续租机制,防止长时间请求锁过期
- 添加可配置参数:USER_MESSAGE_QUEUE_ENABLED、USER_MESSAGE_QUEUE_DELAY_MS、USER_MESSAGE_QUEUE_TIMEOUT_MS
- 添加 Web 管理界面配置入口
- 添加 logger.performance 方法用于结构化性能日志
- 添加完整单元测试 (tests/userMessageQueue.test.js)
This commit is contained in:
QTom
2025-12-09 17:04:01 +08:00
committed by QTom
parent 95870883a1
commit f5d1c25295
14 changed files with 2048 additions and 18 deletions

View File

@@ -15,6 +15,7 @@ const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator')
const { formatDateWithTimezone } = require('../utils/dateHelper')
const requestIdentityService = require('./requestIdentityService')
const { createClaudeTestPayload } = require('../utils/testPayloadHelper')
const userMessageQueueService = require('./userMessageQueueService')
class ClaudeRelayService {
constructor() {
@@ -148,6 +149,10 @@ class ClaudeRelayService {
options = {}
) {
let upstreamRequest = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
// 调试日志查看API Key数据
@@ -192,11 +197,74 @@ class ClaudeRelayService {
}
const { accountId } = accountSelection
const { accountType } = accountSelection
selectedAccountId = accountId
logger.info(
`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}`
)
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error('❌ accountId missing for queue lock in relayRequest')
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志,用于后续统计
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
apiKeyName: apiKeyData.name,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for account ${accountId}, key: ${apiKeyData.name}`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
return {
statusCode,
headers: {
'Content-Type': 'application/json',
'x-user-message-queue-error': errorType
},
body: JSON.stringify({
type: 'error',
error: {
type: errorType,
code: errorCode,
message: errorMessage
}
}),
accountId
}
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}`
)
}
}
// 获取账户信息
let account = await claudeAccountService.getAccount(accountId)
@@ -539,6 +607,21 @@ class ClaudeRelayService {
error.message
)
throw error
} finally {
// 📬 释放用户消息队列锁
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for account ${selectedAccountId}:`,
releaseError.message
)
}
}
}
}
@@ -1057,8 +1140,6 @@ class ClaudeRelayService {
timeout: config.requestTimeout || 600000
}
console.log(options.path)
const req = https.request(options, (res) => {
let responseData = Buffer.alloc(0)
@@ -1112,7 +1193,6 @@ class ClaudeRelayService {
}
req.on('error', async (error) => {
console.error(': ❌ ', error)
logger.error(`❌ Claude API request error (Account: ${accountId}):`, error.message, {
code: error.code,
errno: error.errno,
@@ -1163,6 +1243,11 @@ class ClaudeRelayService {
streamTransformer = null,
options = {}
) {
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
// 调试日志查看API Key数据流式请求
logger.info('🔍 [Stream] API Key data received:', {
@@ -1206,6 +1291,74 @@ class ClaudeRelayService {
}
const { accountId } = accountSelection
const { accountType } = accountSelection
selectedAccountId = accountId
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
if (userMessageQueueService.isUserMessageRequest(requestBody)) {
// 校验 accountId 非空,避免空值污染队列锁键
if (!accountId || accountId === '') {
logger.error('❌ accountId missing for queue lock in relayStreamRequestWithUsageCapture')
throw new Error('accountId missing for queue lock')
}
const queueResult = await userMessageQueueService.acquireQueueLock(accountId)
if (!queueResult.acquired && !queueResult.skipped) {
// 区分 Redis 后端错误和队列超时
const isBackendError = queueResult.error === 'queue_backend_error'
const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT'
const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout'
const errorMessage = isBackendError
? 'Queue service temporarily unavailable, please retry later'
: 'User message queue wait timeout, please retry later'
const statusCode = isBackendError ? 500 : 503
// 结构化性能日志,用于后续统计
logger.performance('user_message_queue_error', {
errorType,
errorCode,
accountId,
statusCode,
stream: true,
apiKeyName: apiKeyData.name,
backendError: isBackendError ? queueResult.errorMessage : undefined
})
logger.warn(
`📬 User message queue ${errorType} for account ${accountId} (stream), key: ${apiKeyData.name}`,
isBackendError ? { backendError: queueResult.errorMessage } : {}
)
if (!responseStream.headersSent) {
responseStream.writeHead(statusCode, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'x-user-message-queue-error': errorType
})
}
const errorEvent = `event: error\ndata: ${JSON.stringify({
type: 'error',
error: {
type: errorType,
code: errorCode,
message: errorMessage
}
})}\n\n`
responseStream.write(errorEvent)
responseStream.write('data: [DONE]\n\n')
responseStream.end()
return
}
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}`
)
}
}
logger.info(
`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}`
@@ -1277,6 +1430,21 @@ class ClaudeRelayService {
} catch (error) {
logger.error(`❌ Claude stream relay with usage capture failed:`, error)
throw error
} finally {
// 📬 释放用户消息队列锁
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`,
releaseError.message
)
}
}
}
}
@@ -1478,7 +1646,6 @@ class ClaudeRelayService {
})
res.on('end', () => {
console.error(': ❌ ', errorData)
logger.error(
`❌ Claude API error response (Account: ${account?.name || accountId}):`,
errorData
@@ -1950,7 +2117,7 @@ class ClaudeRelayService {
responseStream.on('close', () => {
logger.debug('🔌 Client disconnected, cleaning up stream')
if (!req.destroyed) {
req.destroy()
req.destroy(new Error('Client disconnected'))
}
})