mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-23 00:53:33 +00:00
feat: enhance concurrency queue with health check and admin endpoints
- Add queue health check for fast-fail when overloaded (P90 > threshold) - Implement socket identity verification with UUID token - Add wait time statistics (P50/P90/P99) and queue stats tracking - Add admin endpoints for queue stats and cleanup - Add CLEAR_CONCURRENCY_QUEUES_ON_STARTUP config option - Update documentation with troubleshooting and proxy config guide
This commit is contained in:
@@ -8,6 +8,102 @@ const redis = require('../models/redis')
|
||||
const ClientValidator = require('../validators/clientValidator')
|
||||
const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator')
|
||||
const claudeRelayConfigService = require('../services/claudeRelayConfigService')
|
||||
const { calculateWaitTimeStats } = require('../utils/statsHelper')
|
||||
|
||||
// 工具函数
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查排队是否过载,决定是否应该快速失败
|
||||
* 详见 design.md Decision 7: 排队健康检查与快速失败
|
||||
*
|
||||
* @param {string} apiKeyId - API Key ID
|
||||
* @param {number} timeoutMs - 排队超时时间(毫秒)
|
||||
* @param {Object} queueConfig - 队列配置
|
||||
* @param {number} maxQueueSize - 最大排队数
|
||||
* @returns {Promise<Object>} { reject: boolean, reason?: string, estimatedWaitMs?: number, timeoutMs?: number }
|
||||
*/
|
||||
async function shouldRejectDueToOverload(apiKeyId, timeoutMs, queueConfig, maxQueueSize) {
|
||||
try {
|
||||
// 如果健康检查被禁用,直接返回不拒绝
|
||||
if (!queueConfig.concurrentRequestQueueHealthCheckEnabled) {
|
||||
return { reject: false, reason: 'health_check_disabled' }
|
||||
}
|
||||
|
||||
// 🔑 先检查当前队列长度
|
||||
const currentQueueCount = await redis.getConcurrencyQueueCount(apiKeyId).catch(() => 0)
|
||||
|
||||
// 队列为空,说明系统已恢复,跳过健康检查
|
||||
if (currentQueueCount === 0) {
|
||||
return { reject: false, reason: 'queue_empty', currentQueueCount: 0 }
|
||||
}
|
||||
|
||||
// 🔑 关键改进:只有当队列接近满载时才进行健康检查
|
||||
// 队列长度 <= maxQueueSize * 0.5 时,认为系统有足够余量,跳过健康检查
|
||||
// 这避免了在队列较短时过于保守地拒绝请求
|
||||
// 使用 ceil 确保小队列(如 maxQueueSize=3)时阈值为 2,即队列 <=1 时跳过
|
||||
const queueLoadThreshold = Math.ceil(maxQueueSize * 0.5)
|
||||
if (currentQueueCount <= queueLoadThreshold) {
|
||||
return {
|
||||
reject: false,
|
||||
reason: 'queue_not_loaded',
|
||||
currentQueueCount,
|
||||
queueLoadThreshold,
|
||||
maxQueueSize
|
||||
}
|
||||
}
|
||||
|
||||
// 获取该 API Key 的等待时间样本
|
||||
const waitTimes = await redis.getQueueWaitTimes(apiKeyId)
|
||||
const stats = calculateWaitTimeStats(waitTimes)
|
||||
|
||||
// 样本不足(< 10),跳过健康检查,避免冷启动误判
|
||||
if (!stats || stats.sampleCount < 10) {
|
||||
return { reject: false, reason: 'insufficient_samples', sampleCount: stats?.sampleCount || 0 }
|
||||
}
|
||||
|
||||
// P90 不可靠时也跳过(虽然 sampleCount >= 10 时 p90Unreliable 应该是 false)
|
||||
if (stats.p90Unreliable) {
|
||||
return { reject: false, reason: 'p90_unreliable', sampleCount: stats.sampleCount }
|
||||
}
|
||||
|
||||
// 计算健康阈值:P90 >= 超时时间 × 阈值 时拒绝
|
||||
const threshold = queueConfig.concurrentRequestQueueHealthThreshold || 0.8
|
||||
const maxAllowedP90 = timeoutMs * threshold
|
||||
|
||||
if (stats.p90 >= maxAllowedP90) {
|
||||
return {
|
||||
reject: true,
|
||||
reason: 'queue_overloaded',
|
||||
estimatedWaitMs: stats.p90,
|
||||
timeoutMs,
|
||||
threshold,
|
||||
sampleCount: stats.sampleCount,
|
||||
currentQueueCount,
|
||||
maxQueueSize
|
||||
}
|
||||
}
|
||||
|
||||
return { reject: false, p90: stats.p90, sampleCount: stats.sampleCount, currentQueueCount }
|
||||
} catch (error) {
|
||||
// 健康检查出错时不阻塞请求,记录警告并继续
|
||||
logger.warn(`Health check failed for ${apiKeyId}:`, error.message)
|
||||
return { reject: false, reason: 'health_check_error', error: error.message }
|
||||
}
|
||||
}
|
||||
|
||||
// 排队轮询配置常量(可通过配置文件覆盖)
|
||||
// 性能权衡:初始间隔越短响应越快,但 Redis QPS 越高
|
||||
// 当前配置:100 个等待者时约 250-300 QPS(指数退避后)
|
||||
const QUEUE_POLLING_CONFIG = {
|
||||
pollIntervalMs: 200, // 初始轮询间隔(毫秒)- 平衡响应速度和 Redis 压力
|
||||
maxPollIntervalMs: 2000, // 最大轮询间隔(毫秒)- 长时间等待时降低 Redis 压力
|
||||
backoffFactor: 1.5, // 指数退避系数
|
||||
jitterRatio: 0.2, // 抖动比例(±20%)- 防止惊群效应
|
||||
maxRedisFailCount: 5 // 连续 Redis 失败阈值(从 3 提高到 5,提高网络抖动容忍度)
|
||||
}
|
||||
|
||||
const FALLBACK_CONCURRENCY_CONFIG = {
|
||||
leaseSeconds: 300,
|
||||
@@ -128,9 +224,223 @@ function isTokenCountRequest(req) {
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* 等待并发槽位(排队机制核心)
|
||||
*
|
||||
* 采用「先占后检查」模式避免竞态条件:
|
||||
* - 每次轮询时尝试 incrConcurrency 占位
|
||||
* - 如果超限则 decrConcurrency 释放并继续等待
|
||||
* - 成功获取槽位后返回,调用方无需再次 incrConcurrency
|
||||
*
|
||||
* ⚠️ 重要清理责任说明:
|
||||
* - 排队计数:此函数的 finally 块负责调用 decrConcurrencyQueue 清理
|
||||
* - 并发槽位:当返回 acquired=true 时,槽位已被占用(通过 incrConcurrency)
|
||||
* 调用方必须在请求结束时调用 decrConcurrency 释放槽位
|
||||
* (已在 authenticateApiKey 的 finally 块中处理)
|
||||
*
|
||||
* @param {Object} req - Express 请求对象
|
||||
* @param {Object} res - Express 响应对象
|
||||
* @param {string} apiKeyId - API Key ID
|
||||
* @param {Object} queueOptions - 配置参数
|
||||
* @returns {Promise<Object>} { acquired: boolean, reason?: string, waitTimeMs: number }
|
||||
*/
|
||||
async function waitForConcurrencySlot(req, res, apiKeyId, queueOptions) {
|
||||
const {
|
||||
concurrencyLimit,
|
||||
requestId,
|
||||
leaseSeconds,
|
||||
timeoutMs,
|
||||
pollIntervalMs,
|
||||
maxPollIntervalMs,
|
||||
backoffFactor,
|
||||
jitterRatio,
|
||||
maxRedisFailCount: configMaxRedisFailCount
|
||||
} = queueOptions
|
||||
|
||||
let clientDisconnected = false
|
||||
// 追踪轮询过程中是否临时占用了槽位(用于异常时清理)
|
||||
// 工作流程:
|
||||
// 1. incrConcurrency 成功且 count <= limit 时,设置 internalSlotAcquired = true
|
||||
// 2. 统计记录完成后,设置 internalSlotAcquired = false 并返回(所有权转移给调用方)
|
||||
// 3. 如果在步骤 1-2 之间发生异常,finally 块会检测到 internalSlotAcquired = true 并释放槽位
|
||||
let internalSlotAcquired = false
|
||||
|
||||
// 监听客户端断开事件
|
||||
// ⚠️ 重要:必须监听 socket 的事件,而不是 req 的事件!
|
||||
// 原因:对于 POST 请求,当 body-parser 读取完请求体后,req(IncomingMessage 可读流)
|
||||
// 的 'close' 事件会立即触发,但这不代表客户端断开连接!客户端仍在等待响应。
|
||||
// socket 的 'close' 事件才是真正的连接关闭信号。
|
||||
const { socket } = req
|
||||
const onSocketClose = () => {
|
||||
clientDisconnected = true
|
||||
logger.debug(
|
||||
`🔌 [Queue] Socket closed during queue wait for API key ${apiKeyId}, requestId: ${requestId}`
|
||||
)
|
||||
}
|
||||
|
||||
if (socket) {
|
||||
socket.once('close', onSocketClose)
|
||||
}
|
||||
|
||||
// 检查 socket 是否在监听器注册前已被销毁(边界情况)
|
||||
if (socket?.destroyed) {
|
||||
clientDisconnected = true
|
||||
}
|
||||
|
||||
const startTime = Date.now()
|
||||
let pollInterval = pollIntervalMs
|
||||
let redisFailCount = 0
|
||||
// 优先使用配置中的值,否则使用默认值
|
||||
const maxRedisFailCount = configMaxRedisFailCount || QUEUE_POLLING_CONFIG.maxRedisFailCount
|
||||
|
||||
try {
|
||||
while (Date.now() - startTime < timeoutMs) {
|
||||
// 检测客户端是否断开(双重检查:事件标记 + socket 状态)
|
||||
// socket.destroyed 是同步检查,确保即使事件处理有延迟也能及时检测
|
||||
if (clientDisconnected || socket?.destroyed) {
|
||||
redis
|
||||
.incrConcurrencyQueueStats(apiKeyId, 'cancelled')
|
||||
.catch((e) => logger.warn('Failed to record cancelled stat:', e))
|
||||
return {
|
||||
acquired: false,
|
||||
reason: 'client_disconnected',
|
||||
waitTimeMs: Date.now() - startTime
|
||||
}
|
||||
}
|
||||
|
||||
// 尝试获取槽位(先占后检查)
|
||||
try {
|
||||
const count = await redis.incrConcurrency(apiKeyId, requestId, leaseSeconds)
|
||||
redisFailCount = 0 // 重置失败计数
|
||||
|
||||
if (count <= concurrencyLimit) {
|
||||
// 成功获取槽位!
|
||||
const waitTimeMs = Date.now() - startTime
|
||||
|
||||
// 槽位所有权转移说明:
|
||||
// 1. 此时槽位已通过 incrConcurrency 获取
|
||||
// 2. 先标记 internalSlotAcquired = true,确保异常时 finally 块能清理
|
||||
// 3. 统计操作完成后,清除标记并返回,所有权转移给调用方
|
||||
// 4. 调用方(authenticateApiKey)负责在请求结束时释放槽位
|
||||
|
||||
// 标记槽位已获取(用于异常时 finally 块清理)
|
||||
internalSlotAcquired = true
|
||||
|
||||
// 记录统计(非阻塞,fire-and-forget 模式)
|
||||
// ⚠️ 设计说明:
|
||||
// - 故意不 await 这些 Promise,因为统计记录不应阻塞请求处理
|
||||
// - 每个 Promise 都有独立的 .catch(),确保单个失败不影响其他
|
||||
// - 外层 .catch() 是防御性措施,处理 Promise.all 本身的异常
|
||||
// - 即使统计记录在函数返回后才完成/失败,也是安全的(仅日志记录)
|
||||
// - 统计数据丢失可接受,不影响核心业务逻辑
|
||||
Promise.all([
|
||||
redis
|
||||
.recordQueueWaitTime(apiKeyId, waitTimeMs)
|
||||
.catch((e) => logger.warn('Failed to record queue wait time:', e)),
|
||||
redis
|
||||
.recordGlobalQueueWaitTime(waitTimeMs)
|
||||
.catch((e) => logger.warn('Failed to record global wait time:', e)),
|
||||
redis
|
||||
.incrConcurrencyQueueStats(apiKeyId, 'success')
|
||||
.catch((e) => logger.warn('Failed to increment success stats:', e))
|
||||
]).catch((e) => logger.warn('Failed to record queue stats batch:', e))
|
||||
|
||||
// 成功返回前清除标记(所有权转移给调用方,由其负责释放)
|
||||
internalSlotAcquired = false
|
||||
return { acquired: true, waitTimeMs }
|
||||
}
|
||||
|
||||
// 超限,释放槽位继续等待
|
||||
try {
|
||||
await redis.decrConcurrency(apiKeyId, requestId)
|
||||
} catch (decrError) {
|
||||
// 释放失败时记录警告但继续轮询
|
||||
// 下次 incrConcurrency 会自然覆盖同一 requestId 的条目
|
||||
logger.warn(
|
||||
`Failed to release slot during polling for ${apiKeyId}, will retry:`,
|
||||
decrError
|
||||
)
|
||||
}
|
||||
} catch (redisError) {
|
||||
redisFailCount++
|
||||
logger.error(
|
||||
`Redis error in queue polling (${redisFailCount}/${maxRedisFailCount}):`,
|
||||
redisError
|
||||
)
|
||||
|
||||
if (redisFailCount >= maxRedisFailCount) {
|
||||
// 连续 Redis 失败,放弃排队
|
||||
return {
|
||||
acquired: false,
|
||||
reason: 'redis_error',
|
||||
waitTimeMs: Date.now() - startTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 指数退避等待
|
||||
await sleep(pollInterval)
|
||||
|
||||
// 计算下一次轮询间隔(指数退避 + 抖动)
|
||||
// 1. 先应用指数退避
|
||||
let nextInterval = pollInterval * backoffFactor
|
||||
// 2. 添加抖动防止惊群效应(±jitterRatio 范围内的随机偏移)
|
||||
// 抖动范围:[-jitterRatio, +jitterRatio],例如 jitterRatio=0.2 时为 ±20%
|
||||
// 这是预期行为:负抖动可使间隔略微缩短,正抖动可使间隔略微延长
|
||||
// 目的是分散多个等待者的轮询时间点,避免同时请求 Redis
|
||||
const jitter = nextInterval * jitterRatio * (Math.random() * 2 - 1)
|
||||
nextInterval = nextInterval + jitter
|
||||
// 3. 确保在合理范围内:最小 1ms,最大 maxPollIntervalMs
|
||||
// Math.max(1, ...) 保证即使负抖动也不会产生 ≤0 的间隔
|
||||
pollInterval = Math.max(1, Math.min(nextInterval, maxPollIntervalMs))
|
||||
}
|
||||
|
||||
// 超时
|
||||
redis
|
||||
.incrConcurrencyQueueStats(apiKeyId, 'timeout')
|
||||
.catch((e) => logger.warn('Failed to record timeout stat:', e))
|
||||
return { acquired: false, reason: 'timeout', waitTimeMs: Date.now() - startTime }
|
||||
} finally {
|
||||
// 确保清理:
|
||||
// 1. 减少排队计数(排队计数在调用方已增加,这里负责减少)
|
||||
try {
|
||||
await redis.decrConcurrencyQueue(apiKeyId)
|
||||
} catch (cleanupError) {
|
||||
// 清理失败记录错误(可能导致计数泄漏,但有 TTL 保护)
|
||||
logger.error(
|
||||
`Failed to decrement queue count in finally block for ${apiKeyId}:`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
|
||||
// 2. 如果内部获取了槽位但未正常返回(异常路径),释放槽位
|
||||
if (internalSlotAcquired) {
|
||||
try {
|
||||
await redis.decrConcurrency(apiKeyId, requestId)
|
||||
logger.warn(
|
||||
`⚠️ Released orphaned concurrency slot in finally block for ${apiKeyId}, requestId: ${requestId}`
|
||||
)
|
||||
} catch (slotCleanupError) {
|
||||
logger.error(
|
||||
`Failed to release orphaned concurrency slot for ${apiKeyId}:`,
|
||||
slotCleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 清理 socket 事件监听器
|
||||
if (socket) {
|
||||
socket.removeListener('close', onSocketClose)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 🔑 API Key验证中间件(优化版)
|
||||
const authenticateApiKey = async (req, res, next) => {
|
||||
const startTime = Date.now()
|
||||
let authErrored = false
|
||||
let concurrencyCleanup = null
|
||||
let hasConcurrencySlot = false
|
||||
|
||||
try {
|
||||
// 安全提取API Key,支持多种格式(包括Gemini CLI支持)
|
||||
@@ -265,39 +575,346 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
}
|
||||
const requestId = uuidv4()
|
||||
|
||||
// ⚠️ 优化后的 Connection: close 设置策略
|
||||
// 问题背景:HTTP Keep-Alive 使多个请求共用同一个 TCP 连接
|
||||
// 当第一个请求正在处理,第二个请求进入排队时,它们共用同一个 socket
|
||||
// 如果客户端超时关闭连接,两个请求都会受影响
|
||||
// 优化方案:只有在请求实际进入排队时才设置 Connection: close
|
||||
// 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销
|
||||
// 详见 design.md Decision 2: Connection: close 设置时机
|
||||
// 注意:Connection: close 将在下方代码实际进入排队时设置(第 637 行左右)
|
||||
|
||||
// ============================================================
|
||||
// 🔒 并发槽位状态管理说明
|
||||
// ============================================================
|
||||
// 此函数中有两个关键状态变量:
|
||||
// - hasConcurrencySlot: 当前是否持有并发槽位
|
||||
// - concurrencyCleanup: 错误时调用的清理函数
|
||||
//
|
||||
// 状态转换流程:
|
||||
// 1. incrConcurrency 成功 → hasConcurrencySlot=true, 设置临时清理函数
|
||||
// 2. 若超限 → 释放槽位,hasConcurrencySlot=false, concurrencyCleanup=null
|
||||
// 3. 若排队成功 → hasConcurrencySlot=true, 升级为完整清理函数(含 interval 清理)
|
||||
// 4. 请求结束(res.close/req.close)→ 调用 decrementConcurrency 释放
|
||||
// 5. 认证错误 → finally 块调用 concurrencyCleanup 释放
|
||||
//
|
||||
// 为什么需要两种清理函数?
|
||||
// - 临时清理:在排队/认证过程中出错时使用,只释放槽位
|
||||
// - 完整清理:请求正常开始后使用,还需清理 leaseRenewInterval
|
||||
// ============================================================
|
||||
const setTemporaryConcurrencyCleanup = () => {
|
||||
concurrencyCleanup = async () => {
|
||||
if (!hasConcurrencySlot) {
|
||||
return
|
||||
}
|
||||
hasConcurrencySlot = false
|
||||
try {
|
||||
await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`Failed to decrement concurrency after auth error for key ${validation.keyData.id}:`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const currentConcurrency = await redis.incrConcurrency(
|
||||
validation.keyData.id,
|
||||
requestId,
|
||||
leaseSeconds
|
||||
)
|
||||
hasConcurrencySlot = true
|
||||
setTemporaryConcurrencyCleanup()
|
||||
logger.api(
|
||||
`📈 Incremented concurrency for key: ${validation.keyData.id} (${validation.keyData.name}), current: ${currentConcurrency}, limit: ${concurrencyLimit}`
|
||||
)
|
||||
|
||||
if (currentConcurrency > concurrencyLimit) {
|
||||
// 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏)
|
||||
// 1. 先释放刚占用的槽位
|
||||
try {
|
||||
const newCount = await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
logger.api(
|
||||
`📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}`
|
||||
)
|
||||
await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`,
|
||||
error
|
||||
)
|
||||
}
|
||||
logger.security(
|
||||
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
||||
validation.keyData.name
|
||||
}), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}`
|
||||
hasConcurrencySlot = false
|
||||
concurrencyCleanup = null
|
||||
|
||||
// 2. 获取排队配置
|
||||
const queueConfig = await claudeRelayConfigService.getConfig()
|
||||
|
||||
// 3. 排队功能未启用,直接返回 429(保持现有行为)
|
||||
if (!queueConfig.concurrentRequestQueueEnabled) {
|
||||
logger.security(
|
||||
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
||||
validation.keyData.name
|
||||
}), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}`
|
||||
)
|
||||
// 建议客户端在短暂延迟后重试(并发场景下通常很快会有槽位释放)
|
||||
res.set('Retry-After', '1')
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency limit exceeded',
|
||||
message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`,
|
||||
currentConcurrency: currentConcurrency - 1,
|
||||
concurrencyLimit
|
||||
})
|
||||
}
|
||||
|
||||
// 4. 计算最大排队数
|
||||
const maxQueueSize = Math.max(
|
||||
concurrencyLimit * queueConfig.concurrentRequestQueueMaxSizeMultiplier,
|
||||
queueConfig.concurrentRequestQueueMaxSize
|
||||
)
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency limit exceeded',
|
||||
message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`,
|
||||
currentConcurrency: currentConcurrency - 1,
|
||||
concurrencyLimit
|
||||
})
|
||||
|
||||
// 4.5 排队健康检查:过载时快速失败
|
||||
// 详见 design.md Decision 7: 排队健康检查与快速失败
|
||||
const overloadCheck = await shouldRejectDueToOverload(
|
||||
validation.keyData.id,
|
||||
queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
queueConfig,
|
||||
maxQueueSize
|
||||
)
|
||||
if (overloadCheck.reject) {
|
||||
// 使用健康检查返回的当前排队数,避免重复调用 Redis
|
||||
const currentQueueCount = overloadCheck.currentQueueCount || 0
|
||||
logger.api(
|
||||
`🚨 Queue overloaded for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`P90=${overloadCheck.estimatedWaitMs}ms, timeout=${overloadCheck.timeoutMs}ms, ` +
|
||||
`threshold=${overloadCheck.threshold}, samples=${overloadCheck.sampleCount}, ` +
|
||||
`concurrency=${concurrencyLimit}, queue=${currentQueueCount}/${maxQueueSize}`
|
||||
)
|
||||
// 记录被拒绝的过载统计
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'rejected_overload')
|
||||
.catch((e) => logger.warn('Failed to record rejected_overload stat:', e))
|
||||
// 返回 429 + Retry-After,让客户端稍后重试
|
||||
const retryAfterSeconds = 30
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Queue overloaded',
|
||||
message: `Queue is overloaded. Estimated wait time (${overloadCheck.estimatedWaitMs}ms) exceeds threshold. Limit: ${concurrencyLimit} concurrent requests, queue: ${currentQueueCount}/${maxQueueSize}. Please retry later.`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
queueCount: currentQueueCount,
|
||||
maxQueueSize,
|
||||
estimatedWaitMs: overloadCheck.estimatedWaitMs,
|
||||
timeoutMs: overloadCheck.timeoutMs,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 5. 尝试进入排队(原子操作:先增加再检查,避免竞态条件)
|
||||
let queueIncremented = false
|
||||
try {
|
||||
const newQueueCount = await redis.incrConcurrencyQueue(
|
||||
validation.keyData.id,
|
||||
queueConfig.concurrentRequestQueueTimeoutMs
|
||||
)
|
||||
queueIncremented = true
|
||||
|
||||
if (newQueueCount > maxQueueSize) {
|
||||
// 超过最大排队数,立即释放并返回 429
|
||||
await redis.decrConcurrencyQueue(validation.keyData.id)
|
||||
queueIncremented = false
|
||||
logger.api(
|
||||
`🚦 Concurrency queue full for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`queue: ${newQueueCount - 1}, maxQueue: ${maxQueueSize}`
|
||||
)
|
||||
// 队列已满,建议客户端在排队超时时间后重试
|
||||
const retryAfterSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000)
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency queue full',
|
||||
message: `Too many requests waiting in queue. Limit: ${concurrencyLimit} concurrent requests, queue: ${newQueueCount - 1}/${maxQueueSize}, timeout: ${retryAfterSeconds}s`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
queueCount: newQueueCount - 1,
|
||||
maxQueueSize,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 6. 已成功进入排队,记录统计并开始等待槽位
|
||||
logger.api(
|
||||
`⏳ Request entering queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`queue position: ${newQueueCount}`
|
||||
)
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'entered')
|
||||
.catch((e) => logger.warn('Failed to record entered stat:', e))
|
||||
|
||||
// ⚠️ 仅在请求实际进入排队时设置 Connection: close
|
||||
// 详见 design.md Decision 2: Connection: close 设置时机
|
||||
// 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销
|
||||
if (!res.headersSent) {
|
||||
res.setHeader('Connection', 'close')
|
||||
logger.api(
|
||||
`🔌 [Queue] Set Connection: close for queued request, key: ${validation.keyData.id}`
|
||||
)
|
||||
}
|
||||
|
||||
// ⚠️ 记录排队开始时的 socket 标识,用于排队完成后验证
|
||||
// 问题背景:HTTP Keep-Alive 连接复用时,长时间排队可能导致 socket 被其他请求使用
|
||||
// 验证方法:使用 UUID token + socket 对象引用双重验证
|
||||
// 详见 design.md Decision 1: Socket 身份验证机制
|
||||
req._crService = req._crService || {}
|
||||
req._crService.queueToken = uuidv4()
|
||||
req._crService.originalSocket = req.socket
|
||||
req._crService.startTime = Date.now()
|
||||
const savedToken = req._crService.queueToken
|
||||
const savedSocket = req._crService.originalSocket
|
||||
|
||||
// ⚠️ 重要:在调用前将 queueIncremented 设为 false
|
||||
// 因为 waitForConcurrencySlot 的 finally 块会负责清理排队计数
|
||||
// 如果在调用后设置,当 waitForConcurrencySlot 抛出异常时
|
||||
// 外层 catch 块会重复减少计数(finally 已经减过一次)
|
||||
queueIncremented = false
|
||||
|
||||
const slot = await waitForConcurrencySlot(req, res, validation.keyData.id, {
|
||||
concurrencyLimit,
|
||||
requestId,
|
||||
leaseSeconds,
|
||||
timeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
pollIntervalMs: QUEUE_POLLING_CONFIG.pollIntervalMs,
|
||||
maxPollIntervalMs: QUEUE_POLLING_CONFIG.maxPollIntervalMs,
|
||||
backoffFactor: QUEUE_POLLING_CONFIG.backoffFactor,
|
||||
jitterRatio: QUEUE_POLLING_CONFIG.jitterRatio,
|
||||
maxRedisFailCount: queueConfig.concurrentRequestQueueMaxRedisFailCount
|
||||
})
|
||||
|
||||
// 7. 处理排队结果
|
||||
if (!slot.acquired) {
|
||||
if (slot.reason === 'client_disconnected') {
|
||||
// 客户端已断开,不返回响应(连接已关闭)
|
||||
logger.api(
|
||||
`🔌 Client disconnected while queuing for key: ${validation.keyData.id} (${validation.keyData.name})`
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if (slot.reason === 'redis_error') {
|
||||
// Redis 连续失败,返回 503
|
||||
logger.error(
|
||||
`❌ Redis error during queue wait for key: ${validation.keyData.id} (${validation.keyData.name})`
|
||||
)
|
||||
return res.status(503).json({
|
||||
error: 'Service temporarily unavailable',
|
||||
message: 'Failed to acquire concurrency slot due to internal error'
|
||||
})
|
||||
}
|
||||
// 排队超时(使用 api 级别,与其他排队日志保持一致)
|
||||
logger.api(
|
||||
`⏰ Queue timeout for key: ${validation.keyData.id} (${validation.keyData.name}), waited: ${slot.waitTimeMs}ms`
|
||||
)
|
||||
// 已等待超时,建议客户端稍后重试
|
||||
// ⚠️ Retry-After 策略优化:
|
||||
// - 请求已经等了完整的 timeout 时间,说明系统负载较高
|
||||
// - 过早重试(如固定 5 秒)会加剧拥塞,导致更多超时
|
||||
// - 合理策略:使用 timeout 时间的一半作为重试间隔
|
||||
// - 最小值 5 秒,最大值 30 秒,避免极端情况
|
||||
const timeoutSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000)
|
||||
const retryAfterSeconds = Math.max(5, Math.min(30, Math.ceil(timeoutSeconds / 2)))
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Queue timeout',
|
||||
message: `Request timed out waiting for concurrency slot. Limit: ${concurrencyLimit} concurrent requests, maxQueue: ${maxQueueSize}, Queue timeout: ${timeoutSeconds}s, waited: ${slot.waitTimeMs}ms`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
maxQueueSize,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
waitTimeMs: slot.waitTimeMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 8. 排队成功,slot.acquired 表示已在 waitForConcurrencySlot 中获取到槽位
|
||||
logger.api(
|
||||
`✅ Queue wait completed for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms`
|
||||
)
|
||||
hasConcurrencySlot = true
|
||||
setTemporaryConcurrencyCleanup()
|
||||
|
||||
// 9. ⚠️ 关键检查:排队等待结束后,验证客户端是否还在等待响应
|
||||
// 长时间排队后,客户端可能在应用层已放弃(如 Claude Code 的超时机制),
|
||||
// 但 TCP 连接仍然存活。此时继续处理请求是浪费资源。
|
||||
// 注意:如果发送了心跳,headersSent 会是 true,但这是正常的
|
||||
const postQueueSocket = req.socket
|
||||
// 只检查连接是否真正断开(destroyed/writableEnded/socketDestroyed)
|
||||
// headersSent 在心跳场景下是正常的,不应该作为放弃的依据
|
||||
if (res.destroyed || res.writableEnded || postQueueSocket?.destroyed) {
|
||||
logger.warn(
|
||||
`⚠️ Client no longer waiting after queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms | destroyed: ${res.destroyed}, ` +
|
||||
`writableEnded: ${res.writableEnded}, socketDestroyed: ${postQueueSocket?.destroyed}`
|
||||
)
|
||||
// 释放刚获取的槽位
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) => logger.error('Failed to release slot after client abandoned:', e))
|
||||
// 不返回响应(客户端已不在等待)
|
||||
return
|
||||
}
|
||||
|
||||
// 10. ⚠️ 关键检查:验证 socket 身份是否改变
|
||||
// HTTP Keep-Alive 连接复用可能导致排队期间 socket 被其他请求使用
|
||||
// 验证方法:UUID token + socket 对象引用双重验证
|
||||
// 详见 design.md Decision 1: Socket 身份验证机制
|
||||
const queueData = req._crService
|
||||
const socketIdentityChanged =
|
||||
!queueData ||
|
||||
queueData.queueToken !== savedToken ||
|
||||
queueData.originalSocket !== savedSocket
|
||||
|
||||
if (socketIdentityChanged) {
|
||||
logger.error(
|
||||
`❌ [Queue] Socket identity changed during queue wait! ` +
|
||||
`key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms | ` +
|
||||
`tokenMatch: ${queueData?.queueToken === savedToken}, ` +
|
||||
`socketMatch: ${queueData?.originalSocket === savedSocket}`
|
||||
)
|
||||
// 释放刚获取的槽位
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) => logger.error('Failed to release slot after socket identity change:', e))
|
||||
// 记录 socket_changed 统计
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'socket_changed')
|
||||
.catch((e) => logger.warn('Failed to record socket_changed stat:', e))
|
||||
// 不返回响应(socket 已被其他请求使用)
|
||||
return
|
||||
}
|
||||
} catch (queueError) {
|
||||
// 异常时清理资源,防止泄漏
|
||||
// 1. 清理排队计数(如果还没被 waitForConcurrencySlot 的 finally 清理)
|
||||
if (queueIncremented) {
|
||||
await redis
|
||||
.decrConcurrencyQueue(validation.keyData.id)
|
||||
.catch((e) => logger.error('Failed to cleanup queue count after error:', e))
|
||||
}
|
||||
|
||||
// 2. 防御性清理:如果 waitForConcurrencySlot 内部获取了槽位但在返回前异常
|
||||
// 虽然这种情况极少发生(统计记录的异常会被内部捕获),但为了安全起见
|
||||
// 尝试释放可能已获取的槽位。decrConcurrency 使用 ZREM,即使成员不存在也安全
|
||||
if (hasConcurrencySlot) {
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) =>
|
||||
logger.error('Failed to cleanup concurrency slot after queue error:', e)
|
||||
)
|
||||
}
|
||||
|
||||
throw queueError
|
||||
}
|
||||
}
|
||||
|
||||
const renewIntervalMs =
|
||||
@@ -358,6 +975,7 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
const decrementConcurrency = async () => {
|
||||
if (!concurrencyDecremented) {
|
||||
concurrencyDecremented = true
|
||||
hasConcurrencySlot = false
|
||||
if (leaseRenewInterval) {
|
||||
clearInterval(leaseRenewInterval)
|
||||
leaseRenewInterval = null
|
||||
@@ -372,6 +990,11 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
}
|
||||
}
|
||||
}
|
||||
// 升级为完整清理函数(包含 leaseRenewInterval 清理逻辑)
|
||||
// 此时请求已通过认证,后续由 res.close/req.close 事件触发清理
|
||||
if (hasConcurrencySlot) {
|
||||
concurrencyCleanup = decrementConcurrency
|
||||
}
|
||||
|
||||
// 监听最可靠的事件(避免重复监听)
|
||||
// res.on('close') 是最可靠的,会在连接关闭时触发
|
||||
@@ -697,6 +1320,7 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
|
||||
return next()
|
||||
} catch (error) {
|
||||
authErrored = true
|
||||
const authDuration = Date.now() - startTime
|
||||
logger.error(`❌ Authentication middleware error (${authDuration}ms):`, {
|
||||
error: error.message,
|
||||
@@ -710,6 +1334,14 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
error: 'Authentication error',
|
||||
message: 'Internal server error during authentication'
|
||||
})
|
||||
} finally {
|
||||
if (authErrored && typeof concurrencyCleanup === 'function') {
|
||||
try {
|
||||
await concurrencyCleanup()
|
||||
} catch (cleanupError) {
|
||||
logger.error('Failed to cleanup concurrency after auth error:', cleanupError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user