From f5d1c25295ea5268eb8fc206f4587280c9793dd5 Mon Sep 17 00:00:00 2001 From: QTom Date: Tue, 9 Dec 2025 17:04:01 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=B6=88=E6=81=AF=E4=B8=B2=E8=A1=8C=E9=98=9F=E5=88=97?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=8C=E9=98=B2=E6=AD=A2=E5=90=8C=E8=B4=A6?= =?UTF-8?q?=E6=88=B7=E5=B9=B6=E5=8F=91=E8=AF=B7=E6=B1=82=E8=A7=A6=E5=8F=91?= =?UTF-8?q?=E9=99=90=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 userMessageQueueService.js 实现基于 Redis 的队列锁机制 - 在 claudeRelayService、claudeConsoleRelayService、bedrockRelayService、ccrRelayService 中集成队列锁 - 添加 Redis 原子性 Lua 脚本:acquireUserMessageLock、releaseUserMessageLock、refreshUserMessageLock - 支持锁续租机制,防止长时间请求锁过期 - 添加可配置参数:USER_MESSAGE_QUEUE_ENABLED、USER_MESSAGE_QUEUE_DELAY_MS、USER_MESSAGE_QUEUE_TIMEOUT_MS - 添加 Web 管理界面配置入口 - 添加 logger.performance 方法用于结构化性能日志 - 添加完整单元测试 (tests/userMessageQueue.test.js) --- CLAUDE.md | 8 + src/app.js | 18 + src/models/redis.js | 245 +++++++++++ src/routes/admin/claudeRelayConfig.js | 40 +- src/services/bedrockRelayService.js | 162 +++++++ src/services/ccrRelayService.js | 161 +++++++ src/services/claudeConsoleRelayService.js | 161 +++++++ src/services/claudeRelayConfigService.js | 4 + src/services/claudeRelayService.js | 177 +++++++- src/services/droidRelayService.js | 2 +- src/services/userMessageQueueService.js | 448 +++++++++++++++++++ src/utils/logger.js | 23 +- tests/userMessageQueue.test.js | 512 ++++++++++++++++++++++ web/admin-spa/src/views/SettingsView.vue | 105 ++++- 14 files changed, 2048 insertions(+), 18 deletions(-) create mode 100644 src/services/userMessageQueueService.js create mode 100644 tests/userMessageQueue.test.js diff --git a/CLAUDE.md b/CLAUDE.md index 1eac1b03..c918feef 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -60,6 +60,7 @@ Claude Relay Service 是一个多平台 AI API 中转服务,支持 **Claude ( - **apiKeyService.js**: API Key管理,验证、限流、使用统计、成本计算 - **userService.js**: 用户管理系统,支持用户注册、登录、API Key管理 +- **userMessageQueueService.js**: 用户消息串行队列,防止同账户并发用户消息触发限流 - **pricingService.js**: 定价服务,模型价格管理和成本计算 - **costInitService.js**: 成本数据初始化服务 - **webhookService.js**: Webhook通知服务 @@ -185,6 +186,9 @@ npm run service:stop # 停止服务 - `CLAUDE_OVERLOAD_HANDLING_MINUTES`: Claude 529错误处理持续时间(分钟,0表示禁用) - `STICKY_SESSION_TTL_HOURS`: 粘性会话TTL(小时,默认1) - `STICKY_SESSION_RENEWAL_THRESHOLD_MINUTES`: 粘性会话续期阈值(分钟,默认0) +- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认true) +- `USER_MESSAGE_QUEUE_DELAY_MS`: 用户消息请求间隔(毫秒,默认200) +- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认30000) - `METRICS_WINDOW`: 实时指标统计窗口(分钟,1-60,默认5) - `MAX_API_KEYS_PER_USER`: 每用户最大API Key数量(默认1) - `ALLOW_USER_DELETE_API_KEYS`: 允许用户删除自己的API Keys(默认false) @@ -337,6 +341,7 @@ npm run setup # 自动生成密钥并创建管理员账户 11. **速率限制未清理**: rateLimitCleanupService每5分钟自动清理过期限流状态 12. **成本统计不准确**: 运行 `npm run init:costs` 初始化成本数据,检查pricingService是否正确加载模型价格 13. **缓存命中率低**: 查看缓存监控统计,调整LRU缓存大小配置 +14. **用户消息队列超时**: 检查 `USER_MESSAGE_QUEUE_TIMEOUT_MS` 配置是否合理,查看日志中的 `queue_timeout` 错误,可通过 Web 界面或 `USER_MESSAGE_QUEUE_ENABLED=false` 禁用此功能 ### 调试工具 @@ -510,6 +515,9 @@ npm run setup # 自动生成密钥并创建管理员账户 - `concurrency:{accountId}` - Redis Sorted Set实现的并发计数 - **Webhook配置**: - `webhook_config:{id}` - Webhook配置 +- **用户消息队列**: + - `user_msg_queue_lock:{accountId}` - 用户消息队列锁(当前持有者requestId) + - `user_msg_queue_last:{accountId}` - 上次请求完成时间戳(用于延迟计算) - **系统信息**: - `system_info` - 系统状态缓存 - `model_pricing` - 模型价格数据(pricingService) diff --git a/src/app.js b/src/app.js index 77047247..2a85850e 100644 --- a/src/app.js +++ b/src/app.js @@ -625,6 +625,14 @@ class Application { }, 60000) // 每分钟执行一次 logger.info('🔢 Concurrency cleanup task started (running every 1 minute)') + + // 📬 启动用户消息队列服务 + const userMessageQueueService = require('./services/userMessageQueueService') + // 先清理服务重启后残留的锁,防止旧锁阻塞新请求 + userMessageQueueService.cleanupStaleLocks().then(() => { + // 然后启动定时清理任务 + userMessageQueueService.startCleanupTask() + }) } setupGracefulShutdown() { @@ -661,6 +669,16 @@ class Application { logger.error('❌ Error stopping rate limit cleanup service:', error) } + // 停止用户消息队列清理服务和续租定时器 + try { + const userMessageQueueService = require('./services/userMessageQueueService') + userMessageQueueService.stopAllRenewalTimers() + userMessageQueueService.stopCleanupTask() + logger.info('📬 User message queue service stopped') + } catch (error) { + logger.error('❌ Error stopping user message queue service:', error) + } + // 停止费用排序索引服务 try { const costRankService = require('./services/costRankService') diff --git a/src/models/redis.js b/src/models/redis.js index 2393f3b3..a36a27aa 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -2556,4 +2556,249 @@ redisClient.getDateStringInTimezone = getDateStringInTimezone redisClient.getHourInTimezone = getHourInTimezone redisClient.getWeekStringInTimezone = getWeekStringInTimezone +// ============== 用户消息队列相关方法 ============== + +/** + * 尝试获取用户消息队列锁 + * 使用 Lua 脚本保证原子性 + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID + * @param {number} lockTtlMs - 锁 TTL(毫秒) + * @param {number} delayMs - 请求间隔(毫秒) + * @returns {Promise<{acquired: boolean, waitMs: number}>} + * - acquired: 是否成功获取锁 + * - waitMs: 需要等待的毫秒数(-1表示被占用需等待,>=0表示需要延迟的毫秒数) + */ +redisClient.acquireUserMessageLock = async function (accountId, requestId, lockTtlMs, delayMs) { + const lockKey = `user_msg_queue_lock:${accountId}` + const lastTimeKey = `user_msg_queue_last:${accountId}` + + const script = ` + local lockKey = KEYS[1] + local lastTimeKey = KEYS[2] + local requestId = ARGV[1] + local lockTtl = tonumber(ARGV[2]) + local delayMs = tonumber(ARGV[3]) + + -- 检查锁是否空闲 + local currentLock = redis.call('GET', lockKey) + if currentLock == false then + -- 检查是否需要延迟 + local lastTime = redis.call('GET', lastTimeKey) + local now = redis.call('TIME') + local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000) + + if lastTime then + local elapsed = nowMs - tonumber(lastTime) + if elapsed < delayMs then + -- 需要等待的毫秒数 + return {0, delayMs - elapsed} + end + end + + -- 获取锁 + redis.call('SET', lockKey, requestId, 'PX', lockTtl) + return {1, 0} + end + + -- 锁被占用,返回等待 + return {0, -1} + ` + + try { + const result = await this.client.eval( + script, + 2, + lockKey, + lastTimeKey, + requestId, + lockTtlMs, + delayMs + ) + return { + acquired: result[0] === 1, + waitMs: result[1] + } + } catch (error) { + logger.error(`Failed to acquire user message lock for account ${accountId}:`, error) + // 返回 redisError 标记,让上层能区分 Redis 故障和正常锁占用 + return { acquired: false, waitMs: -1, redisError: true, errorMessage: error.message } + } +} + +/** + * 续租用户消息队列锁(仅锁持有者可续租) + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID + * @param {number} lockTtlMs - 锁 TTL(毫秒) + * @returns {Promise} 是否续租成功(只有锁持有者才能续租) + */ +redisClient.refreshUserMessageLock = async function (accountId, requestId, lockTtlMs) { + const lockKey = `user_msg_queue_lock:${accountId}` + + const script = ` + local lockKey = KEYS[1] + local requestId = ARGV[1] + local lockTtl = tonumber(ARGV[2]) + + local currentLock = redis.call('GET', lockKey) + if currentLock == requestId then + redis.call('PEXPIRE', lockKey, lockTtl) + return 1 + end + return 0 + ` + + try { + const result = await this.client.eval(script, 1, lockKey, requestId, lockTtlMs) + return result === 1 + } catch (error) { + logger.error(`Failed to refresh user message lock for account ${accountId}:`, error) + return false + } +} + +/** + * 释放用户消息队列锁并记录完成时间 + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID + * @returns {Promise} 是否成功释放 + */ +redisClient.releaseUserMessageLock = async function (accountId, requestId) { + const lockKey = `user_msg_queue_lock:${accountId}` + const lastTimeKey = `user_msg_queue_last:${accountId}` + + const script = ` + local lockKey = KEYS[1] + local lastTimeKey = KEYS[2] + local requestId = ARGV[1] + + -- 验证锁持有者 + local currentLock = redis.call('GET', lockKey) + if currentLock == requestId then + -- 记录完成时间 + local now = redis.call('TIME') + local nowMs = tonumber(now[1]) * 1000 + math.floor(tonumber(now[2]) / 1000) + redis.call('SET', lastTimeKey, nowMs, 'EX', 60) -- 60秒后过期 + + -- 删除锁 + redis.call('DEL', lockKey) + return 1 + end + return 0 + ` + + try { + const result = await this.client.eval(script, 2, lockKey, lastTimeKey, requestId) + return result === 1 + } catch (error) { + logger.error(`Failed to release user message lock for account ${accountId}:`, error) + return false + } +} + +/** + * 强制释放用户消息队列锁(用于清理孤儿锁) + * @param {string} accountId - 账户ID + * @returns {Promise} 是否成功释放 + */ +redisClient.forceReleaseUserMessageLock = async function (accountId) { + const lockKey = `user_msg_queue_lock:${accountId}` + + try { + await this.client.del(lockKey) + return true + } catch (error) { + logger.error(`Failed to force release user message lock for account ${accountId}:`, error) + return false + } +} + +/** + * 获取用户消息队列统计信息(用于调试) + * @param {string} accountId - 账户ID + * @returns {Promise} 队列统计 + */ +redisClient.getUserMessageQueueStats = async function (accountId) { + const lockKey = `user_msg_queue_lock:${accountId}` + const lastTimeKey = `user_msg_queue_last:${accountId}` + + try { + const [lockHolder, lastTime, lockTtl] = await Promise.all([ + this.client.get(lockKey), + this.client.get(lastTimeKey), + this.client.pttl(lockKey) + ]) + + return { + accountId, + isLocked: !!lockHolder, + lockHolder, + lockTtlMs: lockTtl > 0 ? lockTtl : 0, + lockTtlRaw: lockTtl, // 原始 PTTL 值:>0 有TTL,-1 无过期时间,-2 键不存在 + lastCompletedAt: lastTime ? new Date(parseInt(lastTime)).toISOString() : null + } + } catch (error) { + logger.error(`Failed to get user message queue stats for account ${accountId}:`, error) + return { + accountId, + isLocked: false, + lockHolder: null, + lockTtlMs: 0, + lockTtlRaw: -2, + lastCompletedAt: null + } + } +} + +/** + * 扫描所有用户消息队列锁(用于清理任务) + * @returns {Promise} 账户ID列表 + */ +redisClient.scanUserMessageQueueLocks = async function () { + const accountIds = [] + let cursor = '0' + let iterations = 0 + const MAX_ITERATIONS = 1000 // 防止无限循环 + + try { + do { + const [newCursor, keys] = await this.client.scan( + cursor, + 'MATCH', + 'user_msg_queue_lock:*', + 'COUNT', + 100 + ) + cursor = newCursor + iterations++ + + for (const key of keys) { + const accountId = key.replace('user_msg_queue_lock:', '') + accountIds.push(accountId) + } + + // 防止无限循环 + if (iterations >= MAX_ITERATIONS) { + logger.warn( + `📬 User message queue: SCAN reached max iterations (${MAX_ITERATIONS}), stopping early`, + { foundLocks: accountIds.length } + ) + break + } + } while (cursor !== '0') + + if (accountIds.length > 0) { + logger.debug( + `📬 User message queue: scanned ${accountIds.length} lock(s) in ${iterations} iteration(s)` + ) + } + + return accountIds + } catch (error) { + logger.error('Failed to scan user message queue locks:', error) + return [] + } +} + module.exports = redisClient diff --git a/src/routes/admin/claudeRelayConfig.js b/src/routes/admin/claudeRelayConfig.js index e3c78ef4..cbe98ecf 100644 --- a/src/routes/admin/claudeRelayConfig.js +++ b/src/routes/admin/claudeRelayConfig.js @@ -40,7 +40,10 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { claudeCodeOnlyEnabled, globalSessionBindingEnabled, sessionBindingErrorMessage, - sessionBindingTtlDays + sessionBindingTtlDays, + userMessageQueueEnabled, + userMessageQueueDelayMs, + userMessageQueueTimeoutMs } = req.body // 验证输入 @@ -78,6 +81,35 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { } } + // 验证用户消息队列配置 + if (userMessageQueueEnabled !== undefined && typeof userMessageQueueEnabled !== 'boolean') { + return res.status(400).json({ error: 'userMessageQueueEnabled must be a boolean' }) + } + + if (userMessageQueueDelayMs !== undefined) { + if ( + typeof userMessageQueueDelayMs !== 'number' || + userMessageQueueDelayMs < 0 || + userMessageQueueDelayMs > 10000 + ) { + return res + .status(400) + .json({ error: 'userMessageQueueDelayMs must be a number between 0 and 10000' }) + } + } + + if (userMessageQueueTimeoutMs !== undefined) { + if ( + typeof userMessageQueueTimeoutMs !== 'number' || + userMessageQueueTimeoutMs < 1000 || + userMessageQueueTimeoutMs > 300000 + ) { + return res + .status(400) + .json({ error: 'userMessageQueueTimeoutMs must be a number between 1000 and 300000' }) + } + } + const updateData = {} if (claudeCodeOnlyEnabled !== undefined) updateData.claudeCodeOnlyEnabled = claudeCodeOnlyEnabled @@ -87,6 +119,12 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { updateData.sessionBindingErrorMessage = sessionBindingErrorMessage if (sessionBindingTtlDays !== undefined) updateData.sessionBindingTtlDays = sessionBindingTtlDays + if (userMessageQueueEnabled !== undefined) + updateData.userMessageQueueEnabled = userMessageQueueEnabled + if (userMessageQueueDelayMs !== undefined) + updateData.userMessageQueueDelayMs = userMessageQueueDelayMs + if (userMessageQueueTimeoutMs !== undefined) + updateData.userMessageQueueTimeoutMs = userMessageQueueTimeoutMs const updatedConfig = await claudeRelayConfigService.updateConfig( updateData, diff --git a/src/services/bedrockRelayService.js b/src/services/bedrockRelayService.js index e27dfd5c..c14a5a40 100644 --- a/src/services/bedrockRelayService.js +++ b/src/services/bedrockRelayService.js @@ -6,6 +6,7 @@ const { const { fromEnv } = require('@aws-sdk/credential-providers') const logger = require('../utils/logger') const config = require('../../config/config') +const userMessageQueueService = require('./userMessageQueueService') class BedrockRelayService { constructor() { @@ -69,7 +70,70 @@ class BedrockRelayService { // 处理非流式请求 async handleNonStreamRequest(requestBody, bedrockAccount = null) { + const accountId = bedrockAccount?.id + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null + try { + // 📬 用户消息队列处理 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in Bedrock handleNonStreamRequest') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for Bedrock account ${accountId}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + return { + statusCode, + headers: { + 'Content-Type': 'application/json', + 'x-user-message-queue-error': errorType + }, + body: JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + }), + success: false + } + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + } + } + const modelId = this._selectModel(requestBody, bedrockAccount) const region = this._selectRegion(modelId, bedrockAccount) const client = this._getBedrockClient(region, bedrockAccount) @@ -106,12 +170,95 @@ class BedrockRelayService { } catch (error) { logger.error('❌ Bedrock非流式请求失败:', error) throw this._handleBedrockError(error) + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for Bedrock account ${accountId}:`, + releaseError.message + ) + } + } } } // 处理流式请求 async handleStreamRequest(requestBody, bedrockAccount = null, res) { + const accountId = bedrockAccount?.id + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null + try { + // 📬 用户消息队列处理 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in Bedrock handleStreamRequest') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + stream: true, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for Bedrock account ${accountId} (stream)`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + if (!res.headersSent) { + res.writeHead(statusCode, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'x-user-message-queue-error': errorType + }) + } + const errorEvent = `event: error\ndata: ${JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + })}\n\n` + res.write(errorEvent) + res.write('data: [DONE]\n\n') + res.end() + return { success: false, error: errorType } + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + } + } + const modelId = this._selectModel(requestBody, bedrockAccount) const region = this._selectRegion(modelId, bedrockAccount) const client = this._getBedrockClient(region, bedrockAccount) @@ -191,6 +338,21 @@ class BedrockRelayService { res.end() throw this._handleBedrockError(error) + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for Bedrock stream account ${accountId}:`, + releaseError.message + ) + } + } } } diff --git a/src/services/ccrRelayService.js b/src/services/ccrRelayService.js index 50ad7b58..5cd1a2a0 100644 --- a/src/services/ccrRelayService.js +++ b/src/services/ccrRelayService.js @@ -3,6 +3,7 @@ const ccrAccountService = require('./ccrAccountService') const logger = require('../utils/logger') const config = require('../../config/config') const { parseVendorPrefixedModel } = require('../utils/modelHelper') +const userMessageQueueService = require('./userMessageQueueService') class CcrRelayService { constructor() { @@ -21,8 +22,69 @@ class CcrRelayService { ) { let abortController = null let account = null + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null try { + // 📬 用户消息队列处理 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in CCR relayRequest') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for CCR account ${accountId}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + return { + statusCode, + headers: { + 'Content-Type': 'application/json', + 'x-user-message-queue-error': errorType + }, + body: JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + }), + accountId + } + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + } + } + // 获取账户信息 account = await ccrAccountService.getAccount(accountId) if (!account) { @@ -233,6 +295,21 @@ class CcrRelayService { ) throw error + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for CCR account ${accountId}:`, + releaseError.message + ) + } + } } } @@ -248,7 +325,76 @@ class CcrRelayService { options = {} ) { let account = null + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null + try { + // 📬 用户消息队列处理 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error( + '❌ accountId missing for queue lock in CCR relayStreamRequestWithUsageCapture' + ) + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续��计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + stream: true, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for CCR account ${accountId} (stream)`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + if (!responseStream.headersSent) { + responseStream.writeHead(statusCode, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'x-user-message-queue-error': errorType + }) + } + const errorEvent = `event: error\ndata: ${JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + })}\n\n` + responseStream.write(errorEvent) + responseStream.write('data: [DONE]\n\n') + responseStream.end() + return + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + } + } + // 获取账户信息 account = await ccrAccountService.getAccount(accountId) if (!account) { @@ -304,6 +450,21 @@ class CcrRelayService { } catch (error) { logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error) throw error + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for CCR stream account ${accountId}:`, + releaseError.message + ) + } + } } } diff --git a/src/services/claudeConsoleRelayService.js b/src/services/claudeConsoleRelayService.js index 08e56653..c8c2c4b8 100644 --- a/src/services/claudeConsoleRelayService.js +++ b/src/services/claudeConsoleRelayService.js @@ -9,6 +9,7 @@ const { sanitizeErrorMessage, isAccountDisabledError } = require('../utils/errorSanitizer') +const userMessageQueueService = require('./userMessageQueueService') class ClaudeConsoleRelayService { constructor() { @@ -29,8 +30,73 @@ class ClaudeConsoleRelayService { let account = null const requestId = uuidv4() // 用于并发追踪 let concurrencyAcquired = false + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null try { + // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in console relayRequest') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + apiKeyName: apiKeyData.name, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for console account ${accountId}, key: ${apiKeyData.name}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + return { + statusCode, + headers: { + 'Content-Type': 'application/json', + 'x-user-message-queue-error': errorType + }, + body: JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + }), + accountId + } + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + logger.debug( + `📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}` + ) + } + } + // 获取账户信息 account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { @@ -366,6 +432,21 @@ class ClaudeConsoleRelayService { ) } } + + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for account ${accountId}:`, + releaseError.message + ) + } + } } } @@ -384,8 +465,73 @@ class ClaudeConsoleRelayService { const requestId = uuidv4() // 用于并发追踪 let concurrencyAcquired = false let leaseRefreshInterval = null // 租约刷新定时器 + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null try { + // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error( + '❌ accountId missing for queue lock in console relayStreamRequestWithUsageCapture' + ) + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + stream: true, + apiKeyName: apiKeyData.name, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for console account ${accountId} (stream), key: ${apiKeyData.name}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + if (!responseStream.headersSent) { + responseStream.writeHead(statusCode, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'x-user-message-queue-error': errorType + }) + } + const errorEvent = `event: error\ndata: ${JSON.stringify({ type: 'error', error: { type: errorType, code: errorCode, message: errorMessage } })}\n\n` + responseStream.write(errorEvent) + responseStream.write('data: [DONE]\n\n') + responseStream.end() + return + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + logger.debug( + `📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}` + ) + } + } + // 获取账户信息 account = await claudeConsoleAccountService.getAccount(accountId) if (!account) { @@ -517,6 +663,21 @@ class ClaudeConsoleRelayService { ) } } + + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && accountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(accountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for stream account ${accountId}:`, + releaseError.message + ) + } + } } } diff --git a/src/services/claudeRelayConfigService.js b/src/services/claudeRelayConfigService.js index 5d4c3bd5..b4e7d0c1 100644 --- a/src/services/claudeRelayConfigService.js +++ b/src/services/claudeRelayConfigService.js @@ -15,6 +15,10 @@ const DEFAULT_CONFIG = { globalSessionBindingEnabled: false, sessionBindingErrorMessage: '你的本地session已污染,请清理后使用。', sessionBindingTtlDays: 30, // 会话绑定 TTL(天),默认30天 + // 用户消息队列配置 + userMessageQueueEnabled: false, // 是否启用用户消息队列(默认关闭) + userMessageQueueDelayMs: 100, // 请求间隔(毫秒) + userMessageQueueTimeoutMs: 60000, // 队列超时(毫秒) updatedAt: null, updatedBy: null } diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index 998b14ef..742372df 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -15,6 +15,7 @@ const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') const { formatDateWithTimezone } = require('../utils/dateHelper') const requestIdentityService = require('./requestIdentityService') const { createClaudeTestPayload } = require('../utils/testPayloadHelper') +const userMessageQueueService = require('./userMessageQueueService') class ClaudeRelayService { constructor() { @@ -148,6 +149,10 @@ class ClaudeRelayService { options = {} ) { let upstreamRequest = null + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null + let selectedAccountId = null try { // 调试日志:查看API Key数据 @@ -192,11 +197,74 @@ class ClaudeRelayService { } const { accountId } = accountSelection const { accountType } = accountSelection + selectedAccountId = accountId logger.info( `📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` ) + // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in relayRequest') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + apiKeyName: apiKeyData.name, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for account ${accountId}, key: ${apiKeyData.name}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + return { + statusCode, + headers: { + 'Content-Type': 'application/json', + 'x-user-message-queue-error': errorType + }, + body: JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + }), + accountId + } + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + logger.debug( + `📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}` + ) + } + } + // 获取账户信息 let account = await claudeAccountService.getAccount(accountId) @@ -539,6 +607,21 @@ class ClaudeRelayService { error.message ) throw error + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && selectedAccountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for account ${selectedAccountId}:`, + releaseError.message + ) + } + } } } @@ -1057,8 +1140,6 @@ class ClaudeRelayService { timeout: config.requestTimeout || 600000 } - console.log(options.path) - const req = https.request(options, (res) => { let responseData = Buffer.alloc(0) @@ -1112,7 +1193,6 @@ class ClaudeRelayService { } req.on('error', async (error) => { - console.error(': ❌ ', error) logger.error(`❌ Claude API request error (Account: ${accountId}):`, error.message, { code: error.code, errno: error.errno, @@ -1163,6 +1243,11 @@ class ClaudeRelayService { streamTransformer = null, options = {} ) { + let queueLockAcquired = false + let queueRequestId = null + let queueLockRenewalStopper = null + let selectedAccountId = null + try { // 调试日志:查看API Key数据(流式请求) logger.info('🔍 [Stream] API Key data received:', { @@ -1206,6 +1291,74 @@ class ClaudeRelayService { } const { accountId } = accountSelection const { accountType } = accountSelection + selectedAccountId = accountId + + // 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁 + if (userMessageQueueService.isUserMessageRequest(requestBody)) { + // 校验 accountId 非空,避免空值污染队列锁键 + if (!accountId || accountId === '') { + logger.error('❌ accountId missing for queue lock in relayStreamRequestWithUsageCapture') + throw new Error('accountId missing for queue lock') + } + const queueResult = await userMessageQueueService.acquireQueueLock(accountId) + if (!queueResult.acquired && !queueResult.skipped) { + // 区分 Redis 后端错误和队列超时 + const isBackendError = queueResult.error === 'queue_backend_error' + const errorCode = isBackendError ? 'QUEUE_BACKEND_ERROR' : 'QUEUE_TIMEOUT' + const errorType = isBackendError ? 'queue_backend_error' : 'queue_timeout' + const errorMessage = isBackendError + ? 'Queue service temporarily unavailable, please retry later' + : 'User message queue wait timeout, please retry later' + const statusCode = isBackendError ? 500 : 503 + + // 结构化性能日志,用于后续统计 + logger.performance('user_message_queue_error', { + errorType, + errorCode, + accountId, + statusCode, + stream: true, + apiKeyName: apiKeyData.name, + backendError: isBackendError ? queueResult.errorMessage : undefined + }) + + logger.warn( + `📬 User message queue ${errorType} for account ${accountId} (stream), key: ${apiKeyData.name}`, + isBackendError ? { backendError: queueResult.errorMessage } : {} + ) + if (!responseStream.headersSent) { + responseStream.writeHead(statusCode, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'x-user-message-queue-error': errorType + }) + } + const errorEvent = `event: error\ndata: ${JSON.stringify({ + type: 'error', + error: { + type: errorType, + code: errorCode, + message: errorMessage + } + })}\n\n` + responseStream.write(errorEvent) + responseStream.write('data: [DONE]\n\n') + responseStream.end() + return + } + if (queueResult.acquired && !queueResult.skipped) { + queueLockAcquired = true + queueRequestId = queueResult.requestId + queueLockRenewalStopper = await userMessageQueueService.startLockRenewal( + accountId, + queueRequestId + ) + logger.debug( + `📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}` + ) + } + } logger.info( `📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId} (${accountType})${sessionHash ? `, session: ${sessionHash}` : ''}` @@ -1277,6 +1430,21 @@ class ClaudeRelayService { } catch (error) { logger.error(`❌ Claude stream relay with usage capture failed:`, error) throw error + } finally { + // 📬 释放用户消息队列锁 + if (queueLockAcquired && queueRequestId && selectedAccountId) { + try { + if (queueLockRenewalStopper) { + queueLockRenewalStopper() + } + await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId) + } catch (releaseError) { + logger.error( + `❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`, + releaseError.message + ) + } + } } } @@ -1478,7 +1646,6 @@ class ClaudeRelayService { }) res.on('end', () => { - console.error(': ❌ ', errorData) logger.error( `❌ Claude API error response (Account: ${account?.name || accountId}):`, errorData @@ -1950,7 +2117,7 @@ class ClaudeRelayService { responseStream.on('close', () => { logger.debug('🔌 Client disconnected, cleaning up stream') if (!req.destroyed) { - req.destroy() + req.destroy(new Error('Client disconnected')) } }) diff --git a/src/services/droidRelayService.js b/src/services/droidRelayService.js index e62d5e85..25909c4b 100644 --- a/src/services/droidRelayService.js +++ b/src/services/droidRelayService.js @@ -634,7 +634,7 @@ class DroidRelayService { // 客户端断开连接时清理 clientResponse.on('close', () => { if (req && !req.destroyed) { - req.destroy() + req.destroy(new Error('Client disconnected')) } }) diff --git a/src/services/userMessageQueueService.js b/src/services/userMessageQueueService.js new file mode 100644 index 00000000..0c8851e9 --- /dev/null +++ b/src/services/userMessageQueueService.js @@ -0,0 +1,448 @@ +/** + * 用户消息队列服务 + * 为 Claude 账户实现基于消息类型的串行排队机制 + * + * 当请求的最后一条消息是用户输入(role: user)时, + * 同一账户的此类请求需要串行等待,并在请求之间添加延迟 + */ + +const { v4: uuidv4 } = require('uuid') +const redis = require('../models/redis') +const config = require('../../config/config') +const logger = require('../utils/logger') + +// 清理任务间隔 +const CLEANUP_INTERVAL_MS = 60000 // 1分钟 + +// 锁续租最大持续时间(从配置读取,与 REQUEST_TIMEOUT 保持一致) +const MAX_RENEWAL_DURATION_MS = config.requestTimeout || 10 * 60 * 1000 + +// 轮询等待配置 +const POLL_INTERVAL_BASE_MS = 50 // 基础轮询间隔 +const POLL_INTERVAL_MAX_MS = 500 // 最大轮询间隔 +const POLL_BACKOFF_FACTOR = 1.5 // 退避因子 + +class UserMessageQueueService { + constructor() { + this.cleanupTimer = null + // 跟踪活跃的续租定时器,用于服务关闭时清理 + this.activeRenewalTimers = new Map() + } + + /** + * 检测请求是否为真正的用户消息请求 + * 区分真正的用户输入和 tool_result 消息 + * + * Claude API 消息格式: + * - 用户文本消息: { role: 'user', content: 'text' } 或 { role: 'user', content: [{ type: 'text', text: '...' }] } + * - 工具结果消息: { role: 'user', content: [{ type: 'tool_result', tool_use_id: '...', content: '...' }] } + * + * @param {Object} requestBody - 请求体 + * @returns {boolean} - 是否为真正的用户消息(排除 tool_result) + */ + isUserMessageRequest(requestBody) { + const messages = requestBody?.messages + if (!Array.isArray(messages) || messages.length === 0) { + return false + } + const lastMessage = messages[messages.length - 1] + + // 检查 role 是否为 user + if (lastMessage?.role !== 'user') { + return false + } + + // 检查 content 是否包含 tool_result 类型 + const { content } = lastMessage + if (Array.isArray(content)) { + // 如果 content 数组中任何元素是 tool_result,则不是真正的用户消息 + const hasToolResult = content.some( + (block) => block?.type === 'tool_result' || block?.type === 'tool_use_result' + ) + if (hasToolResult) { + return false + } + } + + // role 是 user 且不包含 tool_result,是真正的用户消息 + return true + } + + /** + * 获取当前配置(支持 Web 界面配置优先) + * @returns {Promise} 配置对象 + */ + async getConfig() { + // 尝试从 claudeRelayConfigService 获取 Web 界面配置 + try { + const claudeRelayConfigService = require('./claudeRelayConfigService') + const webConfig = await claudeRelayConfigService.getConfig() + + return { + enabled: + webConfig.userMessageQueueEnabled !== undefined + ? webConfig.userMessageQueueEnabled + : config.userMessageQueue.enabled, + delayMs: + webConfig.userMessageQueueDelayMs !== undefined + ? webConfig.userMessageQueueDelayMs + : config.userMessageQueue.delayMs, + timeoutMs: + webConfig.userMessageQueueTimeoutMs !== undefined + ? webConfig.userMessageQueueTimeoutMs + : config.userMessageQueue.timeoutMs, + lockTtlMs: config.userMessageQueue.lockTtlMs + } + } catch { + // 回退到环境变量配置 + return { + enabled: config.userMessageQueue.enabled, + delayMs: config.userMessageQueue.delayMs, + timeoutMs: config.userMessageQueue.timeoutMs, + lockTtlMs: config.userMessageQueue.lockTtlMs + } + } + } + + /** + * 检查功能是否启用 + * @returns {Promise} + */ + async isEnabled() { + const cfg = await this.getConfig() + return cfg.enabled === true + } + + /** + * 获取账户队列锁(阻塞等待) + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID(可选,会自动生成) + * @param {number} timeoutMs - 超时时间(可选,使用配置默认值) + * @returns {Promise<{acquired: boolean, requestId: string, error?: string}>} + */ + async acquireQueueLock(accountId, requestId = null, timeoutMs = null) { + const cfg = await this.getConfig() + + if (!cfg.enabled) { + return { acquired: true, requestId: requestId || uuidv4(), skipped: true } + } + + const reqId = requestId || uuidv4() + const timeout = timeoutMs || cfg.timeoutMs + const startTime = Date.now() + let retryCount = 0 + + logger.debug(`📬 User message queue: attempting to acquire lock for account ${accountId}`, { + requestId: reqId, + timeoutMs: timeout + }) + + while (Date.now() - startTime < timeout) { + const result = await redis.acquireUserMessageLock( + accountId, + reqId, + cfg.lockTtlMs, + cfg.delayMs + ) + + // 检测 Redis 错误,立即返回系统错误而非继续轮询 + if (result.redisError) { + logger.error(`📬 User message queue: Redis error while acquiring lock`, { + accountId, + requestId: reqId, + errorMessage: result.errorMessage + }) + return { + acquired: false, + requestId: reqId, + error: 'queue_backend_error', + errorMessage: result.errorMessage + } + } + + if (result.acquired) { + logger.debug(`📬 User message queue: lock acquired for account ${accountId}`, { + requestId: reqId, + waitedMs: Date.now() - startTime, + retries: retryCount + }) + return { acquired: true, requestId: reqId } + } + + // 需要等待 + if (result.waitMs > 0) { + // 需要延迟(上一个请求刚完成) + await this._sleep(Math.min(result.waitMs, timeout - (Date.now() - startTime))) + } else { + // 锁被占用,使用指数退避轮询等待 + const basePollInterval = Math.min( + POLL_INTERVAL_BASE_MS * Math.pow(POLL_BACKOFF_FACTOR, retryCount), + POLL_INTERVAL_MAX_MS + ) + // 添加 ±15% 随机抖动,避免高并发下的周期性碰撞 + const jitter = basePollInterval * (0.85 + Math.random() * 0.3) + const pollInterval = Math.min(jitter, POLL_INTERVAL_MAX_MS) + await this._sleep(pollInterval) + retryCount++ + } + } + + // 超时 + logger.warn(`📬 User message queue: timeout waiting for lock`, { + accountId, + requestId: reqId, + timeoutMs: timeout + }) + + return { + acquired: false, + requestId: reqId, + error: 'queue_timeout' + } + } + + /** + * 释放账户队列锁 + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID + * @returns {Promise} + */ + async releaseQueueLock(accountId, requestId) { + if (!accountId || !requestId) { + return false + } + + const released = await redis.releaseUserMessageLock(accountId, requestId) + + if (released) { + logger.debug(`📬 User message queue: lock released for account ${accountId}`, { + requestId + }) + } else { + logger.warn(`📬 User message queue: failed to release lock (not owner?)`, { + accountId, + requestId + }) + } + + return released + } + + /** + * 启动锁续租(防止长连接超过TTL导致锁丢失) + * @param {string} accountId - 账户ID + * @param {string} requestId - 请求ID + * @returns {Promise} 停止续租的函数 + */ + async startLockRenewal(accountId, requestId) { + const cfg = await this.getConfig() + if (!cfg.enabled || !accountId || !requestId) { + return () => {} + } + + const intervalMs = Math.max(10000, Math.floor(cfg.lockTtlMs / 2)) // 约一半TTL刷新一次 + const maxRenewals = Math.ceil(MAX_RENEWAL_DURATION_MS / intervalMs) // 最大续租次数 + const startTime = Date.now() + const timerKey = `${accountId}:${requestId}` + + let stopped = false + let renewalCount = 0 + + const stopRenewal = () => { + if (!stopped) { + clearInterval(timer) + stopped = true + this.activeRenewalTimers.delete(timerKey) + } + } + + const timer = setInterval(async () => { + if (stopped) { + return + } + + renewalCount++ + + // 检查是否超过最大续租次数或最大持续时间 + if (renewalCount > maxRenewals || Date.now() - startTime > MAX_RENEWAL_DURATION_MS) { + logger.warn(`📬 User message queue: max renewal duration exceeded, stopping renewal`, { + accountId, + requestId, + renewalCount, + durationMs: Date.now() - startTime + }) + stopRenewal() + return + } + + try { + const refreshed = await redis.refreshUserMessageLock(accountId, requestId, cfg.lockTtlMs) + if (!refreshed) { + // 锁可能已被释放或超时,停止续租 + logger.warn( + `📬 User message queue: failed to refresh lock (possibly lost), stop renewal`, + { + accountId, + requestId, + renewalCount + } + ) + stopRenewal() + } + } catch (error) { + logger.error('📬 User message queue: lock renewal error:', error) + } + }, intervalMs) + + // 避免阻止进程退出 + if (typeof timer.unref === 'function') { + timer.unref() + } + + // 跟踪活跃的定时器 + this.activeRenewalTimers.set(timerKey, { timer, stopRenewal, accountId, requestId, startTime }) + + return stopRenewal + } + + /** + * 获取队列统计信息 + * @param {string} accountId - 账户ID + * @returns {Promise} + */ + async getQueueStats(accountId) { + return await redis.getUserMessageQueueStats(accountId) + } + + /** + * 服务启动时清理所有残留的队列锁 + * 防止服务重启后旧锁阻塞新请求 + * @returns {Promise} 清理的锁数量 + */ + async cleanupStaleLocks() { + try { + const accountIds = await redis.scanUserMessageQueueLocks() + let cleanedCount = 0 + + for (const accountId of accountIds) { + try { + await redis.forceReleaseUserMessageLock(accountId) + cleanedCount++ + logger.debug(`📬 User message queue: cleaned stale lock for account ${accountId}`) + } catch (error) { + logger.error( + `📬 User message queue: failed to clean lock for account ${accountId}:`, + error + ) + } + } + + if (cleanedCount > 0) { + logger.info(`📬 User message queue: cleaned ${cleanedCount} stale lock(s) on startup`) + } + + return cleanedCount + } catch (error) { + logger.error('📬 User message queue: failed to cleanup stale locks on startup:', error) + return 0 + } + } + + /** + * 启动定时清理任务 + * 始终启动,每次执行时检查配置以支持运行时动态启用/禁用 + */ + startCleanupTask() { + if (this.cleanupTimer) { + return + } + + this.cleanupTimer = setInterval(async () => { + // 每次运行时检查配置,以便在运行时动态启用/禁用 + const currentConfig = await this.getConfig() + if (!currentConfig.enabled) { + logger.debug('📬 User message queue: cleanup skipped (feature disabled)') + return + } + await this._cleanupOrphanLocks() + }, CLEANUP_INTERVAL_MS) + + logger.info('📬 User message queue: cleanup task started') + } + + /** + * 停止定时清理任务 + */ + stopCleanupTask() { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer) + this.cleanupTimer = null + logger.info('📬 User message queue: cleanup task stopped') + } + } + + /** + * 停止所有活跃的锁续租定时器(服务关闭时调用) + */ + stopAllRenewalTimers() { + const count = this.activeRenewalTimers.size + if (count > 0) { + for (const [key, { stopRenewal }] of this.activeRenewalTimers) { + try { + stopRenewal() + } catch (error) { + logger.error(`📬 User message queue: failed to stop renewal timer ${key}:`, error) + } + } + this.activeRenewalTimers.clear() + logger.info(`📬 User message queue: stopped ${count} active renewal timer(s)`) + } + } + + /** + * 获取活跃续租定时器数量(用于监控) + * @returns {number} + */ + getActiveRenewalCount() { + return this.activeRenewalTimers.size + } + + /** + * 清理孤儿锁 + * 检测异常情况:锁存在但没有设置过期时间(lockTtlRaw === -1) + * 正常情况下所有锁都应该有 TTL,Redis 会自动过期 + * @private + */ + async _cleanupOrphanLocks() { + try { + const accountIds = await redis.scanUserMessageQueueLocks() + + for (const accountId of accountIds) { + const stats = await redis.getUserMessageQueueStats(accountId) + + // 检测异常情况:锁存在(isLocked=true)但没有过期时间(lockTtlRaw=-1) + // 正常创建的锁都带有 PX 过期时间,如果没有说明是异常状态 + if (stats.isLocked && stats.lockTtlRaw === -1) { + logger.warn( + `📬 User message queue: cleaning up orphan lock without TTL for account ${accountId}`, + { lockHolder: stats.lockHolder } + ) + await redis.forceReleaseUserMessageLock(accountId) + } + } + } catch (error) { + logger.error('📬 User message queue: cleanup task error:', error) + } + } + + /** + * 睡眠辅助函数 + * @param {number} ms - 毫秒 + * @private + */ + _sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) + } +} + +module.exports = new UserMessageQueueService() diff --git a/src/utils/logger.js b/src/utils/logger.js index df5b5faa..f0202e89 100644 --- a/src/utils/logger.js +++ b/src/utils/logger.js @@ -137,6 +137,7 @@ const createLogFormat = (colorize = false) => { const logFormat = createLogFormat(false) const consoleFormat = createLogFormat(true) +const isTestEnv = process.env.NODE_ENV === 'test' || process.env.JEST_WORKER_ID // 📁 确保日志目录存在并设置权限 if (!fs.existsSync(config.logging.dirname)) { @@ -159,18 +160,20 @@ const createRotateTransport = (filename, level = null) => { transport.level = level } - // 监听轮转事件 - transport.on('rotate', (oldFilename, newFilename) => { - console.log(`📦 Log rotated: ${oldFilename} -> ${newFilename}`) - }) + // 监听轮转事件(测试环境关闭以避免 Jest 退出后输出) + if (!isTestEnv) { + transport.on('rotate', (oldFilename, newFilename) => { + console.log(`📦 Log rotated: ${oldFilename} -> ${newFilename}`) + }) - transport.on('new', (newFilename) => { - console.log(`📄 New log file created: ${newFilename}`) - }) + transport.on('new', (newFilename) => { + console.log(`📄 New log file created: ${newFilename}`) + }) - transport.on('archive', (zipFilename) => { - console.log(`🗜️ Log archived: ${zipFilename}`) - }) + transport.on('archive', (zipFilename) => { + console.log(`🗜️ Log archived: ${zipFilename}`) + }) + } return transport } diff --git a/tests/userMessageQueue.test.js b/tests/userMessageQueue.test.js new file mode 100644 index 00000000..5166bdbb --- /dev/null +++ b/tests/userMessageQueue.test.js @@ -0,0 +1,512 @@ +/** + * 用户消息队列服务测试 + * 测试消息类型检测、队列串行行为、延迟间隔、超时处理和功能开关 + */ + +const redis = require('../src/models/redis') +const userMessageQueueService = require('../src/services/userMessageQueueService') + +describe('UserMessageQueueService', () => { + describe('isUserMessageRequest', () => { + it('should return true when last message role is user', () => { + const requestBody = { + messages: [ + { role: 'user', content: 'Hello' }, + { role: 'assistant', content: 'Hi there' }, + { role: 'user', content: 'How are you?' } + ] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(true) + }) + + it('should return false when last message role is assistant', () => { + const requestBody = { + messages: [ + { role: 'user', content: 'Hello' }, + { role: 'assistant', content: 'Hi there' } + ] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return false when last message contains tool_result', () => { + const requestBody = { + messages: [ + { role: 'user', content: 'Hello' }, + { role: 'assistant', content: 'Let me check that' }, + { + role: 'user', + content: [ + { + type: 'tool_result', + tool_use_id: 'test-id', + content: 'Tool result' + } + ] + } + ] + } + // tool_result 消息虽然 role 是 user,但不是真正的用户消息 + // 应该返回 false,不进入用户消息队列 + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return false when last message contains multiple tool_results', () => { + const requestBody = { + messages: [ + { role: 'user', content: 'Run multiple tools' }, + { + role: 'user', + content: [ + { + type: 'tool_result', + tool_use_id: 'tool-1', + content: 'Result 1' + }, + { + type: 'tool_result', + tool_use_id: 'tool-2', + content: 'Result 2' + } + ] + } + ] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return true when user message has array content with text type', () => { + const requestBody = { + messages: [ + { + role: 'user', + content: [ + { + type: 'text', + text: 'Hello, this is a user message' + } + ] + } + ] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(true) + }) + + it('should return true when user message has mixed text and image content', () => { + const requestBody = { + messages: [ + { + role: 'user', + content: [ + { + type: 'text', + text: 'What is in this image?' + }, + { + type: 'image', + source: { type: 'base64', media_type: 'image/png', data: '...' } + } + ] + } + ] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(true) + }) + + it('should return false when messages is empty', () => { + const requestBody = { messages: [] } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return false when messages is not an array', () => { + const requestBody = { messages: 'not an array' } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return false when messages is undefined', () => { + const requestBody = {} + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should return false when requestBody is null', () => { + expect(userMessageQueueService.isUserMessageRequest(null)).toBe(false) + }) + + it('should return false when requestBody is undefined', () => { + expect(userMessageQueueService.isUserMessageRequest(undefined)).toBe(false) + }) + + it('should return false when last message has no role', () => { + const requestBody = { + messages: [{ content: 'Hello' }] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + + it('should handle single user message', () => { + const requestBody = { + messages: [{ role: 'user', content: 'Hello' }] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(true) + }) + + it('should handle single assistant message', () => { + const requestBody = { + messages: [{ role: 'assistant', content: 'Hello' }] + } + expect(userMessageQueueService.isUserMessageRequest(requestBody)).toBe(false) + }) + }) + + describe('getConfig', () => { + it('should return config with expected properties', async () => { + const config = await userMessageQueueService.getConfig() + expect(config).toHaveProperty('enabled') + expect(config).toHaveProperty('delayMs') + expect(config).toHaveProperty('timeoutMs') + expect(config).toHaveProperty('lockTtlMs') + expect(typeof config.enabled).toBe('boolean') + expect(typeof config.delayMs).toBe('number') + expect(typeof config.timeoutMs).toBe('number') + expect(typeof config.lockTtlMs).toBe('number') + }) + }) + + describe('isEnabled', () => { + it('should return boolean', async () => { + const enabled = await userMessageQueueService.isEnabled() + expect(typeof enabled).toBe('boolean') + }) + }) + + describe('startLockRenewal', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + jest.restoreAllMocks() + }) + + it('should periodically refresh lock while enabled', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) + + const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') + + jest.advanceTimersByTime(60000) // 半个TTL + await Promise.resolve() + + expect(refreshSpy).toHaveBeenCalledWith('acct-1', 'req-1', 120000) + + stop() + }) + + it('should no-op when queue disabled', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: false, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) + + const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') + jest.advanceTimersByTime(120000) + await Promise.resolve() + + expect(refreshSpy).not.toHaveBeenCalled() + stop() + }) + + it('should track active renewal timer', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) + + expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) + + const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1') + expect(userMessageQueueService.getActiveRenewalCount()).toBe(1) + + stop() + expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) + }) + + it('should stop all renewal timers on service shutdown', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true) + + await userMessageQueueService.startLockRenewal('acct-1', 'req-1') + await userMessageQueueService.startLockRenewal('acct-2', 'req-2') + expect(userMessageQueueService.getActiveRenewalCount()).toBe(2) + + userMessageQueueService.stopAllRenewalTimers() + expect(userMessageQueueService.getActiveRenewalCount()).toBe(0) + }) + }) + + describe('acquireQueueLock', () => { + afterEach(() => { + jest.restoreAllMocks() + }) + + it('should acquire lock immediately when no lock exists', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + jest.spyOn(redis, 'acquireUserMessageLock').mockResolvedValue({ + acquired: true, + waitMs: 0 + }) + + const result = await userMessageQueueService.acquireQueueLock('acct-1', 'req-1') + + expect(result.acquired).toBe(true) + expect(result.requestId).toBe('req-1') + expect(result.error).toBeUndefined() + }) + + it('should skip lock acquisition when queue disabled', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: false, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + const acquireSpy = jest.spyOn(redis, 'acquireUserMessageLock') + + const result = await userMessageQueueService.acquireQueueLock('acct-1') + + expect(result.acquired).toBe(true) + expect(result.skipped).toBe(true) + expect(acquireSpy).not.toHaveBeenCalled() + }) + + it('should generate requestId when not provided', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + jest.spyOn(redis, 'acquireUserMessageLock').mockResolvedValue({ + acquired: true, + waitMs: 0 + }) + + const result = await userMessageQueueService.acquireQueueLock('acct-1') + + expect(result.acquired).toBe(true) + expect(result.requestId).toBeDefined() + expect(result.requestId.length).toBeGreaterThan(0) + }) + + it('should wait and retry when lock is held by another request', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 1000, + lockTtlMs: 120000 + }) + + let callCount = 0 + jest.spyOn(redis, 'acquireUserMessageLock').mockImplementation(async () => { + callCount++ + if (callCount < 3) { + return { acquired: false, waitMs: -1 } // lock held + } + return { acquired: true, waitMs: 0 } + }) + + // Mock sleep to speed up test + jest.spyOn(userMessageQueueService, '_sleep').mockResolvedValue(undefined) + + const result = await userMessageQueueService.acquireQueueLock('acct-1', 'req-1') + + expect(result.acquired).toBe(true) + expect(callCount).toBe(3) + }) + + it('should respect delay when previous request just completed', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 1000, + lockTtlMs: 120000 + }) + + let callCount = 0 + jest.spyOn(redis, 'acquireUserMessageLock').mockImplementation(async () => { + callCount++ + if (callCount === 1) { + return { acquired: false, waitMs: 150 } // need to wait 150ms for delay + } + return { acquired: true, waitMs: 0 } + }) + + const sleepSpy = jest.spyOn(userMessageQueueService, '_sleep').mockResolvedValue(undefined) + + const result = await userMessageQueueService.acquireQueueLock('acct-1', 'req-1') + + expect(result.acquired).toBe(true) + expect(sleepSpy).toHaveBeenCalledWith(150) // Should wait for delay + }) + + it('should timeout and return error when wait exceeds timeout', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 100, // very short timeout + lockTtlMs: 120000 + }) + + jest.spyOn(redis, 'acquireUserMessageLock').mockResolvedValue({ + acquired: false, + waitMs: -1 // always held + }) + + // Use real timers for timeout test but mock sleep to be instant + jest.spyOn(userMessageQueueService, '_sleep').mockImplementation(async () => { + // Simulate time passing + await new Promise((resolve) => setTimeout(resolve, 60)) + }) + + const result = await userMessageQueueService.acquireQueueLock('acct-1', 'req-1', 100) + + expect(result.acquired).toBe(false) + expect(result.error).toBe('queue_timeout') + }) + }) + + describe('releaseQueueLock', () => { + afterEach(() => { + jest.restoreAllMocks() + }) + + it('should release lock successfully when holding the lock', async () => { + jest.spyOn(redis, 'releaseUserMessageLock').mockResolvedValue(true) + + const result = await userMessageQueueService.releaseQueueLock('acct-1', 'req-1') + + expect(result).toBe(true) + expect(redis.releaseUserMessageLock).toHaveBeenCalledWith('acct-1', 'req-1') + }) + + it('should return false when not holding the lock', async () => { + jest.spyOn(redis, 'releaseUserMessageLock').mockResolvedValue(false) + + const result = await userMessageQueueService.releaseQueueLock('acct-1', 'req-1') + + expect(result).toBe(false) + }) + + it('should return false when accountId is missing', async () => { + const releaseSpy = jest.spyOn(redis, 'releaseUserMessageLock') + + const result = await userMessageQueueService.releaseQueueLock(null, 'req-1') + + expect(result).toBe(false) + expect(releaseSpy).not.toHaveBeenCalled() + }) + + it('should return false when requestId is missing', async () => { + const releaseSpy = jest.spyOn(redis, 'releaseUserMessageLock') + + const result = await userMessageQueueService.releaseQueueLock('acct-1', null) + + expect(result).toBe(false) + expect(releaseSpy).not.toHaveBeenCalled() + }) + }) + + describe('queue serialization behavior', () => { + afterEach(() => { + jest.restoreAllMocks() + }) + + it('should allow different accounts to acquire locks simultaneously', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 200, + timeoutMs: 30000, + lockTtlMs: 120000 + }) + jest.spyOn(redis, 'acquireUserMessageLock').mockResolvedValue({ + acquired: true, + waitMs: 0 + }) + + const [result1, result2] = await Promise.all([ + userMessageQueueService.acquireQueueLock('acct-1', 'req-1'), + userMessageQueueService.acquireQueueLock('acct-2', 'req-2') + ]) + + expect(result1.acquired).toBe(true) + expect(result2.acquired).toBe(true) + }) + + it('should serialize requests for same account', async () => { + jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({ + enabled: true, + delayMs: 50, + timeoutMs: 5000, + lockTtlMs: 120000 + }) + + const lockState = { held: false, holderId: null } + + jest.spyOn(redis, 'acquireUserMessageLock').mockImplementation(async (accountId, requestId) => { + if (!lockState.held) { + lockState.held = true + lockState.holderId = requestId + return { acquired: true, waitMs: 0 } + } + return { acquired: false, waitMs: -1 } + }) + + jest.spyOn(redis, 'releaseUserMessageLock').mockImplementation(async (accountId, requestId) => { + if (lockState.holderId === requestId) { + lockState.held = false + lockState.holderId = null + return true + } + return false + }) + + jest.spyOn(userMessageQueueService, '_sleep').mockResolvedValue(undefined) + + // First request acquires lock + const result1 = await userMessageQueueService.acquireQueueLock('acct-1', 'req-1') + expect(result1.acquired).toBe(true) + + // Second request should fail to acquire (lock held) + const acquirePromise = userMessageQueueService.acquireQueueLock('acct-1', 'req-2', 200) + + // Release first lock + await userMessageQueueService.releaseQueueLock('acct-1', 'req-1') + + // Now second request should acquire + const result2 = await acquirePromise + expect(result2.acquired).toBe(true) + }) + }) +}) diff --git a/web/admin-spa/src/views/SettingsView.vue b/web/admin-spa/src/views/SettingsView.vue index c60651e6..20280697 100644 --- a/web/admin-spa/src/views/SettingsView.vue +++ b/web/admin-spa/src/views/SettingsView.vue @@ -804,6 +804,100 @@ + +
+
+
+
+
+ +
+
+

+ 用户消息串行队列 +

+

+ 启用后,同一账户的用户消息请求将串行执行,并在请求之间添加延迟,防止触发上游限流 +

+
+
+
+ +
+ + +
+ +
+ + +

+ 同一账户的用户消息请求之间的最小间隔时间(0-10000毫秒) +

+
+ + +
+ + +

+ 请求在队列中等待的最大时间,超时将返回 503 错误(1000-300000毫秒) +

+
+
+ +
+
+ +
+

+ 工作原理:系统检测请求中最后一条消息的 + role + 是否为 + user。用户消息请求需要排队串行执行,而工具调用结果、助手消息续传等不受此限制。 +

+
+
+
+
+
{ sessionBindingErrorMessage: response.config?.sessionBindingErrorMessage || '你的本地session已污染,请清理后使用。', sessionBindingTtlDays: response.config?.sessionBindingTtlDays ?? 30, + userMessageQueueEnabled: response.config?.userMessageQueueEnabled ?? true, + userMessageQueueDelayMs: response.config?.userMessageQueueDelayMs ?? 200, + userMessageQueueTimeoutMs: response.config?.userMessageQueueTimeoutMs ?? 30000, updatedAt: response.config?.updatedAt || null, updatedBy: response.config?.updatedBy || null } @@ -1762,7 +1862,10 @@ const saveClaudeConfig = async () => { claudeCodeOnlyEnabled: claudeConfig.value.claudeCodeOnlyEnabled, globalSessionBindingEnabled: claudeConfig.value.globalSessionBindingEnabled, sessionBindingErrorMessage: claudeConfig.value.sessionBindingErrorMessage, - sessionBindingTtlDays: claudeConfig.value.sessionBindingTtlDays + sessionBindingTtlDays: claudeConfig.value.sessionBindingTtlDays, + userMessageQueueEnabled: claudeConfig.value.userMessageQueueEnabled, + userMessageQueueDelayMs: claudeConfig.value.userMessageQueueDelayMs, + userMessageQueueTimeoutMs: claudeConfig.value.userMessageQueueTimeoutMs } const response = await apiClient.put('/admin/claude-relay-config', payload, { From dc96447d721ab4433f0e505271b9e142c355448d Mon Sep 17 00:00:00 2001 From: QTom Date: Tue, 9 Dec 2025 17:10:26 +0800 Subject: [PATCH 2/2] =?UTF-8?q?style:=20=E6=A0=BC=E5=BC=8F=E5=8C=96?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=BB=A5=E7=AC=A6=E5=90=88=20Prettier=20?= =?UTF-8?q?=E8=A7=84=E8=8C=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/routes/admin/claudeRelayConfig.js | 21 ++++++++++------ tests/userMessageQueue.test.js | 36 +++++++++++++++------------ 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/routes/admin/claudeRelayConfig.js b/src/routes/admin/claudeRelayConfig.js index cbe98ecf..261b2092 100644 --- a/src/routes/admin/claudeRelayConfig.js +++ b/src/routes/admin/claudeRelayConfig.js @@ -111,20 +111,27 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { } const updateData = {} - if (claudeCodeOnlyEnabled !== undefined) + if (claudeCodeOnlyEnabled !== undefined) { updateData.claudeCodeOnlyEnabled = claudeCodeOnlyEnabled - if (globalSessionBindingEnabled !== undefined) + } + if (globalSessionBindingEnabled !== undefined) { updateData.globalSessionBindingEnabled = globalSessionBindingEnabled - if (sessionBindingErrorMessage !== undefined) + } + if (sessionBindingErrorMessage !== undefined) { updateData.sessionBindingErrorMessage = sessionBindingErrorMessage - if (sessionBindingTtlDays !== undefined) + } + if (sessionBindingTtlDays !== undefined) { updateData.sessionBindingTtlDays = sessionBindingTtlDays - if (userMessageQueueEnabled !== undefined) + } + if (userMessageQueueEnabled !== undefined) { updateData.userMessageQueueEnabled = userMessageQueueEnabled - if (userMessageQueueDelayMs !== undefined) + } + if (userMessageQueueDelayMs !== undefined) { updateData.userMessageQueueDelayMs = userMessageQueueDelayMs - if (userMessageQueueTimeoutMs !== undefined) + } + if (userMessageQueueTimeoutMs !== undefined) { updateData.userMessageQueueTimeoutMs = userMessageQueueTimeoutMs + } const updatedConfig = await claudeRelayConfigService.updateConfig( updateData, diff --git a/tests/userMessageQueue.test.js b/tests/userMessageQueue.test.js index 5166bdbb..1d9e544f 100644 --- a/tests/userMessageQueue.test.js +++ b/tests/userMessageQueue.test.js @@ -474,23 +474,27 @@ describe('UserMessageQueueService', () => { const lockState = { held: false, holderId: null } - jest.spyOn(redis, 'acquireUserMessageLock').mockImplementation(async (accountId, requestId) => { - if (!lockState.held) { - lockState.held = true - lockState.holderId = requestId - return { acquired: true, waitMs: 0 } - } - return { acquired: false, waitMs: -1 } - }) + jest + .spyOn(redis, 'acquireUserMessageLock') + .mockImplementation(async (accountId, requestId) => { + if (!lockState.held) { + lockState.held = true + lockState.holderId = requestId + return { acquired: true, waitMs: 0 } + } + return { acquired: false, waitMs: -1 } + }) - jest.spyOn(redis, 'releaseUserMessageLock').mockImplementation(async (accountId, requestId) => { - if (lockState.holderId === requestId) { - lockState.held = false - lockState.holderId = null - return true - } - return false - }) + jest + .spyOn(redis, 'releaseUserMessageLock') + .mockImplementation(async (accountId, requestId) => { + if (lockState.holderId === requestId) { + lockState.held = false + lockState.holderId = null + return true + } + return false + }) jest.spyOn(userMessageQueueService, '_sleep').mockResolvedValue(undefined)