From 3b9c96dff8ca2c25907740e051f604d284225917 Mon Sep 17 00:00:00 2001 From: QTom Date: Wed, 10 Dec 2025 01:26:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(queue):=20=E4=BC=98=E5=8C=96=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E9=94=81=E9=87=8A?= =?UTF-8?q?=E6=94=BE=E6=97=B6=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将队列锁释放时机从"请求完成后"提前到"请求发送后",因为 Claude API 限流(RPM)基于请求发送时刻计算,无需等待响应完成。 主要变更: - 移除锁续租机制(startLockRenewal、refreshUserMessageLock) - 所有 relay 服务在请求发送成功后立即释放锁 - 流式请求通过 onResponseStart 回调在收到响应头时释放 - 调整默认配置:timeoutMs 60s→5s,lockTtlMs 120s→5s - 新增 USER_MESSAGE_QUEUE_LOCK_TTL_MS 环境变量支持 --- CLAUDE.md | 7 +- config/config.example.js | 7 +- src/app.js | 3 +- src/models/redis.js | 32 ------ src/services/bedrockRelayService.js | 62 ++++++++--- src/services/ccrRelayService.js | 85 +++++++++++---- src/services/claudeConsoleRelayService.js | 83 +++++++++++---- src/services/claudeRelayConfigService.js | 9 +- src/services/claudeRelayService.js | 75 ++++++++++---- src/services/userMessageQueueService.js | 120 ++-------------------- tests/userMessageQueue.test.js | 82 --------------- 11 files changed, 251 insertions(+), 314 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index c918feef..f1f47ec1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -186,9 +186,10 @@ npm run service:stop # 停止服务 - `CLAUDE_OVERLOAD_HANDLING_MINUTES`: Claude 529错误处理持续时间(分钟,0表示禁用) - `STICKY_SESSION_TTL_HOURS`: 粘性会话TTL(小时,默认1) - `STICKY_SESSION_RENEWAL_THRESHOLD_MINUTES`: 粘性会话续期阈值(分钟,默认0) -- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认true) +- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认false) - `USER_MESSAGE_QUEUE_DELAY_MS`: 用户消息请求间隔(毫秒,默认200) -- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认30000) +- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认5000,锁持有时间短无需长等待) +- `USER_MESSAGE_QUEUE_LOCK_TTL_MS`: 锁TTL(毫秒,默认5000,请求发送后立即释放无需长TTL) - `METRICS_WINDOW`: 实时指标统计窗口(分钟,1-60,默认5) - `MAX_API_KEYS_PER_USER`: 每用户最大API Key数量(默认1) - `ALLOW_USER_DELETE_API_KEYS`: 允许用户删除自己的API Keys(默认false) @@ -341,7 +342,7 @@ npm run setup # 自动生成密钥并创建管理员账户 11. **速率限制未清理**: rateLimitCleanupService每5分钟自动清理过期限流状态 12. **成本统计不准确**: 运行 `npm run init:costs` 初始化成本数据,检查pricingService是否正确加载模型价格 13. **缓存命中率低**: 查看缓存监控统计,调整LRU缓存大小配置 -14. **用户消息队列超时**: 检查 `USER_MESSAGE_QUEUE_TIMEOUT_MS` 配置是否合理,查看日志中的 `queue_timeout` 错误,可通过 Web 界面或 `USER_MESSAGE_QUEUE_ENABLED=false` 禁用此功能 +14. **用户消息队列超时**: 优化后锁持有时间已从分钟级降到毫秒级(请求发送后立即释放),默认 `USER_MESSAGE_QUEUE_TIMEOUT_MS=5000` 已足够。如仍有超时,检查网络延迟或禁用此功能(`USER_MESSAGE_QUEUE_ENABLED=false`) ### 调试工具 diff --git a/config/config.example.js b/config/config.example.js index 090937ed..9cf26002 100644 --- a/config/config.example.js +++ b/config/config.example.js @@ -206,11 +206,12 @@ const config = { }, // 📬 用户消息队列配置 + // 优化说明:锁在请求发送成功后立即释放(而非请求完成后),因为 Claude API 限流基于请求发送时刻计算 userMessageQueue: { enabled: process.env.USER_MESSAGE_QUEUE_ENABLED === 'true', // 默认关闭 - delayMs: parseInt(process.env.USER_MESSAGE_QUEUE_DELAY_MS) || 100, // 请求间隔(毫秒) - timeoutMs: parseInt(process.env.USER_MESSAGE_QUEUE_TIMEOUT_MS) || 60000, // 队列等待超时(毫秒) - lockTtlMs: 120000 // 锁租约TTL(毫秒),会在请求期间自动续租以防死锁 + delayMs: parseInt(process.env.USER_MESSAGE_QUEUE_DELAY_MS) || 200, // 请求间隔(毫秒) + timeoutMs: parseInt(process.env.USER_MESSAGE_QUEUE_TIMEOUT_MS) || 5000, // 队列等待超时(毫秒),锁持有时间短,无需长等待 + lockTtlMs: parseInt(process.env.USER_MESSAGE_QUEUE_LOCK_TTL_MS) || 5000 // 锁TTL(毫秒),5秒足以覆盖请求发送 } } diff --git a/src/app.js b/src/app.js index 2a85850e..e0a675f5 100644 --- a/src/app.js +++ b/src/app.js @@ -669,10 +669,9 @@ class Application { logger.error('❌ Error stopping rate limit cleanup service:', error) } - // 停止用户消息队列清理服务和续租定时器 + // 停止用户消息队列清理服务 try { const userMessageQueueService = require('./services/userMessageQueueService') - userMessageQueueService.stopAllRenewalTimers() userMessageQueueService.stopCleanupTask() logger.info('📬 User message queue service stopped') } catch (error) { diff --git a/src/models/redis.js b/src/models/redis.js index a36a27aa..e34054f3 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -2626,38 +2626,6 @@ redisClient.acquireUserMessageLock = async function (accountId, requestId, lockT } } -/** - * 续租用户消息队列锁(仅锁持有者可续租) - * @param {string} accountId - 账户ID - * @param {string} requestId - 请求ID - * @param {number} lockTtlMs - 锁 TTL(毫秒) - * @returns {Promise} 是否续租成功(只有锁持有者才能续租) - */ -redisClient.refreshUserMessageLock = async function (accountId, requestId, lockTtlMs) { - const lockKey = `user_msg_queue_lock:${accountId}` - - const script = ` - local lockKey = KEYS[1] - local requestId = ARGV[1] - local lockTtl = tonumber(ARGV[2]) - - local currentLock = redis.call('GET', lockKey) - if currentLock == requestId then - redis.call('PEXPIRE', lockKey, lockTtl) - return 1 - end - return 0 - ` - - try { - const result = await this.client.eval(script, 1, lockKey, requestId, lockTtlMs) - return result === 1 - } catch (error) { - logger.error(`Failed to refresh user message lock for account ${accountId}:`, error) - return false - } -} - /** * 释放用户消息队列锁并记录完成时间 * @param {string} accountId - 账户ID diff --git a/src/services/bedrockRelayService.js b/src/services/bedrockRelayService.js index c14a5a40..ec8ec126 100644 --- a/src/services/bedrockRelayService.js +++ b/src/services/bedrockRelayService.js @@ -73,7 +73,6 @@ class BedrockRelayService { const accountId = bedrockAccount?.id let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理 @@ -127,9 +126,8 @@ class BedrockRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId + logger.debug( + `📬 User message queue lock acquired for Bedrock account ${accountId}, requestId: ${queueRequestId}` ) } } @@ -154,6 +152,23 @@ class BedrockRelayService { const response = await client.send(command) const duration = Date.now() - startTime + // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) + // 因为限流基于请求发送时刻计算(RPM),不是请求完成时刻 + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for Bedrock account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for Bedrock account ${accountId}:`, + releaseError.message + ) + } + } + // 解析响应 const responseBody = JSON.parse(new TextDecoder().decode(response.body)) const claudeResponse = this._convertFromBedrockFormat(responseBody) @@ -171,13 +186,13 @@ class BedrockRelayService { logger.error('❌ Bedrock非流式请求失败:', error) throw this._handleBedrockError(error) } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for Bedrock account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for Bedrock account ${accountId}:`, @@ -193,7 +208,6 @@ class BedrockRelayService { const accountId = bedrockAccount?.id let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理 @@ -252,9 +266,8 @@ class BedrockRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId + logger.debug( + `📬 User message queue lock acquired for Bedrock account ${accountId} (stream), requestId: ${queueRequestId}` ) } } @@ -278,6 +291,23 @@ class BedrockRelayService { const startTime = Date.now() const response = await client.send(command) + // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) + // 因为限流基于请求发送时刻计算(RPM),不是请求完成时刻 + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for Bedrock stream account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for Bedrock stream account ${accountId}:`, + releaseError.message + ) + } + } + // 设置SSE响应头 res.writeHead(200, { 'Content-Type': 'text/event-stream', @@ -339,13 +369,13 @@ class BedrockRelayService { throw this._handleBedrockError(error) } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for Bedrock stream account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for Bedrock stream account ${accountId}:`, diff --git a/src/services/ccrRelayService.js b/src/services/ccrRelayService.js index 5cd1a2a0..2f812ad9 100644 --- a/src/services/ccrRelayService.js +++ b/src/services/ccrRelayService.js @@ -24,7 +24,6 @@ class CcrRelayService { let account = null let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理 @@ -78,9 +77,8 @@ class CcrRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId + logger.debug( + `📬 User message queue lock acquired for CCR account ${accountId}, requestId: ${queueRequestId}` ) } } @@ -224,6 +222,23 @@ class CcrRelayService { ) const response = await axios(requestConfig) + // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) + // 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻 + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for CCR account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for CCR account ${accountId}:`, + releaseError.message + ) + } + } + // 移除监听器(请求成功完成) if (clientRequest) { clientRequest.removeListener('close', handleClientDisconnect) @@ -296,13 +311,13 @@ class CcrRelayService { throw error } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for CCR account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for CCR account ${accountId}:`, @@ -327,7 +342,6 @@ class CcrRelayService { let account = null let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理 @@ -388,9 +402,8 @@ class CcrRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId + logger.debug( + `📬 User message queue lock acquired for CCR account ${accountId} (stream), requestId: ${queueRequestId}` ) } } @@ -442,7 +455,24 @@ class CcrRelayService { accountId, usageCallback, streamTransformer, - options + options, + // 📬 回调:在收到响应头时释放队列锁 + async () => { + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for CCR stream account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for CCR stream account ${accountId}:`, + releaseError.message + ) + } + } + } ) // 更新最后使用时间 @@ -451,13 +481,13 @@ class CcrRelayService { logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error) throw error } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for CCR stream account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for CCR stream account ${accountId}:`, @@ -478,7 +508,8 @@ class CcrRelayService { accountId, usageCallback, streamTransformer = null, - requestOptions = {} + requestOptions = {}, + onResponseHeaderReceived = null ) { return new Promise((resolve, reject) => { let aborted = false @@ -541,8 +572,11 @@ class CcrRelayService { // 发送请求 const request = axios(requestConfig) + // 注意:使用 .then(async ...) 模式处理响应 + // - 内部的 releaseQueueLock 有独立的 try-catch,不会导致未捕获异常 + // - queueLockAcquired = false 的赋值会在 finally 执行前完成(JS 单线程保证) request - .then((response) => { + .then(async (response) => { logger.debug(`🌊 CCR stream response status: ${response.status}`) // 错误响应处理 @@ -592,6 +626,19 @@ class CcrRelayService { return } + // 📬 收到成功响应头(HTTP 200),调用回调释放队列锁 + // 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成 + if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') { + try { + await onResponseHeaderReceived() + } catch (callbackError) { + logger.error( + `❌ Failed to execute onResponseHeaderReceived callback for CCR stream account ${accountId}:`, + callbackError.message + ) + } + } + // 成功响应,检查并移除错误状态 ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => { if (isRateLimited) { diff --git a/src/services/claudeConsoleRelayService.js b/src/services/claudeConsoleRelayService.js index c8c2c4b8..9b539b8c 100644 --- a/src/services/claudeConsoleRelayService.js +++ b/src/services/claudeConsoleRelayService.js @@ -32,7 +32,6 @@ class ClaudeConsoleRelayService { let concurrencyAcquired = false let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 @@ -87,10 +86,6 @@ class ClaudeConsoleRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId - ) logger.debug( `📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}` ) @@ -269,6 +264,23 @@ class ClaudeConsoleRelayService { ) const response = await axios(requestConfig) + // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) + // 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻 + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for console account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for console account ${accountId}:`, + releaseError.message + ) + } + } + // 移除监听器(请求成功完成) if (clientRequest) { clientRequest.removeListener('close', handleClientDisconnect) @@ -433,13 +445,13 @@ class ClaudeConsoleRelayService { } } - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for console account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for account ${accountId}:`, @@ -467,7 +479,6 @@ class ClaudeConsoleRelayService { let leaseRefreshInterval = null // 租约刷新定时器 let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null try { // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 @@ -522,10 +533,6 @@ class ClaudeConsoleRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId - ) logger.debug( `📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}` ) @@ -629,7 +636,24 @@ class ClaudeConsoleRelayService { accountId, usageCallback, streamTransformer, - options + options, + // 📬 回调:在收到响应头时释放队列锁 + async () => { + if (queueLockAcquired && queueRequestId && accountId) { + try { + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for console stream account ${accountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for console stream account ${accountId}:`, + releaseError.message + ) + } + } + } ) // 更新最后使用时间 @@ -664,13 +688,13 @@ class ClaudeConsoleRelayService { } } - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放) if (queueLockAcquired && queueRequestId && accountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for console stream account ${accountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for stream account ${accountId}:`, @@ -691,7 +715,8 @@ class ClaudeConsoleRelayService { accountId, usageCallback, streamTransformer = null, - requestOptions = {} + requestOptions = {}, + onResponseHeaderReceived = null ) { return new Promise((resolve, reject) => { let aborted = false @@ -754,8 +779,11 @@ class ClaudeConsoleRelayService { // 发送请求 const request = axios(requestConfig) + // 注意:使用 .then(async ...) 模式处理响应 + // - 内部的 releaseQueueLock 有独立的 try-catch,不会导致未捕获异常 + // - queueLockAcquired = false 的赋值会在 finally 执行前完成(JS 单线程保证) request - .then((response) => { + .then(async (response) => { logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`) // 错误响应处理 @@ -862,6 +890,19 @@ class ClaudeConsoleRelayService { return } + // 📬 收到成功响应头(HTTP 200),调用回调释放队列锁 + // 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成 + if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') { + try { + await onResponseHeaderReceived() + } catch (callbackError) { + logger.error( + `❌ Failed to execute onResponseHeaderReceived callback for console stream account ${accountId}:`, + callbackError.message + ) + } + } + // 成功响应,检查并移除错误状态 claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => { if (isRateLimited) { diff --git a/src/services/claudeRelayConfigService.js b/src/services/claudeRelayConfigService.js index b4e7d0c1..6bab76ea 100644 --- a/src/services/claudeRelayConfigService.js +++ b/src/services/claudeRelayConfigService.js @@ -17,8 +17,9 @@ const DEFAULT_CONFIG = { sessionBindingTtlDays: 30, // 会话绑定 TTL(天),默认30天 // 用户消息队列配置 userMessageQueueEnabled: false, // 是否启用用户消息队列(默认关闭) - userMessageQueueDelayMs: 100, // 请求间隔(毫秒) - userMessageQueueTimeoutMs: 60000, // 队列超时(毫秒) + userMessageQueueDelayMs: 200, // 请求间隔(毫秒) + userMessageQueueTimeoutMs: 5000, // 队列等待超时(毫秒),优化后锁持有时间短无需长等待 + userMessageQueueLockTtlMs: 5000, // 锁TTL(毫秒),请求发送后立即释放无需长TTL updatedAt: null, updatedBy: null } @@ -320,11 +321,11 @@ class ClaudeRelayConfigService { /** * 验证新会话请求 - * @param {Object} requestBody - 请求体 + * @param {Object} _requestBody - 请求体(预留参数,当前未使用) * @param {string} originalSessionId - 原始会话ID * @returns {Promise} { valid: boolean, error?: string, binding?: object, isNewSession?: boolean } */ - async validateNewSession(requestBody, originalSessionId) { + async validateNewSession(_requestBody, originalSessionId) { const cfg = await this.getConfig() if (!cfg.globalSessionBindingEnabled) { diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index 742372df..48b22413 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -151,7 +151,6 @@ class ClaudeRelayService { let upstreamRequest = null let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null let selectedAccountId = null try { @@ -255,10 +254,6 @@ class ClaudeRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId - ) logger.debug( `📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}` ) @@ -339,6 +334,23 @@ class ClaudeRelayService { options ) + // 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成) + // 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻 + if (queueLockAcquired && queueRequestId && selectedAccountId) { + try { + await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for account ${selectedAccountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for account ${selectedAccountId}:`, + releaseError.message + ) + } + } + response.accountId = accountId response.accountType = accountType @@ -608,13 +620,13 @@ class ClaudeRelayService { ) throw error } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && selectedAccountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for account ${selectedAccountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for account ${selectedAccountId}:`, @@ -1245,7 +1257,6 @@ class ClaudeRelayService { ) { let queueLockAcquired = false let queueRequestId = null - let queueLockRenewalStopper = null let selectedAccountId = null try { @@ -1350,10 +1361,6 @@ class ClaudeRelayService { if (queueResult.acquired && !queueResult.skipped) { queueLockAcquired = true queueRequestId = queueResult.requestId - queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( - accountId, - queueRequestId - ) logger.debug( `📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}` ) @@ -1425,19 +1432,36 @@ class ClaudeRelayService { sessionHash, streamTransformer, options, - isDedicatedOfficialAccount + isDedicatedOfficialAccount, + // 📬 新增回调:在收到响应头时释放队列锁 + async () => { + if (queueLockAcquired && queueRequestId && selectedAccountId) { + try { + await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + queueLockAcquired = false // 标记已释放,防止 finally 重复释放 + logger.debug( + `📬 User message queue lock released early for stream account ${selectedAccountId}, requestId: ${queueRequestId}` + ) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock early for stream account ${selectedAccountId}:`, + releaseError.message + ) + } + } + } ) } catch (error) { logger.error(`❌ Claude stream relay with usage capture failed:`, error) throw error } finally { - // 📬 释放用户消息队列锁 + // 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放) if (queueLockAcquired && queueRequestId && selectedAccountId) { try { - if (queueLockRenewalStopper) { - queueLockRenewalStopper() - } await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + logger.debug( + `📬 User message queue lock released in finally for stream account ${selectedAccountId}, requestId: ${queueRequestId}` + ) } catch (releaseError) { logger.error( `❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`, @@ -1461,7 +1485,8 @@ class ClaudeRelayService { sessionHash, streamTransformer = null, requestOptions = {}, - isDedicatedOfficialAccount = false + isDedicatedOfficialAccount = false, + onResponseStart = null // 📬 新增:收到响应头时的回调,用于提前释放队列锁 ) { // 获取账户信息用于统一 User-Agent const account = await claudeAccountService.getAccount(accountId) @@ -1707,6 +1732,16 @@ class ClaudeRelayService { return } + // 📬 收到成功响应头(HTTP 200),立即调用回调释放队列锁 + // 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成 + if (onResponseStart && typeof onResponseStart === 'function') { + try { + await onResponseStart() + } catch (callbackError) { + logger.error('❌ Error in onResponseStart callback:', callbackError.message) + } + } + let buffer = '' const allUsageData = [] // 收集所有的usage事件 let currentUsageData = {} // 当前正在收集的usage数据 diff --git a/src/services/userMessageQueueService.js b/src/services/userMessageQueueService.js index 6437ef4b..e35a9f64 100644 --- a/src/services/userMessageQueueService.js +++ b/src/services/userMessageQueueService.js @@ -14,9 +14,6 @@ const logger = require('../utils/logger') // 清理任务间隔 const CLEANUP_INTERVAL_MS = 60000 // 1分钟 -// 锁续租最大持续时间(从配置读取,与 REQUEST_TIMEOUT 保持一致) -const MAX_RENEWAL_DURATION_MS = config.requestTimeout || 10 * 60 * 1000 - // 轮询等待配置 const POLL_INTERVAL_BASE_MS = 50 // 基础轮询间隔 const POLL_INTERVAL_MAX_MS = 500 // 最大轮询间隔 @@ -25,8 +22,6 @@ const POLL_BACKOFF_FACTOR = 1.5 // 退避因子 class UserMessageQueueService { constructor() { this.cleanupTimer = null - // 跟踪活跃的续租定时器,用于服务关闭时清理 - this.activeRenewalTimers = new Map() } /** @@ -74,12 +69,13 @@ class UserMessageQueueService { */ async getConfig() { // 默认配置(防止 config.userMessageQueue 未定义) + // 注意:优化后的默认值 - 锁持有时间从分钟级降到毫秒级,无需长等待 const queueConfig = config.userMessageQueue || {} const defaults = { enabled: queueConfig.enabled ?? false, - delayMs: queueConfig.delayMs ?? 100, - timeoutMs: queueConfig.timeoutMs ?? 60000, - lockTtlMs: queueConfig.lockTtlMs ?? 120000 + delayMs: queueConfig.delayMs ?? 200, + timeoutMs: queueConfig.timeoutMs ?? 5000, // 从 60000 降到 5000,因为锁持有时间短 + lockTtlMs: queueConfig.lockTtlMs ?? 5000 // 从 120000 降到 5000,5秒足以覆盖请求发送 } // 尝试从 claudeRelayConfigService 获取 Web 界面配置 @@ -100,7 +96,10 @@ class UserMessageQueueService { webConfig.userMessageQueueTimeoutMs !== undefined ? webConfig.userMessageQueueTimeoutMs : defaults.timeoutMs, - lockTtlMs: defaults.lockTtlMs + lockTtlMs: + webConfig.userMessageQueueLockTtlMs !== undefined + ? webConfig.userMessageQueueLockTtlMs + : defaults.lockTtlMs } } catch { // 回退到环境变量配置 @@ -232,83 +231,6 @@ class UserMessageQueueService { return released } - /** - * 启动锁续租(防止长连接超过TTL导致锁丢失) - * @param {string} accountId - 账户ID - * @param {string} requestId - 请求ID - * @returns {Promise} 停止续租的函数 - */ - async startLockRenewal(accountId, requestId) { - const cfg = await this.getConfig() - if (!cfg.enabled || !accountId || !requestId) { - return () => {} - } - - const intervalMs = Math.max(10000, Math.floor(cfg.lockTtlMs / 2)) // 约一半TTL刷新一次 - const maxRenewals = Math.ceil(MAX_RENEWAL_DURATION_MS / intervalMs) // 最大续租次数 - const startTime = Date.now() - const timerKey = `${accountId}:${requestId}` - - let stopped = false - let renewalCount = 0 - - const stopRenewal = () => { - if (!stopped) { - clearInterval(timer) - stopped = true - this.activeRenewalTimers.delete(timerKey) - } - } - - const timer = setInterval(async () => { - if (stopped) { - return - } - - renewalCount++ - - // 检查是否超过最大续租次数或最大持续时间 - if (renewalCount > maxRenewals || Date.now() - startTime > MAX_RENEWAL_DURATION_MS) { - logger.warn(`📬 User message queue: max renewal duration exceeded, stopping renewal`, { - accountId, - requestId, - renewalCount, - durationMs: Date.now() - startTime - }) - stopRenewal() - return - } - - try { - const refreshed = await redis.refreshUserMessageLock(accountId, requestId, cfg.lockTtlMs) - if (!refreshed) { - // 锁可能已被释放或超时,停止续租 - logger.warn( - `📬 User message queue: failed to refresh lock (possibly lost), stop renewal`, - { - accountId, - requestId, - renewalCount - } - ) - stopRenewal() - } - } catch (error) { - logger.error('📬 User message queue: lock renewal error:', error) - } - }, intervalMs) - - // 避免阻止进程退出 - if (typeof timer.unref === 'function') { - timer.unref() - } - - // 跟踪活跃的定时器 - this.activeRenewalTimers.set(timerKey, { timer, stopRenewal, accountId, requestId, startTime }) - - return stopRenewal - } - /** * 获取队列统计信息 * @param {string} accountId - 账户ID @@ -385,32 +307,6 @@ class UserMessageQueueService { } } - /** - * 停止所有活跃的锁续租定时器(服务关闭时调用) - */ - stopAllRenewalTimers() { - const count = this.activeRenewalTimers.size - if (count > 0) { - for (const [key, { stopRenewal }] of this.activeRenewalTimers) { - try { - stopRenewal() - } catch (error) { - logger.error(`📬 User message queue: failed to stop renewal timer ${key}:`, error) - } - } - this.activeRenewalTimers.clear() - logger.info(`📬 User message queue: stopped ${count} active renewal timer(s)`) - } - } - - /** - * 获取活跃续租定时器数量(用于监控) - * @returns {number} - */ - getActiveRenewalCount() { - return this.activeRenewalTimers.size - } - /** * 清理孤儿锁 * 检测异常情况:锁存在但没有设置过期时间(lockTtlRaw === -1) diff --git a/tests/userMessageQueue.test.js b/tests/userMessageQueue.test.js index 1d9e544f..4fd7adb2 100644 --- a/tests/userMessageQueue.test.js +++ b/tests/userMessageQueue.test.js @@ -179,88 +179,6 @@ describe('UserMessageQueueService', () => { }) }) - describe('startLockRenewal', () => { - beforeEach(() => { - jest.useFakeTimers() - }) - - afterEach(() => { - jest.useRealTimers() - jest.restoreAllMocks() - }) - - it('should periodically refresh lock while enabled', async () => { - jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ - enabled: true, - delayMs: 200, - timeoutMs: 30000, - lockTtlMs: 120000 - }) - const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) - - const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') - - jest.advanceTimersByTime(60000) // 半个TTL - await Promise.resolve() - - expect(refreshSpy).toHaveBeenCalledWith('acct-1', 'req-1', 120000) - - stop() - }) - - it('should no-op when queue disabled', async () => { - jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ - enabled: false, - delayMs: 200, - timeoutMs: 30000, - lockTtlMs: 120000 - }) - const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) - - const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') - jest.advanceTimersByTime(120000) - await Promise.resolve() - - expect(refreshSpy).not.toHaveBeenCalled() - stop() - }) - - it('should track active renewal timer', async () => { - jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ - enabled: true, - delayMs: 200, - timeoutMs: 30000, - lockTtlMs: 120000 - }) - jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) - - expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) - - const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') - expect(userMessageQueueService.getActiveRenewalCount()).toBe(1) - - stop() - expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) - }) - - it('should stop all renewal timers on service shutdown', async () => { - jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ - enabled: true, - delayMs: 200, - timeoutMs: 30000, - lockTtlMs: 120000 - }) - jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) - - await userMessageQueueService.startLockRenewal('acct-1', 'req-1') - await userMessageQueueService.startLockRenewal('acct-2', 'req-2') - expect(userMessageQueueService.getActiveRenewalCount()).toBe(2) - - userMessageQueueService.stopAllRenewalTimers() - expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) - }) - }) - describe('acquireQueueLock', () => { afterEach(() => { jest.restoreAllMocks()