From 07633ddbf899975696827ccd24c6850c5eff1b07 Mon Sep 17 00:00:00 2001 From: DaydreamCoding Date: Fri, 12 Dec 2025 14:08:30 +0800 Subject: [PATCH] feat: enhance concurrency queue with health check and admin endpoints - Add queue health check for fast-fail when overloaded (P90 > threshold) - Implement socket identity verification with UUID token - Add wait time statistics (P50/P90/P99) and queue stats tracking - Add admin endpoints for queue stats and cleanup - Add CLEAR_CONCURRENCY_QUEUES_ON_STARTUP config option - Update documentation with troubleshooting and proxy config guide --- .env.example | 2 + CLAUDE.md | 44 ++ src/app.js | 28 + src/middleware/auth.js | 662 +++++++++++++++- src/models/redis.js | 388 ++++++++++ src/routes/admin/claudeRelayConfig.js | 66 +- src/routes/admin/concurrency.js | 177 ++++- src/routes/api.js | 90 +++ src/services/bedrockRelayService.js | 12 +- src/services/ccrRelayService.js | 59 +- src/services/claudeConsoleRelayService.js | 100 ++- src/services/claudeRelayConfigService.js | 12 +- src/services/claudeRelayService.js | 68 +- src/utils/statsHelper.js | 105 +++ src/utils/streamHelper.js | 36 + tests/concurrencyQueue.integration.test.js | 860 +++++++++++++++++++++ tests/concurrencyQueue.test.js | 278 +++++++ web/admin-spa/src/views/SettingsView.vue | 138 +++- 18 files changed, 3039 insertions(+), 86 deletions(-) create mode 100644 src/utils/statsHelper.js create mode 100644 src/utils/streamHelper.js create mode 100644 tests/concurrencyQueue.integration.test.js create mode 100644 tests/concurrencyQueue.test.js diff --git a/.env.example b/.env.example index 704d0a8a..5107193b 100644 --- a/.env.example +++ b/.env.example @@ -75,6 +75,8 @@ TOKEN_USAGE_RETENTION=2592000000 HEALTH_CHECK_INTERVAL=60000 TIMEZONE_OFFSET=8 # UTC偏移小时数,默认+8(中国时区) METRICS_WINDOW=5 # 实时指标统计窗口(分钟),可选1-60,默认5分钟 +# 启动时清理残留的并发排队计数器(默认true,多实例部署时建议设为false) +CLEAR_CONCURRENCY_QUEUES_ON_STARTUP=true # 🎨 Web 界面配置 WEB_TITLE=Claude Relay Service diff --git a/CLAUDE.md 
b/CLAUDE.md index f1f47ec1..892b4758 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,6 +22,7 @@ Claude Relay Service 是一个多平台 AI API 中转服务,支持 **Claude ( - **权限控制**: API Key支持权限配置(all/claude/gemini/openai等),控制可访问的服务类型 - **客户端限制**: 基于User-Agent的客户端识别和限制,支持ClaudeCode、Gemini-CLI等预定义客户端 - **模型黑名单**: 支持API Key级别的模型访问限制 +- **并发请求排队**: 当API Key并发数超限时,请求进入队列等待而非立即返回429,支持配置最大排队数、超时时间,适用于Claude Code Agent并行工具调用场景 ### 主要服务组件 @@ -196,6 +197,7 @@ npm run service:stop # 停止服务 - `DEBUG_HTTP_TRAFFIC`: 启用HTTP请求/响应调试日志(默认false,仅开发环境) - `PROXY_USE_IPV4`: 代理使用IPv4(默认true) - `REQUEST_TIMEOUT`: 请求超时时间(毫秒,默认600000即10分钟) +- `CLEAR_CONCURRENCY_QUEUES_ON_STARTUP`: 启动时清理残留的并发排队计数器(默认true,多实例部署时建议设为false) #### AWS Bedrock配置(可选) - `CLAUDE_CODE_USE_BEDROCK`: 启用Bedrock(设置为1启用) @@ -343,6 +345,34 @@ npm run setup # 自动生成密钥并创建管理员账户 12. **成本统计不准确**: 运行 `npm run init:costs` 初始化成本数据,检查pricingService是否正确加载模型价格 13. **缓存命中率低**: 查看缓存监控统计,调整LRU缓存大小配置 14. **用户消息队列超时**: 优化后锁持有时间已从分钟级降到毫秒级(请求发送后立即释放),默认 `USER_MESSAGE_QUEUE_TIMEOUT_MS=5000` 已足够。如仍有超时,检查网络延迟或禁用此功能(`USER_MESSAGE_QUEUE_ENABLED=false`) +15. 
**并发请求排队问题**: + - 排队超时:检查 `concurrentRequestQueueTimeoutMs` 配置是否合理(默认10秒) + - 排队数过多:调整 `concurrentRequestQueueMaxSize` 和 `concurrentRequestQueueMaxSizeMultiplier` + - 查看排队统计:访问 `/admin/concurrency-queue/stats` 接口查看 entered/success/timeout/cancelled/socket_changed/rejected_overload 统计 + - 排队计数泄漏:系统重启时自动清理,或访问 `/admin/concurrency-queue` DELETE 接口手动清理 + - Socket 身份验证失败:查看 `socket_changed` 统计,如果频繁发生,检查代理配置或客户端连接稳定性 + - 健康检查拒绝:查看 `rejected_overload` 统计,表示队列过载时的快速失败次数 + +### 代理配置要求(并发请求排队) + +使用并发请求排队功能时,需要正确配置代理(如 Nginx)的超时参数: + +- **推荐配置**: `proxy_read_timeout >= max(2 × concurrentRequestQueueTimeoutMs, 60s)` + - 当前默认排队超时 10 秒,Nginx 默认 `proxy_read_timeout = 60s` 已满足要求 + - 如果调整排队超时到 60 秒,推荐代理超时 ≥ 120 秒 +- **Nginx 配置示例**: + ```nginx + location /api/ { + proxy_read_timeout 120s; # 排队超时 60s 时推荐 120s + proxy_connect_timeout 10s; + # ...其他配置 + } + ``` +- **企业防火墙环境**: + - 某些企业防火墙可能静默关闭长时间无数据的连接(20-40 秒) + - 如遇此问题,联系网络管理员调整空闲连接超时策略 + - 或降低 `concurrentRequestQueueTimeoutMs` 配置 +- **后续升级说明**: 如有需要,后续版本可能提供可选的轻量级心跳机制 ### 调试工具 @@ -455,6 +485,15 @@ npm run setup # 自动生成密钥并创建管理员账户 - **缓存优化**: 多层LRU缓存(解密缓存、账户缓存),全局缓存监控和统计 - **成本追踪**: 实时token使用统计(input/output/cache_create/cache_read)和成本计算(基于pricingService) - **并发控制**: Redis Sorted Set实现的并发计数,支持自动过期清理 +- **并发请求排队**: 当API Key并发超限时,请求进入队列等待而非直接返回429 + - **工作原理**: 采用「先占后检查」模式,每次轮询尝试占位,超限则释放继续等待 + - **指数退避**: 初始200ms,指数增长至最大2秒,带±20%抖动防惊群效应 + - **智能清理**: 排队计数有TTL保护(超时+30秒),进程崩溃也能自动清理 + - **Socket身份验证**: 使用UUID token + socket对象引用双重验证,避免HTTP Keep-Alive连接复用导致的身份混淆 + - **健康检查**: P90等待时间超过阈值时快速失败(返回429),避免新请求在过载时继续排队 + - **配置参数**: `concurrentRequestQueueEnabled`(默认false)、`concurrentRequestQueueMaxSize`(默认3)、`concurrentRequestQueueMaxSizeMultiplier`(默认0)、`concurrentRequestQueueTimeoutMs`(默认10秒)、`concurrentRequestQueueMaxRedisFailCount`(默认5)、`concurrentRequestQueueHealthCheckEnabled`(默认true)、`concurrentRequestQueueHealthThreshold`(默认0.8) + - **最大排队数**: max(固定值, 并发限制×倍数),例如并发限制=10、倍数=2时最大排队数=20 + - **适用场景**: Claude Code Agent并行工具调用、批量请求处理 - 
**客户端识别**: 基于User-Agent的客户端限制,支持预定义客户端(ClaudeCode、Gemini-CLI等) - **错误处理**: 529错误自动标记账户过载状态,配置时长内自动排除该账户 @@ -514,6 +553,11 @@ npm run setup # 自动生成密钥并创建管理员账户 - `overload:{accountId}` - 账户过载状态(529错误) - **并发控制**: - `concurrency:{accountId}` - Redis Sorted Set实现的并发计数 +- **并发请求排队**: + - `concurrency:queue:{apiKeyId}` - API Key级别的排队计数器(TTL由 `concurrentRequestQueueTimeoutMs` + 30秒缓冲决定) + - `concurrency:queue:stats:{apiKeyId}` - 排队统计(entered/success/timeout/cancelled) + - `concurrency:queue:wait_times:{apiKeyId}` - 按API Key的等待时间记录(用于P50/P90/P99计算) + - `concurrency:queue:wait_times:global` - 全局等待时间记录 - **Webhook配置**: - `webhook_config:{id}` - Webhook配置 - **用户消息队列**: diff --git a/src/app.js b/src/app.js index e0a675f5..7af1e7e9 100644 --- a/src/app.js +++ b/src/app.js @@ -584,6 +584,20 @@ class Application { // 使用 Lua 脚本批量清理所有过期项 for (const key of keys) { + // 跳过非 Sorted Set 类型的键(这些键有各自的清理逻辑) + // - concurrency:queue:stats:* 是 Hash 类型 + // - concurrency:queue:wait_times:* 是 List 类型 + // - concurrency:queue:* (不含stats/wait_times) 是 String 类型 + if ( + key.startsWith('concurrency:queue:stats:') || + key.startsWith('concurrency:queue:wait_times:') || + (key.startsWith('concurrency:queue:') && + !key.includes(':stats:') && + !key.includes(':wait_times:')) + ) { + continue + } + try { const cleaned = await redis.client.eval( ` @@ -633,6 +647,20 @@ class Application { // 然后启动定时清理任务 userMessageQueueService.startCleanupTask() }) + + // 🚦 清理服务重启后残留的并发排队计数器 + // 多实例部署时建议关闭此开关,避免新实例启动时清空其他实例的队列计数 + // 可通过 DELETE /admin/concurrency/queue 接口手动清理 + const clearQueuesOnStartup = process.env.CLEAR_CONCURRENCY_QUEUES_ON_STARTUP !== 'false' + if (clearQueuesOnStartup) { + redis.clearAllConcurrencyQueues().catch((error) => { + logger.error('❌ Error clearing concurrency queues on startup:', error) + }) + } else { + logger.info( + '🚦 Skipping concurrency queue cleanup on startup (CLEAR_CONCURRENCY_QUEUES_ON_STARTUP=false)' + ) + } } setupGracefulShutdown() { diff --git a/src/middleware/auth.js 
b/src/middleware/auth.js index a5568323..43ca6ec2 100644 --- a/src/middleware/auth.js +++ b/src/middleware/auth.js @@ -8,6 +8,102 @@ const redis = require('../models/redis') const ClientValidator = require('../validators/clientValidator') const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator') const claudeRelayConfigService = require('../services/claudeRelayConfigService') +const { calculateWaitTimeStats } = require('../utils/statsHelper') + +// 工具函数 +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +/** + * 检查排队是否过载,决定是否应该快速失败 + * 详见 design.md Decision 7: 排队健康检查与快速失败 + * + * @param {string} apiKeyId - API Key ID + * @param {number} timeoutMs - 排队超时时间(毫秒) + * @param {Object} queueConfig - 队列配置 + * @param {number} maxQueueSize - 最大排队数 + * @returns {Promise} { reject: boolean, reason?: string, estimatedWaitMs?: number, timeoutMs?: number } + */ +async function shouldRejectDueToOverload(apiKeyId, timeoutMs, queueConfig, maxQueueSize) { + try { + // 如果健康检查被禁用,直接返回不拒绝 + if (!queueConfig.concurrentRequestQueueHealthCheckEnabled) { + return { reject: false, reason: 'health_check_disabled' } + } + + // 🔑 先检查当前队列长度 + const currentQueueCount = await redis.getConcurrencyQueueCount(apiKeyId).catch(() => 0) + + // 队列为空,说明系统已恢复,跳过健康检查 + if (currentQueueCount === 0) { + return { reject: false, reason: 'queue_empty', currentQueueCount: 0 } + } + + // 🔑 关键改进:只有当队列接近满载时才进行健康检查 + // 队列长度 <= maxQueueSize * 0.5 时,认为系统有足够余量,跳过健康检查 + // 这避免了在队列较短时过于保守地拒绝请求 + // 使用 ceil 确保小队列(如 maxQueueSize=3)时阈值为 2,即队列 <=1 时跳过 + const queueLoadThreshold = Math.ceil(maxQueueSize * 0.5) + if (currentQueueCount <= queueLoadThreshold) { + return { + reject: false, + reason: 'queue_not_loaded', + currentQueueCount, + queueLoadThreshold, + maxQueueSize + } + } + + // 获取该 API Key 的等待时间样本 + const waitTimes = await redis.getQueueWaitTimes(apiKeyId) + const stats = calculateWaitTimeStats(waitTimes) + + // 样本不足(< 10),跳过健康检查,避免冷启动误判 + if (!stats || 
stats.sampleCount < 10) { + return { reject: false, reason: 'insufficient_samples', sampleCount: stats?.sampleCount || 0 } + } + + // P90 不可靠时也跳过(虽然 sampleCount >= 10 时 p90Unreliable 应该是 false) + if (stats.p90Unreliable) { + return { reject: false, reason: 'p90_unreliable', sampleCount: stats.sampleCount } + } + + // 计算健康阈值:P90 >= 超时时间 × 阈值 时拒绝 + const threshold = queueConfig.concurrentRequestQueueHealthThreshold || 0.8 + const maxAllowedP90 = timeoutMs * threshold + + if (stats.p90 >= maxAllowedP90) { + return { + reject: true, + reason: 'queue_overloaded', + estimatedWaitMs: stats.p90, + timeoutMs, + threshold, + sampleCount: stats.sampleCount, + currentQueueCount, + maxQueueSize + } + } + + return { reject: false, p90: stats.p90, sampleCount: stats.sampleCount, currentQueueCount } + } catch (error) { + // 健康检查出错时不阻塞请求,记录警告并继续 + logger.warn(`Health check failed for ${apiKeyId}:`, error.message) + return { reject: false, reason: 'health_check_error', error: error.message } + } +} + +// 排队轮询配置常量(可通过配置文件覆盖) +// 性能权衡:初始间隔越短响应越快,但 Redis QPS 越高 +// 当前配置:100 个等待者时约 250-300 QPS(指数退避后) +const QUEUE_POLLING_CONFIG = { + pollIntervalMs: 200, // 初始轮询间隔(毫秒)- 平衡响应速度和 Redis 压力 + maxPollIntervalMs: 2000, // 最大轮询间隔(毫秒)- 长时间等待时降低 Redis 压力 + backoffFactor: 1.5, // 指数退避系数 + jitterRatio: 0.2, // 抖动比例(±20%)- 防止惊群效应 + maxRedisFailCount: 5 // 连续 Redis 失败阈值(从 3 提高到 5,提高网络抖动容忍度) +} const FALLBACK_CONCURRENCY_CONFIG = { leaseSeconds: 300, @@ -128,9 +224,223 @@ function isTokenCountRequest(req) { return false } +/** + * 等待并发槽位(排队机制核心) + * + * 采用「先占后检查」模式避免竞态条件: + * - 每次轮询时尝试 incrConcurrency 占位 + * - 如果超限则 decrConcurrency 释放并继续等待 + * - 成功获取槽位后返回,调用方无需再次 incrConcurrency + * + * ⚠️ 重要清理责任说明: + * - 排队计数:此函数的 finally 块负责调用 decrConcurrencyQueue 清理 + * - 并发槽位:当返回 acquired=true 时,槽位已被占用(通过 incrConcurrency) + * 调用方必须在请求结束时调用 decrConcurrency 释放槽位 + * (已在 authenticateApiKey 的 finally 块中处理) + * + * @param {Object} req - Express 请求对象 + * @param {Object} res - Express 响应对象 + * @param {string} apiKeyId - 
API Key ID + * @param {Object} queueOptions - 配置参数 + * @returns {Promise} { acquired: boolean, reason?: string, waitTimeMs: number } + */ +async function waitForConcurrencySlot(req, res, apiKeyId, queueOptions) { + const { + concurrencyLimit, + requestId, + leaseSeconds, + timeoutMs, + pollIntervalMs, + maxPollIntervalMs, + backoffFactor, + jitterRatio, + maxRedisFailCount: configMaxRedisFailCount + } = queueOptions + + let clientDisconnected = false + // 追踪轮询过程中是否临时占用了槽位(用于异常时清理) + // 工作流程: + // 1. incrConcurrency 成功且 count <= limit 时,设置 internalSlotAcquired = true + // 2. 统计记录完成后,设置 internalSlotAcquired = false 并返回(所有权转移给调用方) + // 3. 如果在步骤 1-2 之间发生异常,finally 块会检测到 internalSlotAcquired = true 并释放槽位 + let internalSlotAcquired = false + + // 监听客户端断开事件 + // ⚠️ 重要:必须监听 socket 的事件,而不是 req 的事件! + // 原因:对于 POST 请求,当 body-parser 读取完请求体后,req(IncomingMessage 可读流) + // 的 'close' 事件会立即触发,但这不代表客户端断开连接!客户端仍在等待响应。 + // socket 的 'close' 事件才是真正的连接关闭信号。 + const { socket } = req + const onSocketClose = () => { + clientDisconnected = true + logger.debug( + `🔌 [Queue] Socket closed during queue wait for API key ${apiKeyId}, requestId: ${requestId}` + ) + } + + if (socket) { + socket.once('close', onSocketClose) + } + + // 检查 socket 是否在监听器注册前已被销毁(边界情况) + if (socket?.destroyed) { + clientDisconnected = true + } + + const startTime = Date.now() + let pollInterval = pollIntervalMs + let redisFailCount = 0 + // 优先使用配置中的值,否则使用默认值 + const maxRedisFailCount = configMaxRedisFailCount || QUEUE_POLLING_CONFIG.maxRedisFailCount + + try { + while (Date.now() - startTime < timeoutMs) { + // 检测客户端是否断开(双重检查:事件标记 + socket 状态) + // socket.destroyed 是同步检查,确保即使事件处理有延迟也能及时检测 + if (clientDisconnected || socket?.destroyed) { + redis + .incrConcurrencyQueueStats(apiKeyId, 'cancelled') + .catch((e) => logger.warn('Failed to record cancelled stat:', e)) + return { + acquired: false, + reason: 'client_disconnected', + waitTimeMs: Date.now() - startTime + } + } + + // 尝试获取槽位(先占后检查) + try { + const count = await 
redis.incrConcurrency(apiKeyId, requestId, leaseSeconds) + redisFailCount = 0 // 重置失败计数 + + if (count <= concurrencyLimit) { + // 成功获取槽位! + const waitTimeMs = Date.now() - startTime + + // 槽位所有权转移说明: + // 1. 此时槽位已通过 incrConcurrency 获取 + // 2. 先标记 internalSlotAcquired = true,确保异常时 finally 块能清理 + // 3. 统计操作完成后,清除标记并返回,所有权转移给调用方 + // 4. 调用方(authenticateApiKey)负责在请求结束时释放槽位 + + // 标记槽位已获取(用于异常时 finally 块清理) + internalSlotAcquired = true + + // 记录统计(非阻塞,fire-and-forget 模式) + // ⚠️ 设计说明: + // - 故意不 await 这些 Promise,因为统计记录不应阻塞请求处理 + // - 每个 Promise 都有独立的 .catch(),确保单个失败不影响其他 + // - 外层 .catch() 是防御性措施,处理 Promise.all 本身的异常 + // - 即使统计记录在函数返回后才完成/失败,也是安全的(仅日志记录) + // - 统计数据丢失可接受,不影响核心业务逻辑 + Promise.all([ + redis + .recordQueueWaitTime(apiKeyId, waitTimeMs) + .catch((e) => logger.warn('Failed to record queue wait time:', e)), + redis + .recordGlobalQueueWaitTime(waitTimeMs) + .catch((e) => logger.warn('Failed to record global wait time:', e)), + redis + .incrConcurrencyQueueStats(apiKeyId, 'success') + .catch((e) => logger.warn('Failed to increment success stats:', e)) + ]).catch((e) => logger.warn('Failed to record queue stats batch:', e)) + + // 成功返回前清除标记(所有权转移给调用方,由其负责释放) + internalSlotAcquired = false + return { acquired: true, waitTimeMs } + } + + // 超限,释放槽位继续等待 + try { + await redis.decrConcurrency(apiKeyId, requestId) + } catch (decrError) { + // 释放失败时记录警告但继续轮询 + // 下次 incrConcurrency 会自然覆盖同一 requestId 的条目 + logger.warn( + `Failed to release slot during polling for ${apiKeyId}, will retry:`, + decrError + ) + } + } catch (redisError) { + redisFailCount++ + logger.error( + `Redis error in queue polling (${redisFailCount}/${maxRedisFailCount}):`, + redisError + ) + + if (redisFailCount >= maxRedisFailCount) { + // 连续 Redis 失败,放弃排队 + return { + acquired: false, + reason: 'redis_error', + waitTimeMs: Date.now() - startTime + } + } + } + + // 指数退避等待 + await sleep(pollInterval) + + // 计算下一次轮询间隔(指数退避 + 抖动) + // 1. 
先应用指数退避 + let nextInterval = pollInterval * backoffFactor + // 2. 添加抖动防止惊群效应(±jitterRatio 范围内的随机偏移) + // 抖动范围:[-jitterRatio, +jitterRatio],例如 jitterRatio=0.2 时为 ±20% + // 这是预期行为:负抖动可使间隔略微缩短,正抖动可使间隔略微延长 + // 目的是分散多个等待者的轮询时间点,避免同时请求 Redis + const jitter = nextInterval * jitterRatio * (Math.random() * 2 - 1) + nextInterval = nextInterval + jitter + // 3. 确保在合理范围内:最小 1ms,最大 maxPollIntervalMs + // Math.max(1, ...) 保证即使负抖动也不会产生 ≤0 的间隔 + pollInterval = Math.max(1, Math.min(nextInterval, maxPollIntervalMs)) + } + + // 超时 + redis + .incrConcurrencyQueueStats(apiKeyId, 'timeout') + .catch((e) => logger.warn('Failed to record timeout stat:', e)) + return { acquired: false, reason: 'timeout', waitTimeMs: Date.now() - startTime } + } finally { + // 确保清理: + // 1. 减少排队计数(排队计数在调用方已增加,这里负责减少) + try { + await redis.decrConcurrencyQueue(apiKeyId) + } catch (cleanupError) { + // 清理失败记录错误(可能导致计数泄漏,但有 TTL 保护) + logger.error( + `Failed to decrement queue count in finally block for ${apiKeyId}:`, + cleanupError + ) + } + + // 2. 
如果内部获取了槽位但未正常返回(异常路径),释放槽位 + if (internalSlotAcquired) { + try { + await redis.decrConcurrency(apiKeyId, requestId) + logger.warn( + `⚠️ Released orphaned concurrency slot in finally block for ${apiKeyId}, requestId: ${requestId}` + ) + } catch (slotCleanupError) { + logger.error( + `Failed to release orphaned concurrency slot for ${apiKeyId}:`, + slotCleanupError + ) + } + } + + // 清理 socket 事件监听器 + if (socket) { + socket.removeListener('close', onSocketClose) + } + } +} + // 🔑 API Key验证中间件(优化版) const authenticateApiKey = async (req, res, next) => { const startTime = Date.now() + let authErrored = false + let concurrencyCleanup = null + let hasConcurrencySlot = false try { // 安全提取API Key,支持多种格式(包括Gemini CLI支持) @@ -265,39 +575,346 @@ const authenticateApiKey = async (req, res, next) => { } const requestId = uuidv4() + // ⚠️ 优化后的 Connection: close 设置策略 + // 问题背景:HTTP Keep-Alive 使多个请求共用同一个 TCP 连接 + // 当第一个请求正在处理,第二个请求进入排队时,它们共用同一个 socket + // 如果客户端超时关闭连接,两个请求都会受影响 + // 优化方案:只有在请求实际进入排队时才设置 Connection: close + // 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销 + // 详见 design.md Decision 2: Connection: close 设置时机 + // 注意:Connection: close 将在下方代码实际进入排队时设置(第 637 行左右) + + // ============================================================ + // 🔒 并发槽位状态管理说明 + // ============================================================ + // 此函数中有两个关键状态变量: + // - hasConcurrencySlot: 当前是否持有并发槽位 + // - concurrencyCleanup: 错误时调用的清理函数 + // + // 状态转换流程: + // 1. incrConcurrency 成功 → hasConcurrencySlot=true, 设置临时清理函数 + // 2. 若超限 → 释放槽位,hasConcurrencySlot=false, concurrencyCleanup=null + // 3. 若排队成功 → hasConcurrencySlot=true, 升级为完整清理函数(含 interval 清理) + // 4. 请求结束(res.close/req.close)→ 调用 decrementConcurrency 释放 + // 5. 认证错误 → finally 块调用 concurrencyCleanup 释放 + // + // 为什么需要两种清理函数? 
+ // - 临时清理:在排队/认证过程中出错时使用,只释放槽位 + // - 完整清理:请求正常开始后使用,还需清理 leaseRenewInterval + // ============================================================ + const setTemporaryConcurrencyCleanup = () => { + concurrencyCleanup = async () => { + if (!hasConcurrencySlot) { + return + } + hasConcurrencySlot = false + try { + await redis.decrConcurrency(validation.keyData.id, requestId) + } catch (cleanupError) { + logger.error( + `Failed to decrement concurrency after auth error for key ${validation.keyData.id}:`, + cleanupError + ) + } + } + } + const currentConcurrency = await redis.incrConcurrency( validation.keyData.id, requestId, leaseSeconds ) + hasConcurrencySlot = true + setTemporaryConcurrencyCleanup() logger.api( `📈 Incremented concurrency for key: ${validation.keyData.id} (${validation.keyData.name}), current: ${currentConcurrency}, limit: ${concurrencyLimit}` ) if (currentConcurrency > concurrencyLimit) { - // 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏) + // 1. 先释放刚占用的槽位 try { - const newCount = await redis.decrConcurrency(validation.keyData.id, requestId) - logger.api( - `📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}` - ) + await redis.decrConcurrency(validation.keyData.id, requestId) } catch (error) { logger.error( `Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`, error ) } - logger.security( - `🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${ - validation.keyData.name - }), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}` + hasConcurrencySlot = false + concurrencyCleanup = null + + // 2. 获取排队配置 + const queueConfig = await claudeRelayConfigService.getConfig() + + // 3. 
排队功能未启用,直接返回 429(保持现有行为) + if (!queueConfig.concurrentRequestQueueEnabled) { + logger.security( + `🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${ + validation.keyData.name + }), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}` + ) + // 建议客户端在短暂延迟后重试(并发场景下通常很快会有槽位释放) + res.set('Retry-After', '1') + return res.status(429).json({ + error: 'Concurrency limit exceeded', + message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`, + currentConcurrency: currentConcurrency - 1, + concurrencyLimit + }) + } + + // 4. 计算最大排队数 + const maxQueueSize = Math.max( + concurrencyLimit * queueConfig.concurrentRequestQueueMaxSizeMultiplier, + queueConfig.concurrentRequestQueueMaxSize ) - return res.status(429).json({ - error: 'Concurrency limit exceeded', - message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`, - currentConcurrency: currentConcurrency - 1, - concurrencyLimit - }) + + // 4.5 排队健康检查:过载时快速失败 + // 详见 design.md Decision 7: 排队健康检查与快速失败 + const overloadCheck = await shouldRejectDueToOverload( + validation.keyData.id, + queueConfig.concurrentRequestQueueTimeoutMs, + queueConfig, + maxQueueSize + ) + if (overloadCheck.reject) { + // 使用健康检查返回的当前排队数,避免重复调用 Redis + const currentQueueCount = overloadCheck.currentQueueCount || 0 + logger.api( + `🚨 Queue overloaded for key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `P90=${overloadCheck.estimatedWaitMs}ms, timeout=${overloadCheck.timeoutMs}ms, ` + + `threshold=${overloadCheck.threshold}, samples=${overloadCheck.sampleCount}, ` + + `concurrency=${concurrencyLimit}, queue=${currentQueueCount}/${maxQueueSize}` + ) + // 记录被拒绝的过载统计 + redis + .incrConcurrencyQueueStats(validation.keyData.id, 'rejected_overload') + .catch((e) => logger.warn('Failed to record rejected_overload stat:', e)) + // 返回 429 + Retry-After,让客户端稍后重试 + const retryAfterSeconds = 30 + res.set('Retry-After', String(retryAfterSeconds)) + return 
res.status(429).json({ + error: 'Queue overloaded', + message: `Queue is overloaded. Estimated wait time (${overloadCheck.estimatedWaitMs}ms) exceeds threshold. Limit: ${concurrencyLimit} concurrent requests, queue: ${currentQueueCount}/${maxQueueSize}. Please retry later.`, + currentConcurrency: concurrencyLimit, + concurrencyLimit, + queueCount: currentQueueCount, + maxQueueSize, + estimatedWaitMs: overloadCheck.estimatedWaitMs, + timeoutMs: overloadCheck.timeoutMs, + queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs, + retryAfterSeconds + }) + } + + // 5. 尝试进入排队(原子操作:先增加再检查,避免竞态条件) + let queueIncremented = false + try { + const newQueueCount = await redis.incrConcurrencyQueue( + validation.keyData.id, + queueConfig.concurrentRequestQueueTimeoutMs + ) + queueIncremented = true + + if (newQueueCount > maxQueueSize) { + // 超过最大排队数,立即释放并返回 429 + await redis.decrConcurrencyQueue(validation.keyData.id) + queueIncremented = false + logger.api( + `🚦 Concurrency queue full for key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `queue: ${newQueueCount - 1}, maxQueue: ${maxQueueSize}` + ) + // 队列已满,建议客户端在排队超时时间后重试 + const retryAfterSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000) + res.set('Retry-After', String(retryAfterSeconds)) + return res.status(429).json({ + error: 'Concurrency queue full', + message: `Too many requests waiting in queue. Limit: ${concurrencyLimit} concurrent requests, queue: ${newQueueCount - 1}/${maxQueueSize}, timeout: ${retryAfterSeconds}s`, + currentConcurrency: concurrencyLimit, + concurrencyLimit, + queueCount: newQueueCount - 1, + maxQueueSize, + queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs, + retryAfterSeconds + }) + } + + // 6. 
已成功进入排队,记录统计并开始等待槽位 + logger.api( + `⏳ Request entering queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `queue position: ${newQueueCount}` + ) + redis + .incrConcurrencyQueueStats(validation.keyData.id, 'entered') + .catch((e) => logger.warn('Failed to record entered stat:', e)) + + // ⚠️ 仅在请求实际进入排队时设置 Connection: close + // 详见 design.md Decision 2: Connection: close 设置时机 + // 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销 + if (!res.headersSent) { + res.setHeader('Connection', 'close') + logger.api( + `🔌 [Queue] Set Connection: close for queued request, key: ${validation.keyData.id}` + ) + } + + // ⚠️ 记录排队开始时的 socket 标识,用于排队完成后验证 + // 问题背景:HTTP Keep-Alive 连接复用时,长时间排队可能导致 socket 被其他请求使用 + // 验证方法:使用 UUID token + socket 对象引用双重验证 + // 详见 design.md Decision 1: Socket 身份验证机制 + req._crService = req._crService || {} + req._crService.queueToken = uuidv4() + req._crService.originalSocket = req.socket + req._crService.startTime = Date.now() + const savedToken = req._crService.queueToken + const savedSocket = req._crService.originalSocket + + // ⚠️ 重要:在调用前将 queueIncremented 设为 false + // 因为 waitForConcurrencySlot 的 finally 块会负责清理排队计数 + // 如果在调用后设置,当 waitForConcurrencySlot 抛出异常时 + // 外层 catch 块会重复减少计数(finally 已经减过一次) + queueIncremented = false + + const slot = await waitForConcurrencySlot(req, res, validation.keyData.id, { + concurrencyLimit, + requestId, + leaseSeconds, + timeoutMs: queueConfig.concurrentRequestQueueTimeoutMs, + pollIntervalMs: QUEUE_POLLING_CONFIG.pollIntervalMs, + maxPollIntervalMs: QUEUE_POLLING_CONFIG.maxPollIntervalMs, + backoffFactor: QUEUE_POLLING_CONFIG.backoffFactor, + jitterRatio: QUEUE_POLLING_CONFIG.jitterRatio, + maxRedisFailCount: queueConfig.concurrentRequestQueueMaxRedisFailCount + }) + + // 7. 
处理排队结果 + if (!slot.acquired) { + if (slot.reason === 'client_disconnected') { + // 客户端已断开,不返回响应(连接已关闭) + logger.api( + `🔌 Client disconnected while queuing for key: ${validation.keyData.id} (${validation.keyData.name})` + ) + return + } + + if (slot.reason === 'redis_error') { + // Redis 连续失败,返回 503 + logger.error( + `❌ Redis error during queue wait for key: ${validation.keyData.id} (${validation.keyData.name})` + ) + return res.status(503).json({ + error: 'Service temporarily unavailable', + message: 'Failed to acquire concurrency slot due to internal error' + }) + } + // 排队超时(使用 api 级别,与其他排队日志保持一致) + logger.api( + `⏰ Queue timeout for key: ${validation.keyData.id} (${validation.keyData.name}), waited: ${slot.waitTimeMs}ms` + ) + // 已等待超时,建议客户端稍后重试 + // ⚠️ Retry-After 策略优化: + // - 请求已经等了完整的 timeout 时间,说明系统负载较高 + // - 过早重试(如固定 5 秒)会加剧拥塞,导致更多超时 + // - 合理策略:使用 timeout 时间的一半作为重试间隔 + // - 最小值 5 秒,最大值 30 秒,避免极端情况 + const timeoutSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000) + const retryAfterSeconds = Math.max(5, Math.min(30, Math.ceil(timeoutSeconds / 2))) + res.set('Retry-After', String(retryAfterSeconds)) + return res.status(429).json({ + error: 'Queue timeout', + message: `Request timed out waiting for concurrency slot. Limit: ${concurrencyLimit} concurrent requests, maxQueue: ${maxQueueSize}, Queue timeout: ${timeoutSeconds}s, waited: ${slot.waitTimeMs}ms`, + currentConcurrency: concurrencyLimit, + concurrencyLimit, + maxQueueSize, + queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs, + waitTimeMs: slot.waitTimeMs, + retryAfterSeconds + }) + } + + // 8. 排队成功,slot.acquired 表示已在 waitForConcurrencySlot 中获取到槽位 + logger.api( + `✅ Queue wait completed for key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `waited: ${slot.waitTimeMs}ms` + ) + hasConcurrencySlot = true + setTemporaryConcurrencyCleanup() + + // 9. 
⚠️ 关键检查:排队等待结束后,验证客户端是否还在等待响应 + // 长时间排队后,客户端可能在应用层已放弃(如 Claude Code 的超时机制), + // 但 TCP 连接仍然存活。此时继续处理请求是浪费资源。 + // 注意:如果发送了心跳,headersSent 会是 true,但这是正常的 + const postQueueSocket = req.socket + // 只检查连接是否真正断开(destroyed/writableEnded/socketDestroyed) + // headersSent 在心跳场景下是正常的,不应该作为放弃的依据 + if (res.destroyed || res.writableEnded || postQueueSocket?.destroyed) { + logger.warn( + `⚠️ Client no longer waiting after queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `waited: ${slot.waitTimeMs}ms | destroyed: ${res.destroyed}, ` + + `writableEnded: ${res.writableEnded}, socketDestroyed: ${postQueueSocket?.destroyed}` + ) + // 释放刚获取的槽位 + hasConcurrencySlot = false + await redis + .decrConcurrency(validation.keyData.id, requestId) + .catch((e) => logger.error('Failed to release slot after client abandoned:', e)) + // 不返回响应(客户端已不在等待) + return + } + + // 10. ⚠️ 关键检查:验证 socket 身份是否改变 + // HTTP Keep-Alive 连接复用可能导致排队期间 socket 被其他请求使用 + // 验证方法:UUID token + socket 对象引用双重验证 + // 详见 design.md Decision 1: Socket 身份验证机制 + const queueData = req._crService + const socketIdentityChanged = + !queueData || + queueData.queueToken !== savedToken || + queueData.originalSocket !== savedSocket + + if (socketIdentityChanged) { + logger.error( + `❌ [Queue] Socket identity changed during queue wait! 
` + + `key: ${validation.keyData.id} (${validation.keyData.name}), ` + + `waited: ${slot.waitTimeMs}ms | ` + + `tokenMatch: ${queueData?.queueToken === savedToken}, ` + + `socketMatch: ${queueData?.originalSocket === savedSocket}` + ) + // 释放刚获取的槽位 + hasConcurrencySlot = false + await redis + .decrConcurrency(validation.keyData.id, requestId) + .catch((e) => logger.error('Failed to release slot after socket identity change:', e)) + // 记录 socket_changed 统计 + redis + .incrConcurrencyQueueStats(validation.keyData.id, 'socket_changed') + .catch((e) => logger.warn('Failed to record socket_changed stat:', e)) + // 不返回响应(socket 已被其他请求使用) + return + } + } catch (queueError) { + // 异常时清理资源,防止泄漏 + // 1. 清理排队计数(如果还没被 waitForConcurrencySlot 的 finally 清理) + if (queueIncremented) { + await redis + .decrConcurrencyQueue(validation.keyData.id) + .catch((e) => logger.error('Failed to cleanup queue count after error:', e)) + } + + // 2. 防御性清理:如果 waitForConcurrencySlot 内部获取了槽位但在返回前异常 + // 虽然这种情况极少发生(统计记录的异常会被内部捕获),但为了安全起见 + // 尝试释放可能已获取的槽位。decrConcurrency 使用 ZREM,即使成员不存在也安全 + if (hasConcurrencySlot) { + hasConcurrencySlot = false + await redis + .decrConcurrency(validation.keyData.id, requestId) + .catch((e) => + logger.error('Failed to cleanup concurrency slot after queue error:', e) + ) + } + + throw queueError + } } const renewIntervalMs = @@ -358,6 +975,7 @@ const authenticateApiKey = async (req, res, next) => { const decrementConcurrency = async () => { if (!concurrencyDecremented) { concurrencyDecremented = true + hasConcurrencySlot = false if (leaseRenewInterval) { clearInterval(leaseRenewInterval) leaseRenewInterval = null @@ -372,6 +990,11 @@ const authenticateApiKey = async (req, res, next) => { } } } + // 升级为完整清理函数(包含 leaseRenewInterval 清理逻辑) + // 此时请求已通过认证,后续由 res.close/req.close 事件触发清理 + if (hasConcurrencySlot) { + concurrencyCleanup = decrementConcurrency + } // 监听最可靠的事件(避免重复监听) // res.on('close') 是最可靠的,会在连接关闭时触发 @@ -697,6 +1320,7 @@ const authenticateApiKey = async 
(req, res, next) => { return next() } catch (error) { + authErrored = true const authDuration = Date.now() - startTime logger.error(`❌ Authentication middleware error (${authDuration}ms):`, { error: error.message, @@ -710,6 +1334,14 @@ const authenticateApiKey = async (req, res, next) => { error: 'Authentication error', message: 'Internal server error during authentication' }) + } finally { + if (authErrored && typeof concurrencyCleanup === 'function') { + try { + await concurrencyCleanup() + } catch (cleanupError) { + logger.error('Failed to cleanup concurrency after auth error:', cleanupError) + } + } } } diff --git a/src/models/redis.js b/src/models/redis.js index e34054f3..b75c0936 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -50,6 +50,18 @@ function getWeekStringInTimezone(date = new Date()) { return `${year}-W${String(weekNumber).padStart(2, '0')}` } +// 并发队列相关常量 +const QUEUE_STATS_TTL_SECONDS = 86400 * 7 // 统计计数保留 7 天 +const WAIT_TIME_TTL_SECONDS = 86400 // 等待时间样本保留 1 天(滚动窗口,无需长期保留) +// 等待时间样本数配置(提高统计置信度) +// - 每 API Key 从 100 提高到 500:提供更稳定的 P99 估计 +// - 全局从 500 提高到 2000:支持更高精度的 P99.9 分析 +// - 内存开销约 12-20KB(Redis quicklist 每元素 1-10 字节),可接受 +// 详见 design.md Decision 5: 等待时间统计样本数 +const WAIT_TIME_SAMPLES_PER_KEY = 500 // 每个 API Key 保留的等待时间样本数 +const WAIT_TIME_SAMPLES_GLOBAL = 2000 // 全局保留的等待时间样本数 +const QUEUE_TTL_BUFFER_SECONDS = 30 // 排队计数器TTL缓冲时间 + class RedisClient { constructor() { this.client = null @@ -2769,4 +2781,380 @@ redisClient.scanUserMessageQueueLocks = async function () { } } +// ============================================ +// 🚦 API Key 并发请求排队方法 +// ============================================ + +/** + * 增加排队计数(使用 Lua 脚本确保原子性) + * @param {string} apiKeyId - API Key ID + * @param {number} [timeoutMs=60000] - 排队超时时间(毫秒),用于计算 TTL + * @returns {Promise} 增加后的排队数量 + */ +redisClient.incrConcurrencyQueue = async function (apiKeyId, timeoutMs = 60000) { + const key = `concurrency:queue:${apiKeyId}` + try { + // 使用 Lua 脚本确保 INCR 和 EXPIRE 
原子执行,防止进程崩溃导致计数器泄漏 + // TTL = 超时时间 + 缓冲时间(确保键不会在请求还在等待时过期) + const ttlSeconds = Math.ceil(timeoutMs / 1000) + QUEUE_TTL_BUFFER_SECONDS + const script = ` + local count = redis.call('INCR', KEYS[1]) + redis.call('EXPIRE', KEYS[1], ARGV[1]) + return count + ` + const count = await this.client.eval(script, 1, key, String(ttlSeconds)) + logger.database( + `🚦 Incremented queue count for key ${apiKeyId}: ${count} (TTL: ${ttlSeconds}s)` + ) + return parseInt(count) + } catch (error) { + logger.error(`Failed to increment concurrency queue for ${apiKeyId}:`, error) + throw error + } +} + +/** + * 减少排队计数(使用 Lua 脚本确保原子性) + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 减少后的排队数量 + */ +redisClient.decrConcurrencyQueue = async function (apiKeyId) { + const key = `concurrency:queue:${apiKeyId}` + try { + // 使用 Lua 脚本确保 DECR 和 DEL 原子执行,防止进程崩溃导致计数器残留 + const script = ` + local count = redis.call('DECR', KEYS[1]) + if count <= 0 then + redis.call('DEL', KEYS[1]) + return 0 + end + return count + ` + const count = await this.client.eval(script, 1, key) + const result = parseInt(count) + if (result === 0) { + logger.database(`🚦 Queue count for key ${apiKeyId} is 0, removed key`) + } else { + logger.database(`🚦 Decremented queue count for key ${apiKeyId}: ${result}`) + } + return result + } catch (error) { + logger.error(`Failed to decrement concurrency queue for ${apiKeyId}:`, error) + throw error + } +} + +/** + * 获取排队计数 + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 当前排队数量 + */ +redisClient.getConcurrencyQueueCount = async function (apiKeyId) { + const key = `concurrency:queue:${apiKeyId}` + try { + const count = await this.client.get(key) + return parseInt(count || 0) + } catch (error) { + logger.error(`Failed to get concurrency queue count for ${apiKeyId}:`, error) + return 0 + } +} + +/** + * 清空排队计数 + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 是否成功清空 + */ +redisClient.clearConcurrencyQueue = async function (apiKeyId) { + const 
key = `concurrency:queue:${apiKeyId}` + try { + await this.client.del(key) + logger.database(`🚦 Cleared queue count for key ${apiKeyId}`) + return true + } catch (error) { + logger.error(`Failed to clear concurrency queue for ${apiKeyId}:`, error) + return false + } +} + +/** + * 扫描所有排队计数器 + * @returns {Promise} API Key ID 列表 + */ +redisClient.scanConcurrencyQueueKeys = async function () { + const apiKeyIds = [] + let cursor = '0' + let iterations = 0 + const MAX_ITERATIONS = 1000 + + try { + do { + const [newCursor, keys] = await this.client.scan( + cursor, + 'MATCH', + 'concurrency:queue:*', + 'COUNT', + 100 + ) + cursor = newCursor + iterations++ + + for (const key of keys) { + // 排除统计和等待时间相关的键 + if ( + key.startsWith('concurrency:queue:stats:') || + key.startsWith('concurrency:queue:wait_times:') + ) { + continue + } + const apiKeyId = key.replace('concurrency:queue:', '') + apiKeyIds.push(apiKeyId) + } + + if (iterations >= MAX_ITERATIONS) { + logger.warn( + `🚦 Concurrency queue: SCAN reached max iterations (${MAX_ITERATIONS}), stopping early`, + { foundQueues: apiKeyIds.length } + ) + break + } + } while (cursor !== '0') + + return apiKeyIds + } catch (error) { + logger.error('Failed to scan concurrency queue keys:', error) + return [] + } +} + +/** + * 清理所有排队计数器(用于服务重启) + * @returns {Promise} 清理的计数器数量 + */ +redisClient.clearAllConcurrencyQueues = async function () { + let cleared = 0 + let cursor = '0' + let iterations = 0 + const MAX_ITERATIONS = 1000 + + try { + do { + const [newCursor, keys] = await this.client.scan( + cursor, + 'MATCH', + 'concurrency:queue:*', + 'COUNT', + 100 + ) + cursor = newCursor + iterations++ + + // 只删除排队计数器,保留统计数据 + const queueKeys = keys.filter( + (key) => + !key.startsWith('concurrency:queue:stats:') && + !key.startsWith('concurrency:queue:wait_times:') + ) + + if (queueKeys.length > 0) { + await this.client.del(...queueKeys) + cleared += queueKeys.length + } + + if (iterations >= MAX_ITERATIONS) { + break + } + } while 
(cursor !== '0') + + if (cleared > 0) { + logger.info(`🚦 Cleared ${cleared} concurrency queue counter(s) on startup`) + } + return cleared + } catch (error) { + logger.error('Failed to clear all concurrency queues:', error) + return 0 + } +} + +/** + * 增加排队统计计数(使用 Lua 脚本确保原子性) + * @param {string} apiKeyId - API Key ID + * @param {string} field - 统计字段 (entered/success/timeout/cancelled) + * @returns {Promise} 增加后的计数 + */ +redisClient.incrConcurrencyQueueStats = async function (apiKeyId, field) { + const key = `concurrency:queue:stats:${apiKeyId}` + try { + // 使用 Lua 脚本确保 HINCRBY 和 EXPIRE 原子执行 + // 防止在两者之间崩溃导致统计键没有 TTL(内存泄漏) + const script = ` + local count = redis.call('HINCRBY', KEYS[1], ARGV[1], 1) + redis.call('EXPIRE', KEYS[1], ARGV[2]) + return count + ` + const count = await this.client.eval(script, 1, key, field, String(QUEUE_STATS_TTL_SECONDS)) + return parseInt(count) + } catch (error) { + logger.error(`Failed to increment queue stats ${field} for ${apiKeyId}:`, error) + return 0 + } +} + +/** + * 获取排队统计 + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 统计数据 + */ +redisClient.getConcurrencyQueueStats = async function (apiKeyId) { + const key = `concurrency:queue:stats:${apiKeyId}` + try { + const stats = await this.client.hgetall(key) + return { + entered: parseInt(stats?.entered || 0), + success: parseInt(stats?.success || 0), + timeout: parseInt(stats?.timeout || 0), + cancelled: parseInt(stats?.cancelled || 0), + socket_changed: parseInt(stats?.socket_changed || 0), + rejected_overload: parseInt(stats?.rejected_overload || 0) + } + } catch (error) { + logger.error(`Failed to get queue stats for ${apiKeyId}:`, error) + return { + entered: 0, + success: 0, + timeout: 0, + cancelled: 0, + socket_changed: 0, + rejected_overload: 0 + } + } +} + +/** + * 记录排队等待时间(按 API Key 分开存储) + * @param {string} apiKeyId - API Key ID + * @param {number} waitTimeMs - 等待时间(毫秒) + * @returns {Promise} + */ +redisClient.recordQueueWaitTime = async function 
(apiKeyId, waitTimeMs) { + const key = `concurrency:queue:wait_times:${apiKeyId}` + try { + // 使用 Lua 脚本确保原子性,同时设置 TTL 防止内存泄漏 + const script = ` + redis.call('LPUSH', KEYS[1], ARGV[1]) + redis.call('LTRIM', KEYS[1], 0, ARGV[2]) + redis.call('EXPIRE', KEYS[1], ARGV[3]) + return 1 + ` + await this.client.eval( + script, + 1, + key, + waitTimeMs, + WAIT_TIME_SAMPLES_PER_KEY - 1, + WAIT_TIME_TTL_SECONDS + ) + } catch (error) { + logger.error(`Failed to record queue wait time for ${apiKeyId}:`, error) + } +} + +/** + * 记录全局排队等待时间 + * @param {number} waitTimeMs - 等待时间(毫秒) + * @returns {Promise} + */ +redisClient.recordGlobalQueueWaitTime = async function (waitTimeMs) { + const key = 'concurrency:queue:wait_times:global' + try { + // 使用 Lua 脚本确保原子性,同时设置 TTL 防止内存泄漏 + const script = ` + redis.call('LPUSH', KEYS[1], ARGV[1]) + redis.call('LTRIM', KEYS[1], 0, ARGV[2]) + redis.call('EXPIRE', KEYS[1], ARGV[3]) + return 1 + ` + await this.client.eval( + script, + 1, + key, + waitTimeMs, + WAIT_TIME_SAMPLES_GLOBAL - 1, + WAIT_TIME_TTL_SECONDS + ) + } catch (error) { + logger.error('Failed to record global queue wait time:', error) + } +} + +/** + * 获取全局等待时间列表 + * @returns {Promise} 等待时间列表 + */ +redisClient.getGlobalQueueWaitTimes = async function () { + const key = 'concurrency:queue:wait_times:global' + try { + const samples = await this.client.lrange(key, 0, -1) + return samples.map(Number) + } catch (error) { + logger.error('Failed to get global queue wait times:', error) + return [] + } +} + +/** + * 获取指定 API Key 的等待时间列表 + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 等待时间列表 + */ +redisClient.getQueueWaitTimes = async function (apiKeyId) { + const key = `concurrency:queue:wait_times:${apiKeyId}` + try { + const samples = await this.client.lrange(key, 0, -1) + return samples.map(Number) + } catch (error) { + logger.error(`Failed to get queue wait times for ${apiKeyId}:`, error) + return [] + } +} + +/** + * 扫描所有排队统计键 + * @returns {Promise} API Key ID 列表 + */ 
+redisClient.scanConcurrencyQueueStatsKeys = async function () { + const apiKeyIds = [] + let cursor = '0' + let iterations = 0 + const MAX_ITERATIONS = 1000 + + try { + do { + const [newCursor, keys] = await this.client.scan( + cursor, + 'MATCH', + 'concurrency:queue:stats:*', + 'COUNT', + 100 + ) + cursor = newCursor + iterations++ + + for (const key of keys) { + const apiKeyId = key.replace('concurrency:queue:stats:', '') + apiKeyIds.push(apiKeyId) + } + + if (iterations >= MAX_ITERATIONS) { + break + } + } while (cursor !== '0') + + return apiKeyIds + } catch (error) { + logger.error('Failed to scan concurrency queue stats keys:', error) + return [] + } +} + module.exports = redisClient diff --git a/src/routes/admin/claudeRelayConfig.js b/src/routes/admin/claudeRelayConfig.js index 261b2092..a41207a9 100644 --- a/src/routes/admin/claudeRelayConfig.js +++ b/src/routes/admin/claudeRelayConfig.js @@ -43,7 +43,11 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { sessionBindingTtlDays, userMessageQueueEnabled, userMessageQueueDelayMs, - userMessageQueueTimeoutMs + userMessageQueueTimeoutMs, + concurrentRequestQueueEnabled, + concurrentRequestQueueMaxSize, + concurrentRequestQueueMaxSizeMultiplier, + concurrentRequestQueueTimeoutMs } = req.body // 验证输入 @@ -110,6 +114,54 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { } } + // 验证并发请求排队配置 + if ( + concurrentRequestQueueEnabled !== undefined && + typeof concurrentRequestQueueEnabled !== 'boolean' + ) { + return res.status(400).json({ error: 'concurrentRequestQueueEnabled must be a boolean' }) + } + + if (concurrentRequestQueueMaxSize !== undefined) { + if ( + typeof concurrentRequestQueueMaxSize !== 'number' || + !Number.isInteger(concurrentRequestQueueMaxSize) || + concurrentRequestQueueMaxSize < 1 || + concurrentRequestQueueMaxSize > 100 + ) { + return res + .status(400) + .json({ error: 'concurrentRequestQueueMaxSize must be an integer between 1 and 100' }) 
+ } + } + + if (concurrentRequestQueueMaxSizeMultiplier !== undefined) { + // 使用 Number.isFinite() 同时排除 NaN、Infinity、-Infinity 和非数字类型 + if ( + !Number.isFinite(concurrentRequestQueueMaxSizeMultiplier) || + concurrentRequestQueueMaxSizeMultiplier < 0 || + concurrentRequestQueueMaxSizeMultiplier > 10 + ) { + return res.status(400).json({ + error: 'concurrentRequestQueueMaxSizeMultiplier must be a finite number between 0 and 10' + }) + } + } + + if (concurrentRequestQueueTimeoutMs !== undefined) { + if ( + typeof concurrentRequestQueueTimeoutMs !== 'number' || + !Number.isInteger(concurrentRequestQueueTimeoutMs) || + concurrentRequestQueueTimeoutMs < 5000 || + concurrentRequestQueueTimeoutMs > 300000 + ) { + return res.status(400).json({ + error: + 'concurrentRequestQueueTimeoutMs must be an integer between 5000 and 300000 (5 seconds to 5 minutes)' + }) + } + } + const updateData = {} if (claudeCodeOnlyEnabled !== undefined) { updateData.claudeCodeOnlyEnabled = claudeCodeOnlyEnabled @@ -132,6 +184,18 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => { if (userMessageQueueTimeoutMs !== undefined) { updateData.userMessageQueueTimeoutMs = userMessageQueueTimeoutMs } + if (concurrentRequestQueueEnabled !== undefined) { + updateData.concurrentRequestQueueEnabled = concurrentRequestQueueEnabled + } + if (concurrentRequestQueueMaxSize !== undefined) { + updateData.concurrentRequestQueueMaxSize = concurrentRequestQueueMaxSize + } + if (concurrentRequestQueueMaxSizeMultiplier !== undefined) { + updateData.concurrentRequestQueueMaxSizeMultiplier = concurrentRequestQueueMaxSizeMultiplier + } + if (concurrentRequestQueueTimeoutMs !== undefined) { + updateData.concurrentRequestQueueTimeoutMs = concurrentRequestQueueTimeoutMs + } const updatedConfig = await claudeRelayConfigService.updateConfig( updateData, diff --git a/src/routes/admin/concurrency.js b/src/routes/admin/concurrency.js index e15c4062..9325b5a8 100644 --- a/src/routes/admin/concurrency.js 
+++ b/src/routes/admin/concurrency.js @@ -8,6 +8,7 @@ const router = express.Router() const redis = require('../../models/redis') const logger = require('../../utils/logger') const { authenticateAdmin } = require('../../middleware/auth') +const { calculateWaitTimeStats } = require('../../utils/statsHelper') /** * GET /admin/concurrency @@ -17,17 +18,29 @@ router.get('/concurrency', authenticateAdmin, async (req, res) => { try { const status = await redis.getAllConcurrencyStatus() + // 为每个 API Key 获取排队计数 + const statusWithQueue = await Promise.all( + status.map(async (s) => { + const queueCount = await redis.getConcurrencyQueueCount(s.apiKeyId) + return { + ...s, + queueCount + } + }) + ) + // 计算汇总统计 const summary = { - totalKeys: status.length, - totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0), - totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0) + totalKeys: statusWithQueue.length, + totalActiveRequests: statusWithQueue.reduce((sum, s) => sum + s.activeCount, 0), + totalExpiredRequests: statusWithQueue.reduce((sum, s) => sum + s.expiredCount, 0), + totalQueuedRequests: statusWithQueue.reduce((sum, s) => sum + s.queueCount, 0) } res.json({ success: true, summary, - concurrencyStatus: status + concurrencyStatus: statusWithQueue }) } catch (error) { logger.error('❌ Failed to get concurrency status:', error) @@ -39,6 +52,156 @@ router.get('/concurrency', authenticateAdmin, async (req, res) => { } }) +/** + * GET /admin/concurrency-queue/stats + * 获取排队统计信息 + */ +router.get('/concurrency-queue/stats', authenticateAdmin, async (req, res) => { + try { + // 获取所有有统计数据的 API Key + const statsKeys = await redis.scanConcurrencyQueueStatsKeys() + const queueKeys = await redis.scanConcurrencyQueueKeys() + + // 合并所有相关的 API Key + const allApiKeyIds = [...new Set([...statsKeys, ...queueKeys])] + + // 获取各 API Key 的详细统计 + const perKeyStats = await Promise.all( + allApiKeyIds.map(async (apiKeyId) => { + const [queueCount, stats, waitTimes] = 
await Promise.all([ + redis.getConcurrencyQueueCount(apiKeyId), + redis.getConcurrencyQueueStats(apiKeyId), + redis.getQueueWaitTimes(apiKeyId) + ]) + + return { + apiKeyId, + currentQueueCount: queueCount, + stats, + waitTimeStats: calculateWaitTimeStats(waitTimes) + } + }) + ) + + // 获取全局等待时间统计 + const globalWaitTimes = await redis.getGlobalQueueWaitTimes() + const globalWaitTimeStats = calculateWaitTimeStats(globalWaitTimes) + + // 计算全局汇总 + const globalStats = { + totalEntered: perKeyStats.reduce((sum, s) => sum + s.stats.entered, 0), + totalSuccess: perKeyStats.reduce((sum, s) => sum + s.stats.success, 0), + totalTimeout: perKeyStats.reduce((sum, s) => sum + s.stats.timeout, 0), + totalCancelled: perKeyStats.reduce((sum, s) => sum + s.stats.cancelled, 0), + totalSocketChanged: perKeyStats.reduce((sum, s) => sum + (s.stats.socket_changed || 0), 0), + totalRejectedOverload: perKeyStats.reduce( + (sum, s) => sum + (s.stats.rejected_overload || 0), + 0 + ), + currentTotalQueued: perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0), + // 队列资源利用率指标 + peakQueueSize: + perKeyStats.length > 0 ? Math.max(...perKeyStats.map((s) => s.currentQueueCount)) : 0, + avgQueueSize: + perKeyStats.length > 0 + ? 
Math.round( + perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0) / perKeyStats.length + ) + : 0, + activeApiKeys: perKeyStats.filter((s) => s.currentQueueCount > 0).length + } + + // 计算成功率 + if (globalStats.totalEntered > 0) { + globalStats.successRate = Math.round( + (globalStats.totalSuccess / globalStats.totalEntered) * 100 + ) + globalStats.timeoutRate = Math.round( + (globalStats.totalTimeout / globalStats.totalEntered) * 100 + ) + globalStats.cancelledRate = Math.round( + (globalStats.totalCancelled / globalStats.totalEntered) * 100 + ) + } + + // 从全局等待时间统计中提取关键指标 + if (globalWaitTimeStats) { + globalStats.avgWaitTimeMs = globalWaitTimeStats.avg + globalStats.p50WaitTimeMs = globalWaitTimeStats.p50 + globalStats.p90WaitTimeMs = globalWaitTimeStats.p90 + globalStats.p99WaitTimeMs = globalWaitTimeStats.p99 + // 多实例采样策略标记(详见 design.md Decision 9) + // 全局 P90 仅用于可视化和监控,不用于系统决策 + // 健康检查使用 API Key 级别的 P90(每 Key 独立采样) + globalWaitTimeStats.globalP90ForVisualizationOnly = true + } + + res.json({ + success: true, + globalStats, + globalWaitTimeStats, + perKeyStats + }) + } catch (error) { + logger.error('❌ Failed to get queue stats:', error) + res.status(500).json({ + success: false, + error: 'Failed to get queue stats', + message: error.message + }) + } +}) + +/** + * DELETE /admin/concurrency-queue/:apiKeyId + * 清理特定 API Key 的排队计数 + */ +router.delete('/concurrency-queue/:apiKeyId', authenticateAdmin, async (req, res) => { + try { + const { apiKeyId } = req.params + await redis.clearConcurrencyQueue(apiKeyId) + + logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared queue for key ${apiKeyId}`) + + res.json({ + success: true, + message: `Successfully cleared queue for API key ${apiKeyId}` + }) + } catch (error) { + logger.error(`❌ Failed to clear queue for ${req.params.apiKeyId}:`, error) + res.status(500).json({ + success: false, + error: 'Failed to clear queue', + message: error.message + }) + } +}) + +/** + * DELETE /admin/concurrency-queue + 
* 清理所有排队计数 + */ +router.delete('/concurrency-queue', authenticateAdmin, async (req, res) => { + try { + const cleared = await redis.clearAllConcurrencyQueues() + + logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared ALL queues`) + + res.json({ + success: true, + message: 'Successfully cleared all queues', + cleared + }) + } catch (error) { + logger.error('❌ Failed to clear all queues:', error) + res.status(500).json({ + success: false, + error: 'Failed to clear all queues', + message: error.message + }) + } +}) + /** * GET /admin/concurrency/:apiKeyId * 获取特定 API Key 的并发状态详情 @@ -47,10 +210,14 @@ router.get('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => { try { const { apiKeyId } = req.params const status = await redis.getConcurrencyStatus(apiKeyId) + const queueCount = await redis.getConcurrencyQueueCount(apiKeyId) res.json({ success: true, - concurrencyStatus: status + concurrencyStatus: { + ...status, + queueCount + } }) } catch (error) { logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error) diff --git a/src/routes/api.js b/src/routes/api.js index 4d298716..3defdc19 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -190,12 +190,42 @@ async function handleMessagesRequest(req, res) { ) if (isStream) { + // 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开) + if (res.destroyed || res.socket?.destroyed || res.writableEnded) { + logger.warn( + `⚠️ Client disconnected before stream response could start for key: ${req.apiKey?.name || 'unknown'}` + ) + return undefined + } + // 流式响应 - 只使用官方真实usage数据 res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Cache-Control', 'no-cache') res.setHeader('Connection', 'keep-alive') res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲 + // ⚠️ 检查 headers 是否已发送(可能在排队心跳时已设置) + if (!res.headersSent) { + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + // ⚠️ 关键修复:尊重 auth.js 
提前设置的 Connection: close + // 当并发队列功能启用时,auth.js 会设置 Connection: close 来禁用 Keep-Alive + // 这里只在没有设置过 Connection 头时才设置 keep-alive + const existingConnection = res.getHeader('Connection') + if (!existingConnection) { + res.setHeader('Connection', 'keep-alive') + } else { + logger.api( + `🔌 [STREAM] Preserving existing Connection header: ${existingConnection} for key: ${req.apiKey?.name || 'unknown'}` + ) + } + res.setHeader('Access-Control-Allow-Origin', '*') + res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲 + } else { + logger.debug( + `📤 [STREAM] Headers already sent, skipping setHeader for key: ${req.apiKey?.name || 'unknown'}` + ) + } // 禁用 Nagle 算法,确保数据立即发送 if (res.socket && typeof res.socket.setNoDelay === 'function') { @@ -657,12 +687,61 @@ async function handleMessagesRequest(req, res) { } }, 1000) // 1秒后检查 } else { + // 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开) + if (res.destroyed || res.socket?.destroyed || res.writableEnded) { + logger.warn( + `⚠️ Client disconnected before non-stream request could start for key: ${req.apiKey?.name || 'unknown'}` + ) + return undefined + } + // 非流式响应 - 只使用官方真实usage数据 logger.info('📄 Starting non-streaming request', { apiKeyId: req.apiKey.id, apiKeyName: req.apiKey.name }) + // 📊 监听 socket 事件以追踪连接状态变化 + const nonStreamSocket = res.socket + let _clientClosedConnection = false + let _socketCloseTime = null + + if (nonStreamSocket) { + const onSocketEnd = () => { + _clientClosedConnection = true + _socketCloseTime = Date.now() + logger.warn( + `⚠️ [NON-STREAM] Socket 'end' event - client sent FIN | key: ${req.apiKey?.name}, ` + + `requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms` + ) + } + const onSocketClose = () => { + _clientClosedConnection = true + logger.warn( + `⚠️ [NON-STREAM] Socket 'close' event | key: ${req.apiKey?.name}, ` + + `requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms, ` + + `hadError: ${nonStreamSocket.destroyed}` + ) + } + const onSocketError = (err) => { + logger.error( + `❌ 
[NON-STREAM] Socket error | key: ${req.apiKey?.name}, ` + + `requestId: ${req.requestId}, error: ${err.message}` + ) + } + + nonStreamSocket.once('end', onSocketEnd) + nonStreamSocket.once('close', onSocketClose) + nonStreamSocket.once('error', onSocketError) + + // 清理监听器(在响应结束后) + res.once('finish', () => { + nonStreamSocket.removeListener('end', onSocketEnd) + nonStreamSocket.removeListener('close', onSocketClose) + nonStreamSocket.removeListener('error', onSocketError) + }) + } + // 生成会话哈希用于sticky会话 const sessionHash = sessionHelper.generateSessionHash(req.body) @@ -867,6 +946,15 @@ async function handleMessagesRequest(req, res) { bodyLength: response.body ? response.body.length : 0 }) + // 🔍 检查客户端连接是否仍然有效 + // 在长时间请求过程中,客户端可能已经断开连接(超时、用户取消等) + if (res.destroyed || res.socket?.destroyed || res.writableEnded) { + logger.warn( + `⚠️ Client disconnected before non-stream response could be sent for key: ${req.apiKey?.name || 'unknown'}` + ) + return undefined + } + res.status(response.statusCode) // 设置响应头,避免 Content-Length 和 Transfer-Encoding 冲突 @@ -932,10 +1020,12 @@ async function handleMessagesRequest(req, res) { logger.warn('⚠️ No usage data found in Claude API JSON response') } + // 使用 Express 内建的 res.json() 发送响应(简单可靠) res.json(jsonData) } catch (parseError) { logger.warn('⚠️ Failed to parse Claude API response as JSON:', parseError.message) logger.info('📄 Raw response body:', response.body) + // 使用 Express 内建的 res.send() 发送响应(简单可靠) res.send(response.body) } diff --git a/src/services/bedrockRelayService.js b/src/services/bedrockRelayService.js index ec8ec126..d04e42b2 100644 --- a/src/services/bedrockRelayService.js +++ b/src/services/bedrockRelayService.js @@ -243,10 +243,11 @@ class BedrockRelayService { isBackendError ? { backendError: queueResult.errorMessage } : {} ) if (!res.headersSent) { + const existingConnection = res.getHeader ? 
res.getHeader('Connection') : null res.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'x-user-message-queue-error': errorType }) } @@ -309,10 +310,17 @@ class BedrockRelayService { } // 设置SSE响应头 + // ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close + const existingConnection = res.getHeader ? res.getHeader('Connection') : null + if (existingConnection) { + logger.debug( + `🔌 [Bedrock Stream] Preserving existing Connection header: ${existingConnection}` + ) + } res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Content-Type, Authorization' }) diff --git a/src/services/ccrRelayService.js b/src/services/ccrRelayService.js index 3b48f9e9..d5f97c9f 100644 --- a/src/services/ccrRelayService.js +++ b/src/services/ccrRelayService.js @@ -4,6 +4,7 @@ const logger = require('../utils/logger') const config = require('../../config/config') const { parseVendorPrefixedModel } = require('../utils/modelHelper') const userMessageQueueService = require('./userMessageQueueService') +const { isStreamWritable } = require('../utils/streamHelper') class CcrRelayService { constructor() { @@ -379,10 +380,13 @@ class CcrRelayService { isBackendError ? { backendError: queueResult.errorMessage } : {} ) if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? 
responseStream.getHeader('Connection') + : null responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'x-user-message-queue-error': errorType }) } @@ -606,10 +610,13 @@ class CcrRelayService { // 设置错误响应的状态码和响应头 if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? responseStream.getHeader('Connection') + : null const errorHeaders = { 'Content-Type': response.headers['content-type'] || 'application/json', 'Cache-Control': 'no-cache', - Connection: 'keep-alive' + Connection: existingConnection || 'keep-alive' } // 避免 Transfer-Encoding 冲突,让 Express 自动处理 delete errorHeaders['Transfer-Encoding'] @@ -619,13 +626,13 @@ class CcrRelayService { // 直接透传错误数据,不进行包装 response.data.on('data', (chunk) => { - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write(chunk) } }) response.data.on('end', () => { - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.end() } resolve() // 不抛出异常,正常完成流处理 @@ -659,11 +666,20 @@ class CcrRelayService { }) // 设置响应头 + // ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? 
responseStream.getHeader('Connection') + : null + if (existingConnection) { + logger.debug( + `🔌 [CCR Stream] Preserving existing Connection header: ${existingConnection}` + ) + } const headers = { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control' } @@ -702,12 +718,17 @@ class CcrRelayService { } // 写入到响应流 - if (outputLine && !responseStream.destroyed) { + if (outputLine && isStreamWritable(responseStream)) { responseStream.write(`${outputLine}\n`) + } else if (outputLine) { + // 客户端连接已断开,记录警告 + logger.warn( + `⚠️ [CCR] Client disconnected during stream, skipping data for account: ${accountId}` + ) } } else { // 空行也需要传递 - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write('\n') } } @@ -718,10 +739,6 @@ class CcrRelayService { }) response.data.on('end', () => { - if (!responseStream.destroyed) { - responseStream.end() - } - // 如果收集到使用统计数据,调用回调 if (usageCallback && Object.keys(collectedUsage).length > 0) { try { @@ -733,12 +750,26 @@ class CcrRelayService { } } - resolve() + if (isStreamWritable(responseStream)) { + // 等待数据完全 flush 到客户端后再 resolve + responseStream.end(() => { + logger.debug( + `🌊 CCR stream response completed and flushed | bytesWritten: ${responseStream.bytesWritten || 'unknown'}` + ) + resolve() + }) + } else { + // 连接已断开,记录警告 + logger.warn( + `⚠️ [CCR] Client disconnected before stream end, data may not have been received | account: ${accountId}` + ) + resolve() + } }) response.data.on('error', (err) => { logger.error('❌ Stream data error:', err) - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.end() } reject(err) @@ -770,7 +801,7 @@ class CcrRelayService { } } - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write(`data: 
${JSON.stringify(errorResponse)}\n\n`) responseStream.end() } diff --git a/src/services/claudeConsoleRelayService.js b/src/services/claudeConsoleRelayService.js index 6280c57c..81221f81 100644 --- a/src/services/claudeConsoleRelayService.js +++ b/src/services/claudeConsoleRelayService.js @@ -10,6 +10,7 @@ const { isAccountDisabledError } = require('../utils/errorSanitizer') const userMessageQueueService = require('./userMessageQueueService') +const { isStreamWritable } = require('../utils/streamHelper') class ClaudeConsoleRelayService { constructor() { @@ -517,10 +518,13 @@ class ClaudeConsoleRelayService { isBackendError ? { backendError: queueResult.errorMessage } : {} ) if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? responseStream.getHeader('Connection') + : null responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'x-user-message-queue-error': errorType }) } @@ -878,7 +882,7 @@ class ClaudeConsoleRelayService { `🧹 [Stream] [SANITIZED] Error response to client: ${JSON.stringify(sanitizedError)}` ) - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write(JSON.stringify(sanitizedError)) responseStream.end() } @@ -886,7 +890,7 @@ class ClaudeConsoleRelayService { const sanitizedText = sanitizeErrorMessage(errorDataForCheck) logger.error(`🧹 [Stream] [SANITIZED] Error response to client: ${sanitizedText}`) - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write(sanitizedText) responseStream.end() } @@ -923,11 +927,22 @@ class ClaudeConsoleRelayService { }) // 设置响应头 + // ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close + // 当并发队列功能启用时,auth.js 会设置 Connection: close 来禁用 Keep-Alive if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? 
responseStream.getHeader('Connection') + : null + const connectionHeader = existingConnection || 'keep-alive' + if (existingConnection) { + logger.debug( + `🔌 [Console Stream] Preserving existing Connection header: ${existingConnection}` + ) + } responseStream.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: connectionHeader, 'X-Accel-Buffering': 'no' }) } @@ -953,20 +968,33 @@ class ClaudeConsoleRelayService { buffer = lines.pop() || '' // 转发数据并解析usage - if (lines.length > 0 && !responseStream.destroyed) { - const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') + if (lines.length > 0) { + // 检查流是否可写(客户端连接是否有效) + if (isStreamWritable(responseStream)) { + const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') - // 应用流转换器如果有 - if (streamTransformer) { - const transformed = streamTransformer(linesToForward) - if (transformed) { - responseStream.write(transformed) + // 应用流转换器如果有 + let dataToWrite = linesToForward + if (streamTransformer) { + const transformed = streamTransformer(linesToForward) + if (transformed) { + dataToWrite = transformed + } else { + dataToWrite = null + } + } + + if (dataToWrite) { + responseStream.write(dataToWrite) } } else { - responseStream.write(linesToForward) + // 客户端连接已断开,记录警告(但仍继续解析usage) + logger.warn( + `⚠️ [Console] Client disconnected during stream, skipping ${lines.length} lines for account: ${account?.name || accountId}` + ) } - // 解析SSE数据寻找usage信息 + // 解析SSE数据寻找usage信息(无论连接状态如何) for (const line of lines) { if (line.startsWith('data:')) { const jsonStr = line.slice(5).trimStart() @@ -1074,7 +1102,7 @@ class ClaudeConsoleRelayService { `❌ Error processing Claude Console stream data (Account: ${account?.name || accountId}):`, error ) - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( @@ -1097,7 +1125,7 @@ 
class ClaudeConsoleRelayService { response.data.on('end', () => { try { // 处理缓冲区中剩余的数据 - if (buffer.trim() && !responseStream.destroyed) { + if (buffer.trim() && isStreamWritable(responseStream)) { if (streamTransformer) { const transformed = streamTransformer(buffer) if (transformed) { @@ -1146,12 +1174,33 @@ class ClaudeConsoleRelayService { } // 确保流正确结束 - if (!responseStream.destroyed) { - responseStream.end() - } + if (isStreamWritable(responseStream)) { + // 📊 诊断日志:流结束前状态 + logger.info( + `📤 [STREAM] Ending response | destroyed: ${responseStream.destroyed}, ` + + `socketDestroyed: ${responseStream.socket?.destroyed}, ` + + `socketBytesWritten: ${responseStream.socket?.bytesWritten || 0}` + ) - logger.debug('🌊 Claude Console Claude stream response completed') - resolve() + // 禁用 Nagle 算法确保数据立即发送 + if (responseStream.socket && !responseStream.socket.destroyed) { + responseStream.socket.setNoDelay(true) + } + + // 等待数据完全 flush 到客户端后再 resolve + responseStream.end(() => { + logger.info( + `✅ [STREAM] Response ended and flushed | socketBytesWritten: ${responseStream.socket?.bytesWritten || 'unknown'}` + ) + resolve() + }) + } else { + // 连接已断开,记录警告 + logger.warn( + `⚠️ [Console] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}` + ) + resolve() + } } catch (error) { logger.error('❌ Error processing stream end:', error) reject(error) @@ -1163,7 +1212,7 @@ class ClaudeConsoleRelayService { `❌ Claude Console stream error (Account: ${account?.name || accountId}):`, error ) - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( @@ -1211,14 +1260,17 @@ class ClaudeConsoleRelayService { // 发送错误响应 if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? 
responseStream.getHeader('Connection') + : null responseStream.writeHead(error.response?.status || 500, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive' + Connection: existingConnection || 'keep-alive' }) } - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 如果有 streamTransformer(如测试请求),使用前端期望的格式 if (streamTransformer) { responseStream.write( @@ -1388,7 +1440,7 @@ class ClaudeConsoleRelayService { 'Cache-Control': 'no-cache' }) } - if (!responseStream.destroyed && !responseStream.writableEnded) { + if (isStreamWritable(responseStream)) { responseStream.write( `data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n` ) diff --git a/src/services/claudeRelayConfigService.js b/src/services/claudeRelayConfigService.js index 6bab76ea..4fa2b411 100644 --- a/src/services/claudeRelayConfigService.js +++ b/src/services/claudeRelayConfigService.js @@ -20,6 +20,15 @@ const DEFAULT_CONFIG = { userMessageQueueDelayMs: 200, // 请求间隔(毫秒) userMessageQueueTimeoutMs: 5000, // 队列等待超时(毫秒),优化后锁持有时间短无需长等待 userMessageQueueLockTtlMs: 5000, // 锁TTL(毫秒),请求发送后立即释放无需长TTL + // 并发请求排队配置 + concurrentRequestQueueEnabled: false, // 是否启用并发请求排队(默认关闭) + concurrentRequestQueueMaxSize: 3, // 固定最小排队数(默认3) + concurrentRequestQueueMaxSizeMultiplier: 0, // 并发数的倍数(默认0,仅使用固定值) + concurrentRequestQueueTimeoutMs: 10000, // 排队超时(毫秒,默认10秒) + concurrentRequestQueueMaxRedisFailCount: 5, // 连续 Redis 失败阈值(默认5次) + // 排队健康检查配置 + concurrentRequestQueueHealthCheckEnabled: true, // 是否启用排队健康检查(默认开启) + concurrentRequestQueueHealthThreshold: 0.8, // 健康检查阈值(P90 >= 超时 × 阈值时拒绝新请求) updatedAt: null, updatedBy: null } @@ -105,7 +114,8 @@ class ClaudeRelayConfigService { logger.info(`✅ Claude relay config updated by ${updatedBy}:`, { claudeCodeOnlyEnabled: updatedConfig.claudeCodeOnlyEnabled, - globalSessionBindingEnabled: updatedConfig.globalSessionBindingEnabled + globalSessionBindingEnabled: 
updatedConfig.globalSessionBindingEnabled, + concurrentRequestQueueEnabled: updatedConfig.concurrentRequestQueueEnabled }) return updatedConfig diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index a0366b40..36671fee 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -16,6 +16,7 @@ const { formatDateWithTimezone } = require('../utils/dateHelper') const requestIdentityService = require('./requestIdentityService') const { createClaudeTestPayload } = require('../utils/testPayloadHelper') const userMessageQueueService = require('./userMessageQueueService') +const { isStreamWritable } = require('../utils/streamHelper') class ClaudeRelayService { constructor() { @@ -1057,6 +1058,8 @@ class ClaudeRelayService { logger.info(`🔗 指纹是这个: ${headers['User-Agent']}`) + logger.info(`🔗 指纹是这个: ${headers['User-Agent']}`) + // 根据模型和客户端传递的 anthropic-beta 动态设置 header const modelId = requestPayload?.model || body?.model const clientBetaHeader = clientHeaders?.['anthropic-beta'] @@ -1338,10 +1341,13 @@ class ClaudeRelayService { isBackendError ? { backendError: queueResult.errorMessage } : {} ) if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? responseStream.getHeader('Connection') + : null responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive', + Connection: existingConnection || 'keep-alive', 'x-user-message-queue-error': errorType }) } @@ -1699,7 +1705,7 @@ class ClaudeRelayService { } })() } - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 解析 Claude API 返回的错误详情 let errorMessage = `Claude API error: ${res.statusCode}` try { @@ -1764,16 +1770,23 @@ class ClaudeRelayService { buffer = lines.pop() || '' // 保留最后的不完整行 // 转发已处理的完整行到客户端 - if (lines.length > 0 && !responseStream.destroyed) { - const linesToForward = lines.join('\n') + (lines.length > 0 ? 
'\n' : '') - // 如果有流转换器,应用转换 - if (streamTransformer) { - const transformed = streamTransformer(linesToForward) - if (transformed) { - responseStream.write(transformed) + if (lines.length > 0) { + if (isStreamWritable(responseStream)) { + const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '') + // 如果有流转换器,应用转换 + if (streamTransformer) { + const transformed = streamTransformer(linesToForward) + if (transformed) { + responseStream.write(transformed) + } + } else { + responseStream.write(linesToForward) } } else { - responseStream.write(linesToForward) + // 客户端连接已断开,记录警告(但仍继续解析usage) + logger.warn( + `⚠️ [Official] Client disconnected during stream, skipping ${lines.length} lines for account: ${accountId}` + ) } } @@ -1878,7 +1891,7 @@ class ClaudeRelayService { } catch (error) { logger.error('❌ Error processing stream data:', error) // 发送错误但不破坏流,让它自然结束 - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.write('event: error\n') responseStream.write( `data: ${JSON.stringify({ @@ -1894,7 +1907,7 @@ class ClaudeRelayService { res.on('end', async () => { try { // 处理缓冲区中剩余的数据 - if (buffer.trim() && !responseStream.destroyed) { + if (buffer.trim() && isStreamWritable(responseStream)) { if (streamTransformer) { const transformed = streamTransformer(buffer) if (transformed) { @@ -1906,8 +1919,16 @@ class ClaudeRelayService { } // 确保流正确结束 - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { responseStream.end() + logger.debug( + `🌊 Stream end called | bytesWritten: ${responseStream.bytesWritten || 'unknown'}` + ) + } else { + // 连接已断开,记录警告 + logger.warn( + `⚠️ [Official] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}` + ) } } catch (error) { logger.error('❌ Error processing stream end:', error) @@ -2105,14 +2126,17 @@ class ClaudeRelayService { } if (!responseStream.headersSent) { + const existingConnection = 
responseStream.getHeader + ? responseStream.getHeader('Connection') + : null responseStream.writeHead(statusCode, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive' + Connection: existingConnection || 'keep-alive' }) } - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( @@ -2132,13 +2156,16 @@ class ClaudeRelayService { logger.error(`❌ Claude stream request timeout | Account: ${account?.name || accountId}`) if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? responseStream.getHeader('Connection') + : null responseStream.writeHead(504, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', - Connection: 'keep-alive' + Connection: existingConnection || 'keep-alive' }) } - if (!responseStream.destroyed) { + if (isStreamWritable(responseStream)) { // 发送 SSE 错误事件 responseStream.write('event: error\n') responseStream.write( @@ -2453,10 +2480,13 @@ class ClaudeRelayService { // 设置响应头 if (!responseStream.headersSent) { + const existingConnection = responseStream.getHeader + ? 
/**
 * Compute a percentile of a pre-sorted numeric array using the
 * nearest-rank method: index = ceil(p / 100 * n) - 1.
 *
 * Boundary behaviour:
 *  - empty array        -> 0
 *  - single element     -> that element
 *  - percentile <= 0    -> minimum (first element)
 *  - percentile >= 100  -> maximum (last element)
 *
 * Examples (n = 100): P50 -> ceil(50) - 1 = index 49 (the 50th value),
 * P99 -> ceil(99) - 1 = index 98 (the 99th value).
 *
 * @param {number[]} sortedArray - values sorted in ascending order
 * @param {number} percentile - target percentile in [0, 100]
 * @returns {number} the nearest-rank percentile value
 */
function getPercentile(sortedArray, percentile) {
  const n = sortedArray.length
  if (n === 0) {
    return 0
  }
  // A single sample, or any non-positive percentile, resolves to the minimum.
  if (n === 1 || percentile <= 0) {
    return sortedArray[0]
  }
  if (percentile >= 100) {
    return sortedArray[n - 1]
  }
  const rank = Math.ceil((percentile / 100) * n)
  return sortedArray[rank - 1]
}
/**
 * Build a wait-time distribution summary: min / max / avg plus P50, P90, P99.
 *
 * Percentiles use the nearest-rank method (index = ceil(p / 100 * n) - 1),
 * computed here over a locally sorted copy so the caller's array is never
 * mutated and need not be pre-sorted.
 *
 * Returned fields:
 *  - sampleCount: number of samples (always present, for reliability checks)
 *  - count: same value, kept for backward compatibility
 *  - min / max / avg (avg is rounded to the nearest integer)
 *  - p50 / p90 / p99
 *  - sampleSizeWarning + p90Unreliable when fewer than 10 samples
 *  - p99Unreliable when fewer than 100 samples
 * Unreliable percentiles are still returned for reference.
 *
 * @param {number[]} waitTimes - wait times in milliseconds, any order
 * @returns {Object|null} stats object, or null for empty/missing input
 */
function calculateWaitTimeStats(waitTimes) {
  if (!waitTimes || waitTimes.length === 0) {
    return null
  }

  const sorted = [...waitTimes].sort((a, b) => a - b)
  const len = sorted.length
  const total = sorted.reduce((acc, v) => acc + v, 0)

  // Nearest-rank percentile over the sorted copy (valid for 0 < p < 100).
  const pct = (p) => sorted[Math.ceil((p / 100) * len) - 1]

  const stats = {
    sampleCount: len, // always included so callers can judge reliability
    count: len, // backward-compatible alias
    min: sorted[0],
    max: sorted[len - 1],
    avg: Math.round(total / len),
    p50: pct(50),
    p90: pct(90),
    p99: pct(99)
  }

  // Progressive reliability flags: P90 needs at least 10 samples,
  // P99 needs at least 100 (see design.md Decision 6).
  if (len < 10) {
    stats.sampleSizeWarning = 'Results may be inaccurate due to small sample size'
    stats.p90Unreliable = true
  }
  if (len < 100) {
    stats.p99Unreliable = true
  }

  return stats
}

/**
 * Whether an HTTP response stream can still be written to, i.e. the client
 * connection is still usable: the stream is present, neither the stream nor
 * its underlying socket has been destroyed, and the writable side has not
 * already been ended.
 *
 * @param {import('http').ServerResponse} stream - HTTP response stream
 * @returns {boolean} true when it is safe to write to the stream
 */
function isStreamWritable(stream) {
  return Boolean(stream && !stream.destroyed && !stream.socket?.destroyed && !stream.writableEnded)
}
+ isStreamWritable +} diff --git a/tests/concurrencyQueue.integration.test.js b/tests/concurrencyQueue.integration.test.js new file mode 100644 index 00000000..fce15872 --- /dev/null +++ b/tests/concurrencyQueue.integration.test.js @@ -0,0 +1,860 @@ +/** + * 并发请求排队功能集成测试 + * + * 测试分为三个层次: + * 1. Mock 测试 - 测试核心逻辑,不需要真实 Redis + * 2. Redis 方法测试 - 测试 Redis 操作的原子性和正确性 + * 3. 端到端场景测试 - 测试完整的排队流程 + * + * 运行方式: + * - npm test -- concurrencyQueue.integration # 运行所有测试(Mock 部分) + * - REDIS_TEST=1 npm test -- concurrencyQueue.integration # 包含真实 Redis 测试 + */ + +// Mock logger to avoid console output during tests +jest.mock('../src/utils/logger', () => ({ + api: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + info: jest.fn(), + database: jest.fn(), + security: jest.fn() +})) + +const redis = require('../src/models/redis') +const claudeRelayConfigService = require('../src/services/claudeRelayConfigService') + +// Helper: sleep function +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)) + +// Helper: 创建模拟的 req/res 对象 +function createMockReqRes() { + const listeners = {} + const req = { + destroyed: false, + once: jest.fn((event, handler) => { + listeners[`req:${event}`] = handler + }), + removeListener: jest.fn((event) => { + delete listeners[`req:${event}`] + }), + // 触发事件的辅助方法 + emit: (event) => { + const handler = listeners[`req:${event}`] + if (handler) { + handler() + } + } + } + + const res = { + once: jest.fn((event, handler) => { + listeners[`res:${event}`] = handler + }), + removeListener: jest.fn((event) => { + delete listeners[`res:${event}`] + }), + emit: (event) => { + const handler = listeners[`res:${event}`] + if (handler) { + handler() + } + } + } + + return { req, res, listeners } +} + +// ============================================ +// 第一部分:Mock 测试 - waitForConcurrencySlot 核心逻辑 +// ============================================ +describe('ConcurrencyQueue Integration Tests', () => { + describe('Part 1: waitForConcurrencySlot Logic 
(Mocked)', () => { + // 导入 auth 模块中的 waitForConcurrencySlot + // 由于它是内部函数,我们需要通过测试其行为来验证 + // 这里我们模拟整个流程 + + let mockRedis + + beforeEach(() => { + jest.clearAllMocks() + + // 创建 Redis mock + mockRedis = { + concurrencyCount: {}, + queueCount: {}, + stats: {}, + waitTimes: {}, + globalWaitTimes: [] + } + + // Mock Redis 并发方法 + jest.spyOn(redis, 'incrConcurrency').mockImplementation(async (keyId, requestId, _lease) => { + if (!mockRedis.concurrencyCount[keyId]) { + mockRedis.concurrencyCount[keyId] = new Set() + } + mockRedis.concurrencyCount[keyId].add(requestId) + return mockRedis.concurrencyCount[keyId].size + }) + + jest.spyOn(redis, 'decrConcurrency').mockImplementation(async (keyId, requestId) => { + if (mockRedis.concurrencyCount[keyId]) { + mockRedis.concurrencyCount[keyId].delete(requestId) + return mockRedis.concurrencyCount[keyId].size + } + return 0 + }) + + // Mock 排队计数方法 + jest.spyOn(redis, 'incrConcurrencyQueue').mockImplementation(async (keyId) => { + mockRedis.queueCount[keyId] = (mockRedis.queueCount[keyId] || 0) + 1 + return mockRedis.queueCount[keyId] + }) + + jest.spyOn(redis, 'decrConcurrencyQueue').mockImplementation(async (keyId) => { + mockRedis.queueCount[keyId] = Math.max(0, (mockRedis.queueCount[keyId] || 0) - 1) + return mockRedis.queueCount[keyId] + }) + + jest + .spyOn(redis, 'getConcurrencyQueueCount') + .mockImplementation(async (keyId) => mockRedis.queueCount[keyId] || 0) + + // Mock 统计方法 + jest.spyOn(redis, 'incrConcurrencyQueueStats').mockImplementation(async (keyId, field) => { + if (!mockRedis.stats[keyId]) { + mockRedis.stats[keyId] = {} + } + mockRedis.stats[keyId][field] = (mockRedis.stats[keyId][field] || 0) + 1 + return mockRedis.stats[keyId][field] + }) + + jest.spyOn(redis, 'recordQueueWaitTime').mockResolvedValue(undefined) + jest.spyOn(redis, 'recordGlobalQueueWaitTime').mockResolvedValue(undefined) + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + describe('Slot Acquisition Flow', () => { + it('should 
acquire slot immediately when under concurrency limit', async () => { + // 模拟 waitForConcurrencySlot 的行为 + const keyId = 'test-key-1' + const requestId = 'req-1' + const concurrencyLimit = 5 + + // 直接测试 incrConcurrency 的行为 + const count = await redis.incrConcurrency(keyId, requestId, 300) + + expect(count).toBe(1) + expect(count).toBeLessThanOrEqual(concurrencyLimit) + }) + + it('should track multiple concurrent requests correctly', async () => { + const keyId = 'test-key-2' + const concurrencyLimit = 3 + + // 模拟多个并发请求 + const results = [] + for (let i = 1; i <= 5; i++) { + const count = await redis.incrConcurrency(keyId, `req-${i}`, 300) + results.push({ requestId: `req-${i}`, count, exceeds: count > concurrencyLimit }) + } + + // 前3个应该在限制内 + expect(results[0].exceeds).toBe(false) + expect(results[1].exceeds).toBe(false) + expect(results[2].exceeds).toBe(false) + // 后2个超过限制 + expect(results[3].exceeds).toBe(true) + expect(results[4].exceeds).toBe(true) + }) + + it('should release slot and allow next request', async () => { + const keyId = 'test-key-3' + const concurrencyLimit = 1 + + // 第一个请求获取槽位 + const count1 = await redis.incrConcurrency(keyId, 'req-1', 300) + expect(count1).toBe(1) + + // 第二个请求超限 + const count2 = await redis.incrConcurrency(keyId, 'req-2', 300) + expect(count2).toBe(2) + expect(count2).toBeGreaterThan(concurrencyLimit) + + // 释放第二个请求(因为超限) + await redis.decrConcurrency(keyId, 'req-2') + + // 释放第一个请求 + await redis.decrConcurrency(keyId, 'req-1') + + // 现在第三个请求应该能获取 + const count3 = await redis.incrConcurrency(keyId, 'req-3', 300) + expect(count3).toBe(1) + }) + }) + + describe('Queue Count Management', () => { + it('should increment and decrement queue count atomically', async () => { + const keyId = 'test-key-4' + + // 增加排队计数 + const count1 = await redis.incrConcurrencyQueue(keyId, 60000) + expect(count1).toBe(1) + + const count2 = await redis.incrConcurrencyQueue(keyId, 60000) + expect(count2).toBe(2) + + // 减少排队计数 + const count3 = await 
redis.decrConcurrencyQueue(keyId) + expect(count3).toBe(1) + + const count4 = await redis.decrConcurrencyQueue(keyId) + expect(count4).toBe(0) + }) + + it('should not go below zero on decrement', async () => { + const keyId = 'test-key-5' + + // 直接减少(没有先增加) + const count = await redis.decrConcurrencyQueue(keyId) + expect(count).toBe(0) + }) + + it('should handle concurrent queue operations', async () => { + const keyId = 'test-key-6' + + // 并发增加 + const increments = await Promise.all([ + redis.incrConcurrencyQueue(keyId, 60000), + redis.incrConcurrencyQueue(keyId, 60000), + redis.incrConcurrencyQueue(keyId, 60000) + ]) + + // 所有增量应该是连续的 + const sortedIncrements = [...increments].sort((a, b) => a - b) + expect(sortedIncrements).toEqual([1, 2, 3]) + }) + }) + + describe('Statistics Tracking', () => { + it('should track entered/success/timeout/cancelled stats', async () => { + const keyId = 'test-key-7' + + await redis.incrConcurrencyQueueStats(keyId, 'entered') + await redis.incrConcurrencyQueueStats(keyId, 'entered') + await redis.incrConcurrencyQueueStats(keyId, 'success') + await redis.incrConcurrencyQueueStats(keyId, 'timeout') + await redis.incrConcurrencyQueueStats(keyId, 'cancelled') + + expect(mockRedis.stats[keyId]).toEqual({ + entered: 2, + success: 1, + timeout: 1, + cancelled: 1 + }) + }) + }) + + describe('Client Disconnection Handling', () => { + it('should detect client disconnection via close event', async () => { + const { req } = createMockReqRes() + + let clientDisconnected = false + + // 设置监听器 + req.once('close', () => { + clientDisconnected = true + }) + + // 模拟客户端断开 + req.emit('close') + + expect(clientDisconnected).toBe(true) + }) + + it('should detect pre-destroyed request', () => { + const { req } = createMockReqRes() + req.destroyed = true + + expect(req.destroyed).toBe(true) + }) + }) + + describe('Exponential Backoff Simulation', () => { + it('should increase poll interval with backoff', () => { + const config = { + pollIntervalMs: 200, + 
maxPollIntervalMs: 2000, + backoffFactor: 1.5, + jitterRatio: 0 // 禁用抖动以便测试 + } + + let interval = config.pollIntervalMs + const intervals = [interval] + + for (let i = 0; i < 5; i++) { + interval = Math.min(interval * config.backoffFactor, config.maxPollIntervalMs) + intervals.push(interval) + } + + // 验证指数增长 + expect(intervals[1]).toBe(300) // 200 * 1.5 + expect(intervals[2]).toBe(450) // 300 * 1.5 + expect(intervals[3]).toBe(675) // 450 * 1.5 + expect(intervals[4]).toBe(1012.5) // 675 * 1.5 + expect(intervals[5]).toBe(1518.75) // 1012.5 * 1.5 + }) + + it('should cap interval at maximum', () => { + const config = { + pollIntervalMs: 1000, + maxPollIntervalMs: 2000, + backoffFactor: 1.5 + } + + let interval = config.pollIntervalMs + + for (let i = 0; i < 10; i++) { + interval = Math.min(interval * config.backoffFactor, config.maxPollIntervalMs) + } + + expect(interval).toBe(2000) + }) + + it('should apply jitter within expected range', () => { + const baseInterval = 1000 + const jitterRatio = 0.2 // ±20% + const results = [] + + for (let i = 0; i < 100; i++) { + const randomValue = Math.random() + const jitter = baseInterval * jitterRatio * (randomValue * 2 - 1) + const finalInterval = baseInterval + jitter + results.push(finalInterval) + } + + const min = Math.min(...results) + const max = Math.max(...results) + + // 所有结果应该在 [800, 1200] 范围内 + expect(min).toBeGreaterThanOrEqual(800) + expect(max).toBeLessThanOrEqual(1200) + }) + }) + }) + + // ============================================ + // 第二部分:并发竞争场景测试 + // ============================================ + describe('Part 2: Concurrent Race Condition Tests', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + describe('Race Condition: Multiple Requests Competing for Same Slot', () => { + it('should handle race condition when multiple requests try to acquire last slot', async () => { + const keyId = 'race-test-1' + const concurrencyLimit = 1 + const 
concurrencyState = { count: 0, holders: new Set() } + + // 模拟原子的 incrConcurrency + jest.spyOn(redis, 'incrConcurrency').mockImplementation(async (key, reqId) => { + // 模拟原子操作 + concurrencyState.count++ + concurrencyState.holders.add(reqId) + return concurrencyState.count + }) + + jest.spyOn(redis, 'decrConcurrency').mockImplementation(async (key, reqId) => { + if (concurrencyState.holders.has(reqId)) { + concurrencyState.count-- + concurrencyState.holders.delete(reqId) + } + return concurrencyState.count + }) + + // 5个请求同时竞争1个槽位 + const requests = Array.from({ length: 5 }, (_, i) => `req-${i + 1}`) + + const acquireResults = await Promise.all( + requests.map(async (reqId) => { + const count = await redis.incrConcurrency(keyId, reqId, 300) + const acquired = count <= concurrencyLimit + + if (!acquired) { + // 超限,释放 + await redis.decrConcurrency(keyId, reqId) + } + + return { reqId, count, acquired } + }) + ) + + // 只有一个请求应该成功获取槽位 + const successfulAcquires = acquireResults.filter((r) => r.acquired) + expect(successfulAcquires.length).toBe(1) + + // 最终并发计数应该是1 + expect(concurrencyState.count).toBe(1) + }) + + it('should maintain consistency under high contention', async () => { + const keyId = 'race-test-2' + const concurrencyLimit = 3 + const requestCount = 20 + const concurrencyState = { count: 0, maxSeen: 0 } + + jest.spyOn(redis, 'incrConcurrency').mockImplementation(async () => { + concurrencyState.count++ + concurrencyState.maxSeen = Math.max(concurrencyState.maxSeen, concurrencyState.count) + return concurrencyState.count + }) + + jest.spyOn(redis, 'decrConcurrency').mockImplementation(async () => { + concurrencyState.count = Math.max(0, concurrencyState.count - 1) + return concurrencyState.count + }) + + // 模拟多轮请求 + const activeRequests = [] + + for (let i = 0; i < requestCount; i++) { + const count = await redis.incrConcurrency(keyId, `req-${i}`, 300) + + if (count <= concurrencyLimit) { + activeRequests.push(`req-${i}`) + + // 模拟处理时间后释放 + setTimeout(async 
() => { + await redis.decrConcurrency(keyId, `req-${i}`) + }, Math.random() * 50) + } else { + await redis.decrConcurrency(keyId, `req-${i}`) + } + + // 随机延迟 + await sleep(Math.random() * 10) + } + + // 等待所有请求完成 + await sleep(100) + + // 最大并发不应超过限制 + expect(concurrencyState.maxSeen).toBeLessThanOrEqual(concurrencyLimit + requestCount) // 允许短暂超限 + }) + }) + + describe('Queue Overflow Protection', () => { + it('should reject requests when queue is full', async () => { + const keyId = 'overflow-test-1' + const maxQueueSize = 5 + const queueState = { count: 0 } + + jest.spyOn(redis, 'incrConcurrencyQueue').mockImplementation(async () => { + queueState.count++ + return queueState.count + }) + + jest.spyOn(redis, 'decrConcurrencyQueue').mockImplementation(async () => { + queueState.count = Math.max(0, queueState.count - 1) + return queueState.count + }) + + const results = [] + + // 尝试10个请求进入队列 + for (let i = 0; i < 10; i++) { + const queueCount = await redis.incrConcurrencyQueue(keyId, 60000) + + if (queueCount > maxQueueSize) { + // 队列满,释放并拒绝 + await redis.decrConcurrencyQueue(keyId) + results.push({ index: i, accepted: false }) + } else { + results.push({ index: i, accepted: true, position: queueCount }) + } + } + + const accepted = results.filter((r) => r.accepted) + const rejected = results.filter((r) => !r.accepted) + + expect(accepted.length).toBe(5) + expect(rejected.length).toBe(5) + }) + }) + }) + + // ============================================ + // 第三部分:真实 Redis 集成测试(可选) + // ============================================ + describe('Part 3: Real Redis Integration Tests', () => { + const skipRealRedis = !process.env.REDIS_TEST + + // 辅助函数:检查 Redis 连接 + async function checkRedisConnection() { + try { + const client = redis.getClient() + if (!client) { + return false + } + await client.ping() + return true + } catch { + return false + } + } + + beforeAll(async () => { + if (skipRealRedis) { + console.log('⏭️ Skipping real Redis tests (set REDIS_TEST=1 to 
enable)') + return + } + + const connected = await checkRedisConnection() + if (!connected) { + console.log('⚠️ Redis not connected, skipping real Redis tests') + } + }) + + // 清理测试数据 + afterEach(async () => { + if (skipRealRedis) { + return + } + + try { + const client = redis.getClient() + if (!client) { + return + } + + // 清理测试键 + const testKeys = await client.keys('concurrency:queue:test-*') + if (testKeys.length > 0) { + await client.del(...testKeys) + } + } catch { + // 忽略清理错误 + } + }) + + describe('Redis Queue Operations', () => { + const testOrSkip = skipRealRedis ? it.skip : it + + testOrSkip('should atomically increment queue count with TTL', async () => { + const keyId = 'test-redis-queue-1' + const timeoutMs = 5000 + + const count1 = await redis.incrConcurrencyQueue(keyId, timeoutMs) + expect(count1).toBe(1) + + const count2 = await redis.incrConcurrencyQueue(keyId, timeoutMs) + expect(count2).toBe(2) + + // 验证 TTL 被设置 + const client = redis.getClient() + const ttl = await client.ttl(`concurrency:queue:${keyId}`) + expect(ttl).toBeGreaterThan(0) + expect(ttl).toBeLessThanOrEqual(Math.ceil(timeoutMs / 1000) + 30) + }) + + testOrSkip('should atomically decrement and delete when zero', async () => { + const keyId = 'test-redis-queue-2' + + await redis.incrConcurrencyQueue(keyId, 60000) + const count = await redis.decrConcurrencyQueue(keyId) + + expect(count).toBe(0) + + // 验证键已删除 + const client = redis.getClient() + const exists = await client.exists(`concurrency:queue:${keyId}`) + expect(exists).toBe(0) + }) + + testOrSkip('should handle concurrent increments correctly', async () => { + const keyId = 'test-redis-queue-3' + const numRequests = 10 + + // 并发增加 + const results = await Promise.all( + Array.from({ length: numRequests }, () => redis.incrConcurrencyQueue(keyId, 60000)) + ) + + // 所有结果应该是 1 到 numRequests + const sorted = [...results].sort((a, b) => a - b) + expect(sorted).toEqual(Array.from({ length: numRequests }, (_, i) => i + 1)) + }) + }) + + 
describe('Redis Stats Operations', () => { + const testOrSkip = skipRealRedis ? it.skip : it + + testOrSkip('should track queue statistics correctly', async () => { + const keyId = 'test-redis-stats-1' + + await redis.incrConcurrencyQueueStats(keyId, 'entered') + await redis.incrConcurrencyQueueStats(keyId, 'entered') + await redis.incrConcurrencyQueueStats(keyId, 'success') + await redis.incrConcurrencyQueueStats(keyId, 'timeout') + + const stats = await redis.getConcurrencyQueueStats(keyId) + + expect(stats.entered).toBe(2) + expect(stats.success).toBe(1) + expect(stats.timeout).toBe(1) + expect(stats.cancelled).toBe(0) + }) + + testOrSkip('should record and retrieve wait times', async () => { + const keyId = 'test-redis-wait-1' + const waitTimes = [100, 200, 150, 300, 250] + + for (const wt of waitTimes) { + await redis.recordQueueWaitTime(keyId, wt) + } + + const recorded = await redis.getQueueWaitTimes(keyId) + + // 应该按 LIFO 顺序存储 + expect(recorded.length).toBe(5) + expect(recorded[0]).toBe(250) // 最后插入的在前面 + }) + + testOrSkip('should record global wait times', async () => { + const waitTimes = [500, 600, 700] + + for (const wt of waitTimes) { + await redis.recordGlobalQueueWaitTime(wt) + } + + const recorded = await redis.getGlobalQueueWaitTimes() + + expect(recorded.length).toBeGreaterThanOrEqual(3) + }) + }) + + describe('Redis Cleanup Operations', () => { + const testOrSkip = skipRealRedis ? 
it.skip : it + + testOrSkip('should clear specific queue', async () => { + const keyId = 'test-redis-clear-1' + + await redis.incrConcurrencyQueue(keyId, 60000) + await redis.incrConcurrencyQueue(keyId, 60000) + + const cleared = await redis.clearConcurrencyQueue(keyId) + expect(cleared).toBe(true) + + const count = await redis.getConcurrencyQueueCount(keyId) + expect(count).toBe(0) + }) + + testOrSkip('should clear all queues but preserve stats', async () => { + const keyId1 = 'test-redis-clearall-1' + const keyId2 = 'test-redis-clearall-2' + + // 创建队列和统计 + await redis.incrConcurrencyQueue(keyId1, 60000) + await redis.incrConcurrencyQueue(keyId2, 60000) + await redis.incrConcurrencyQueueStats(keyId1, 'entered') + + // 清理所有队列 + const cleared = await redis.clearAllConcurrencyQueues() + expect(cleared).toBeGreaterThanOrEqual(2) + + // 验证队列已清理 + const count1 = await redis.getConcurrencyQueueCount(keyId1) + const count2 = await redis.getConcurrencyQueueCount(keyId2) + expect(count1).toBe(0) + expect(count2).toBe(0) + + // 统计应该保留 + const stats = await redis.getConcurrencyQueueStats(keyId1) + expect(stats.entered).toBe(1) + }) + }) + }) + + // ============================================ + // 第四部分:配置服务集成测试 + // ============================================ + describe('Part 4: Configuration Service Integration', () => { + beforeEach(() => { + // 清除配置缓存 + claudeRelayConfigService.clearCache() + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + + describe('Queue Configuration', () => { + it('should return default queue configuration', async () => { + jest.spyOn(redis, 'getClient').mockReturnValue(null) + + const config = await claudeRelayConfigService.getConfig() + + expect(config.concurrentRequestQueueEnabled).toBe(false) + expect(config.concurrentRequestQueueMaxSize).toBe(3) + expect(config.concurrentRequestQueueMaxSizeMultiplier).toBe(0) + expect(config.concurrentRequestQueueTimeoutMs).toBe(10000) + }) + + it('should calculate max queue size correctly', async () => 
{ + const testCases = [ + { concurrencyLimit: 5, multiplier: 2, fixedMin: 3, expected: 10 }, // 5*2=10 > 3 + { concurrencyLimit: 1, multiplier: 1, fixedMin: 5, expected: 5 }, // 1*1=1 < 5 + { concurrencyLimit: 10, multiplier: 0.5, fixedMin: 3, expected: 5 }, // 10*0.5=5 > 3 + { concurrencyLimit: 2, multiplier: 1, fixedMin: 10, expected: 10 } // 2*1=2 < 10 + ] + + for (const tc of testCases) { + const maxQueueSize = Math.max(tc.concurrencyLimit * tc.multiplier, tc.fixedMin) + expect(maxQueueSize).toBe(tc.expected) + } + }) + }) + }) + + // ============================================ + // 第五部分:端到端场景测试 + // ============================================ + describe('Part 5: End-to-End Scenario Tests', () => { + describe('Scenario: Claude Code Agent Parallel Tool Calls', () => { + it('should handle burst of parallel tool results', async () => { + // 模拟 Claude Code Agent 发送多个并行工具结果的场景 + const concurrencyLimit = 2 + const maxQueueSize = 5 + + const state = { + concurrency: 0, + queue: 0, + completed: 0, + rejected: 0 + } + + // 模拟 8 个并行工具结果请求 + const requests = Array.from({ length: 8 }, (_, i) => ({ + id: `tool-result-${i + 1}`, + startTime: Date.now() + })) + + // 模拟处理逻辑 + async function processRequest(req) { + // 尝试获取并发槽位 + state.concurrency++ + + if (state.concurrency > concurrencyLimit) { + // 超限,进入队列 + state.concurrency-- + state.queue++ + + if (state.queue > maxQueueSize) { + // 队列满,拒绝 + state.queue-- + state.rejected++ + return { ...req, status: 'rejected', reason: 'queue_full' } + } + + // 等待槽位(模拟) + await sleep(Math.random() * 100) + state.queue-- + state.concurrency++ + } + + // 处理请求 + await sleep(50) // 模拟处理时间 + state.concurrency-- + state.completed++ + + return { ...req, status: 'completed', duration: Date.now() - req.startTime } + } + + const results = await Promise.all(requests.map(processRequest)) + + const completed = results.filter((r) => r.status === 'completed') + const rejected = results.filter((r) => r.status === 'rejected') + + // 大部分请求应该完成 + 
expect(completed.length).toBeGreaterThan(0) + // 可能有一些被拒绝 + expect(state.rejected).toBe(rejected.length) + + console.log( + ` ✓ Completed: ${completed.length}, Rejected: ${rejected.length}, Max concurrent: ${concurrencyLimit}` + ) + }) + }) + + describe('Scenario: Graceful Degradation', () => { + it('should fallback when Redis fails', async () => { + jest + .spyOn(redis, 'incrConcurrencyQueue') + .mockRejectedValue(new Error('Redis connection lost')) + + // 模拟降级行为:Redis 失败时直接拒绝而不是崩溃 + let result + try { + await redis.incrConcurrencyQueue('fallback-test', 60000) + result = { success: true } + } catch (error) { + // 优雅降级:返回 429 而不是 500 + result = { success: false, fallback: true, error: error.message } + } + + expect(result.fallback).toBe(true) + expect(result.error).toContain('Redis') + }) + }) + + describe('Scenario: Timeout Behavior', () => { + it('should respect queue timeout', async () => { + const timeoutMs = 100 + const startTime = Date.now() + + // 模拟等待超时 + await new Promise((resolve) => setTimeout(resolve, timeoutMs)) + + const elapsed = Date.now() - startTime + expect(elapsed).toBeGreaterThanOrEqual(timeoutMs - 10) // 允许 10ms 误差 + }) + + it('should track timeout statistics', async () => { + const stats = { entered: 0, success: 0, timeout: 0, cancelled: 0 } + + // 模拟多个请求,部分超时 + const requests = [ + { id: 'req-1', willTimeout: false }, + { id: 'req-2', willTimeout: true }, + { id: 'req-3', willTimeout: false }, + { id: 'req-4', willTimeout: true } + ] + + for (const req of requests) { + stats.entered++ + if (req.willTimeout) { + stats.timeout++ + } else { + stats.success++ + } + } + + expect(stats.entered).toBe(4) + expect(stats.success).toBe(2) + expect(stats.timeout).toBe(2) + + // 成功率应该是 50% + const successRate = (stats.success / stats.entered) * 100 + expect(successRate).toBe(50) + }) + }) + }) +}) diff --git a/tests/concurrencyQueue.test.js b/tests/concurrencyQueue.test.js new file mode 100644 index 00000000..ef0ff794 --- /dev/null +++ 
b/tests/concurrencyQueue.test.js @@ -0,0 +1,278 @@ +/** + * 并发请求排队功能测试 + * 测试排队逻辑中的核心算法:百分位数计算、等待时间统计、指数退避等 + * + * 注意:Redis 方法的测试需要集成测试环境,这里主要测试纯算法逻辑 + */ + +// Mock logger to avoid console output during tests +jest.mock('../src/utils/logger', () => ({ + api: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + info: jest.fn(), + database: jest.fn(), + security: jest.fn() +})) + +// 使用共享的统计工具函数(与生产代码一致) +const { getPercentile, calculateWaitTimeStats } = require('../src/utils/statsHelper') + +describe('ConcurrencyQueue', () => { + describe('Percentile Calculation (nearest-rank method)', () => { + // 直接测试共享工具函数,确保与生产代码行为一致 + it('should return 0 for empty array', () => { + expect(getPercentile([], 50)).toBe(0) + }) + + it('should return single element for single-element array', () => { + expect(getPercentile([100], 50)).toBe(100) + expect(getPercentile([100], 99)).toBe(100) + }) + + it('should return min for percentile 0', () => { + expect(getPercentile([10, 20, 30, 40, 50], 0)).toBe(10) + }) + + it('should return max for percentile 100', () => { + expect(getPercentile([10, 20, 30, 40, 50], 100)).toBe(50) + }) + + it('should calculate P50 correctly for len=10', () => { + // For [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] (len=10) + // P50: ceil(50/100 * 10) - 1 = ceil(5) - 1 = 4 → value at index 4 = 50 + const arr = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + expect(getPercentile(arr, 50)).toBe(50) + }) + + it('should calculate P90 correctly for len=10', () => { + // For len=10, P90: ceil(90/100 * 10) - 1 = ceil(9) - 1 = 8 → value at index 8 = 90 + const arr = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + expect(getPercentile(arr, 90)).toBe(90) + }) + + it('should calculate P99 correctly for len=100', () => { + // For len=100, P99: ceil(99/100 * 100) - 1 = ceil(99) - 1 = 98 + const arr = Array.from({ length: 100 }, (_, i) => i + 1) + expect(getPercentile(arr, 99)).toBe(99) + }) + + it('should handle two-element array correctly', () => { + // For [10, 20] (len=2) + // P50: 
ceil(50/100 * 2) - 1 = ceil(1) - 1 = 0 → value = 10 + expect(getPercentile([10, 20], 50)).toBe(10) + }) + + it('should handle negative percentile as 0', () => { + expect(getPercentile([10, 20, 30], -10)).toBe(10) + }) + + it('should handle percentile > 100 as 100', () => { + expect(getPercentile([10, 20, 30], 150)).toBe(30) + }) + }) + + describe('Wait Time Stats Calculation', () => { + // 直接测试共享工具函数 + it('should return null for empty array', () => { + expect(calculateWaitTimeStats([])).toBeNull() + }) + + it('should return null for null input', () => { + expect(calculateWaitTimeStats(null)).toBeNull() + }) + + it('should return null for undefined input', () => { + expect(calculateWaitTimeStats(undefined)).toBeNull() + }) + + it('should calculate stats correctly for typical data', () => { + const waitTimes = [100, 200, 150, 300, 250, 180, 220, 280, 190, 210] + const stats = calculateWaitTimeStats(waitTimes) + + expect(stats.count).toBe(10) + expect(stats.min).toBe(100) + expect(stats.max).toBe(300) + // Sum: 100+150+180+190+200+210+220+250+280+300 = 2080 + expect(stats.avg).toBe(208) + expect(stats.sampleSizeWarning).toBeUndefined() + }) + + it('should add warning for small sample size (< 10)', () => { + const waitTimes = [100, 200, 300] + const stats = calculateWaitTimeStats(waitTimes) + + expect(stats.count).toBe(3) + expect(stats.sampleSizeWarning).toBe('Results may be inaccurate due to small sample size') + }) + + it('should handle single value', () => { + const stats = calculateWaitTimeStats([500]) + + expect(stats.count).toBe(1) + expect(stats.min).toBe(500) + expect(stats.max).toBe(500) + expect(stats.avg).toBe(500) + expect(stats.p50).toBe(500) + expect(stats.p90).toBe(500) + expect(stats.p99).toBe(500) + }) + + it('should sort input array before calculating', () => { + const waitTimes = [500, 100, 300, 200, 400] + const stats = calculateWaitTimeStats(waitTimes) + + expect(stats.min).toBe(100) + expect(stats.max).toBe(500) + }) + + it('should not modify 
original array', () => { + const waitTimes = [500, 100, 300] + calculateWaitTimeStats(waitTimes) + + expect(waitTimes).toEqual([500, 100, 300]) + }) + }) + + describe('Exponential Backoff with Jitter', () => { + /** + * 指数退避计算函数(与 auth.js 中的实现一致) + * @param {number} currentInterval - 当前轮询间隔 + * @param {number} backoffFactor - 退避系数 + * @param {number} jitterRatio - 抖动比例 + * @param {number} maxInterval - 最大间隔 + * @param {number} randomValue - 随机值 [0, 1),用于确定性测试 + */ + function calculateNextInterval( + currentInterval, + backoffFactor, + jitterRatio, + maxInterval, + randomValue + ) { + let nextInterval = currentInterval * backoffFactor + // 抖动范围:[-jitterRatio, +jitterRatio] + const jitter = nextInterval * jitterRatio * (randomValue * 2 - 1) + nextInterval = nextInterval + jitter + return Math.max(1, Math.min(nextInterval, maxInterval)) + } + + it('should apply exponential backoff without jitter (randomValue=0.5)', () => { + // randomValue = 0.5 gives jitter = 0 + const next = calculateNextInterval(100, 1.5, 0.2, 1000, 0.5) + expect(next).toBe(150) // 100 * 1.5 = 150 + }) + + it('should apply maximum positive jitter (randomValue=1.0)', () => { + // randomValue = 1.0 gives maximum positive jitter (+20%) + const next = calculateNextInterval(100, 1.5, 0.2, 1000, 1.0) + // 100 * 1.5 = 150, jitter = 150 * 0.2 * 1 = 30 + expect(next).toBe(180) // 150 + 30 + }) + + it('should apply maximum negative jitter (randomValue=0.0)', () => { + // randomValue = 0.0 gives maximum negative jitter (-20%) + const next = calculateNextInterval(100, 1.5, 0.2, 1000, 0.0) + // 100 * 1.5 = 150, jitter = 150 * 0.2 * -1 = -30 + expect(next).toBe(120) // 150 - 30 + }) + + it('should respect maximum interval', () => { + const next = calculateNextInterval(800, 1.5, 0.2, 1000, 1.0) + // 800 * 1.5 = 1200, with +20% jitter = 1440, capped at 1000 + expect(next).toBe(1000) + }) + + it('should never go below 1ms even with extreme negative jitter', () => { + const next = calculateNextInterval(1, 1.0, 0.9, 
1000, 0.0) + // 1 * 1.0 = 1, jitter = 1 * 0.9 * -1 = -0.9 + // 1 - 0.9 = 0.1, but Math.max(1, ...) ensures minimum is 1 + expect(next).toBe(1) + }) + + it('should handle zero jitter ratio', () => { + const next = calculateNextInterval(100, 2.0, 0, 1000, 0.0) + expect(next).toBe(200) // Pure exponential, no jitter + }) + + it('should handle large backoff factor', () => { + const next = calculateNextInterval(100, 3.0, 0.1, 1000, 0.5) + expect(next).toBe(300) // 100 * 3.0 = 300 + }) + + describe('jitter distribution', () => { + it('should produce values in expected range', () => { + const results = [] + // Test with various random values + for (let r = 0; r <= 1; r += 0.1) { + results.push(calculateNextInterval(100, 1.5, 0.2, 1000, r)) + } + // All values should be between 120 (150 - 30) and 180 (150 + 30) + expect(Math.min(...results)).toBeGreaterThanOrEqual(120) + expect(Math.max(...results)).toBeLessThanOrEqual(180) + }) + }) + }) + + describe('Queue Size Calculation', () => { + /** + * 最大排队数计算(与 auth.js 中的实现一致) + */ + function calculateMaxQueueSize(concurrencyLimit, multiplier, fixedMin) { + return Math.max(concurrencyLimit * multiplier, fixedMin) + } + + it('should use multiplier when result is larger', () => { + // concurrencyLimit=10, multiplier=2, fixedMin=5 + // max(10*2, 5) = max(20, 5) = 20 + expect(calculateMaxQueueSize(10, 2, 5)).toBe(20) + }) + + it('should use fixed minimum when multiplier result is smaller', () => { + // concurrencyLimit=2, multiplier=1, fixedMin=5 + // max(2*1, 5) = max(2, 5) = 5 + expect(calculateMaxQueueSize(2, 1, 5)).toBe(5) + }) + + it('should handle zero multiplier', () => { + // concurrencyLimit=10, multiplier=0, fixedMin=3 + // max(10*0, 3) = max(0, 3) = 3 + expect(calculateMaxQueueSize(10, 0, 3)).toBe(3) + }) + + it('should handle fractional multiplier', () => { + // concurrencyLimit=10, multiplier=1.5, fixedMin=5 + // max(10*1.5, 5) = max(15, 5) = 15 + expect(calculateMaxQueueSize(10, 1.5, 5)).toBe(15) + }) + }) + + 
describe('TTL Calculation', () => { + /** + * 排队计数器 TTL 计算(与 redis.js 中的实现一致) + */ + function calculateQueueTtl(timeoutMs, bufferSeconds = 30) { + return Math.ceil(timeoutMs / 1000) + bufferSeconds + } + + it('should calculate TTL with default buffer', () => { + // 60000ms = 60s + 30s buffer = 90s + expect(calculateQueueTtl(60000)).toBe(90) + }) + + it('should round up milliseconds to seconds', () => { + // 61500ms = ceil(61.5) = 62s + 30s = 92s + expect(calculateQueueTtl(61500)).toBe(92) + }) + + it('should handle custom buffer', () => { + // 30000ms = 30s + 60s buffer = 90s + expect(calculateQueueTtl(30000, 60)).toBe(90) + }) + + it('should handle very short timeout', () => { + // 1000ms = 1s + 30s = 31s + expect(calculateQueueTtl(1000)).toBe(31) + }) + }) +}) diff --git a/web/admin-spa/src/views/SettingsView.vue b/web/admin-spa/src/views/SettingsView.vue index 20280697..b9b260a2 100644 --- a/web/admin-spa/src/views/SettingsView.vue +++ b/web/admin-spa/src/views/SettingsView.vue @@ -898,6 +898,120 @@ + +
+
+
+
+ +
+
+

+ 并发请求排队 +

+

+ 当 API Key 并发请求超限时进入队列等待,而非直接拒绝 +

+
+
+ +
+ + +
+ +
+ + +

+ 最大排队数的固定最小值(1-100) +

+
+ + +
+ + +

+ 最大排队数 = MAX(倍数 × 并发限制, 固定值),设为 0 则仅使用固定值 +

+
+ + +
+ + +

+ 请求在排队中等待的最大时间,超时将返回 429 错误(5秒-5分钟,默认10秒) +

+
+
+ +
+
+ +
+

+ 工作原理:当 API Key 的并发请求超过 + concurrencyLimit + 时,超限请求会进入队列等待而非直接返回 429。适合 Claude Code Agent + 并行工具调用场景。 +

+
+
+
+
+
{ sessionBindingErrorMessage: response.config?.sessionBindingErrorMessage || '你的本地session已污染,请清理后使用。', sessionBindingTtlDays: response.config?.sessionBindingTtlDays ?? 30, - userMessageQueueEnabled: response.config?.userMessageQueueEnabled ?? true, + userMessageQueueEnabled: response.config?.userMessageQueueEnabled ?? false, // 与后端默认值保持一致 userMessageQueueDelayMs: response.config?.userMessageQueueDelayMs ?? 200, - userMessageQueueTimeoutMs: response.config?.userMessageQueueTimeoutMs ?? 30000, + userMessageQueueTimeoutMs: response.config?.userMessageQueueTimeoutMs ?? 5000, // 与后端默认值保持一致 + concurrentRequestQueueEnabled: response.config?.concurrentRequestQueueEnabled ?? false, + concurrentRequestQueueMaxSize: response.config?.concurrentRequestQueueMaxSize ?? 3, + concurrentRequestQueueMaxSizeMultiplier: + response.config?.concurrentRequestQueueMaxSizeMultiplier ?? 0, + concurrentRequestQueueTimeoutMs: response.config?.concurrentRequestQueueTimeoutMs ?? 10000, updatedAt: response.config?.updatedAt || null, updatedBy: response.config?.updatedBy || null } @@ -1865,7 +1988,12 @@ const saveClaudeConfig = async () => { sessionBindingTtlDays: claudeConfig.value.sessionBindingTtlDays, userMessageQueueEnabled: claudeConfig.value.userMessageQueueEnabled, userMessageQueueDelayMs: claudeConfig.value.userMessageQueueDelayMs, - userMessageQueueTimeoutMs: claudeConfig.value.userMessageQueueTimeoutMs + userMessageQueueTimeoutMs: claudeConfig.value.userMessageQueueTimeoutMs, + concurrentRequestQueueEnabled: claudeConfig.value.concurrentRequestQueueEnabled, + concurrentRequestQueueMaxSize: claudeConfig.value.concurrentRequestQueueMaxSize, + concurrentRequestQueueMaxSizeMultiplier: + claudeConfig.value.concurrentRequestQueueMaxSizeMultiplier, + concurrentRequestQueueTimeoutMs: claudeConfig.value.concurrentRequestQueueTimeoutMs } const response = await apiClient.put('/admin/claude-relay-config', payload, {