mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
Merge pull request #800 from DaydreamCoding/feature/concurrency-queue
feat: enhance concurrency queue with health check and admin endpoints
This commit is contained in:
28
src/app.js
28
src/app.js
@@ -584,6 +584,20 @@ class Application {
|
||||
|
||||
// 使用 Lua 脚本批量清理所有过期项
|
||||
for (const key of keys) {
|
||||
// 跳过非 Sorted Set 类型的键(这些键有各自的清理逻辑)
|
||||
// - concurrency:queue:stats:* 是 Hash 类型
|
||||
// - concurrency:queue:wait_times:* 是 List 类型
|
||||
// - concurrency:queue:* (不含stats/wait_times) 是 String 类型
|
||||
if (
|
||||
key.startsWith('concurrency:queue:stats:') ||
|
||||
key.startsWith('concurrency:queue:wait_times:') ||
|
||||
(key.startsWith('concurrency:queue:') &&
|
||||
!key.includes(':stats:') &&
|
||||
!key.includes(':wait_times:'))
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
const cleaned = await redis.client.eval(
|
||||
`
|
||||
@@ -633,6 +647,20 @@ class Application {
|
||||
// 然后启动定时清理任务
|
||||
userMessageQueueService.startCleanupTask()
|
||||
})
|
||||
|
||||
// 🚦 清理服务重启后残留的并发排队计数器
|
||||
// 多实例部署时建议关闭此开关,避免新实例启动时清空其他实例的队列计数
|
||||
// 可通过 DELETE /admin/concurrency/queue 接口手动清理
|
||||
const clearQueuesOnStartup = process.env.CLEAR_CONCURRENCY_QUEUES_ON_STARTUP !== 'false'
|
||||
if (clearQueuesOnStartup) {
|
||||
redis.clearAllConcurrencyQueues().catch((error) => {
|
||||
logger.error('❌ Error clearing concurrency queues on startup:', error)
|
||||
})
|
||||
} else {
|
||||
logger.info(
|
||||
'🚦 Skipping concurrency queue cleanup on startup (CLEAR_CONCURRENCY_QUEUES_ON_STARTUP=false)'
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
setupGracefulShutdown() {
|
||||
|
||||
@@ -8,6 +8,102 @@ const redis = require('../models/redis')
|
||||
const ClientValidator = require('../validators/clientValidator')
|
||||
const ClaudeCodeValidator = require('../validators/clients/claudeCodeValidator')
|
||||
const claudeRelayConfigService = require('../services/claudeRelayConfigService')
|
||||
const { calculateWaitTimeStats } = require('../utils/statsHelper')
|
||||
|
||||
// Utility helpers
/**
 * Resolve after the given delay.
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>} resolves (with undefined) once the timer fires
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms)
  })
}
|
||||
|
||||
/**
 * Decide whether a queued request should fail fast because the queue is overloaded.
 * See design.md Decision 7: queue health check and fast-fail.
 *
 * @param {string} apiKeyId - API Key ID
 * @param {number} timeoutMs - queue wait timeout (milliseconds)
 * @param {Object} queueConfig - queue configuration
 * @param {number} maxQueueSize - maximum queue length
 * @returns {Promise<Object>} { reject: boolean, reason?: string, estimatedWaitMs?: number, timeoutMs?: number }
 */
async function shouldRejectDueToOverload(apiKeyId, timeoutMs, queueConfig, maxQueueSize) {
  try {
    // Health check disabled: never reject.
    if (!queueConfig.concurrentRequestQueueHealthCheckEnabled) {
      return { reject: false, reason: 'health_check_disabled' }
    }

    // 🔑 Read the current queue depth first; a Redis failure counts as an empty queue.
    const queueDepth = await redis.getConcurrencyQueueCount(apiKeyId).catch(() => 0)

    // Empty queue means the system has recovered — skip the health check.
    if (queueDepth === 0) {
      return { reject: false, reason: 'queue_empty', currentQueueCount: 0 }
    }

    // 🔑 Key refinement: only run the health check when the queue is near capacity.
    // Depth <= maxQueueSize * 0.5 means there is still headroom, so skip the check;
    // this avoids rejecting requests too conservatively while the queue is short.
    // ceil keeps small queues sensible (maxQueueSize=3 → threshold 2, so depth <=1 skips).
    const loadThreshold = Math.ceil(maxQueueSize * 0.5)
    if (queueDepth <= loadThreshold) {
      return {
        reject: false,
        reason: 'queue_not_loaded',
        currentQueueCount: queueDepth,
        queueLoadThreshold: loadThreshold,
        maxQueueSize
      }
    }

    // Wait-time samples recorded for this API key.
    const samples = await redis.getQueueWaitTimes(apiKeyId)
    const stats = calculateWaitTimeStats(samples)

    // Too few samples (< 10): skip to avoid cold-start false positives.
    if (!stats || stats.sampleCount < 10) {
      return { reject: false, reason: 'insufficient_samples', sampleCount: stats?.sampleCount || 0 }
    }

    // Skip when P90 is flagged unreliable (should not happen at sampleCount >= 10, but be safe).
    if (stats.p90Unreliable) {
      return { reject: false, reason: 'p90_unreliable', sampleCount: stats.sampleCount }
    }

    // Health rule: reject when P90 >= timeout × threshold.
    const threshold = queueConfig.concurrentRequestQueueHealthThreshold || 0.8
    const p90Ceiling = timeoutMs * threshold

    if (stats.p90 >= p90Ceiling) {
      return {
        reject: true,
        reason: 'queue_overloaded',
        estimatedWaitMs: stats.p90,
        timeoutMs,
        threshold,
        sampleCount: stats.sampleCount,
        currentQueueCount: queueDepth,
        maxQueueSize
      }
    }

    return {
      reject: false,
      p90: stats.p90,
      sampleCount: stats.sampleCount,
      currentQueueCount: queueDepth
    }
  } catch (error) {
    // Never block a request because the health check itself failed; warn and continue.
    logger.warn(`Health check failed for ${apiKeyId}:`, error.message)
    return { reject: false, reason: 'health_check_error', error: error.message }
  }
}
|
||||
|
||||
// Queue polling configuration constants (overridable via the config file).
// Performance trade-off: a shorter initial interval responds faster but raises Redis QPS.
// With these values: roughly 250-300 Redis QPS with 100 waiters (after exponential backoff).
const QUEUE_POLLING_CONFIG = {
  pollIntervalMs: 200, // initial polling interval (ms) — balances latency vs. Redis load
  maxPollIntervalMs: 2000, // maximum polling interval (ms) — caps Redis load for long waits
  backoffFactor: 1.5, // exponential backoff multiplier applied after each poll
  jitterRatio: 0.2, // jitter ratio (±20%) — spreads pollers to prevent thundering herd
  maxRedisFailCount: 5 // consecutive Redis failure threshold (raised from 3 to 5 for network-blip tolerance)
}
|
||||
|
||||
const FALLBACK_CONCURRENCY_CONFIG = {
|
||||
leaseSeconds: 300,
|
||||
@@ -128,9 +224,223 @@ function isTokenCountRequest(req) {
|
||||
return false
|
||||
}
|
||||
|
||||
/**
 * Wait for a concurrency slot (core of the queueing mechanism).
 *
 * Uses an "acquire first, check after" pattern to avoid race conditions:
 * - Each poll attempts incrConcurrency to claim a slot
 * - If over the limit, decrConcurrency releases it and waiting continues
 * - On success the function returns; the caller must NOT call incrConcurrency again
 *
 * ⚠️ Cleanup responsibilities:
 * - Queue counter: this function's finally block calls decrConcurrencyQueue
 * - Concurrency slot: when acquired=true is returned, the slot is already held
 *   (via incrConcurrency); the caller must release it with decrConcurrency when
 *   the request finishes (handled in authenticateApiKey's finally block)
 *
 * @param {Object} req - Express request object
 * @param {Object} res - Express response object
 * @param {string} apiKeyId - API Key ID
 * @param {Object} queueOptions - configuration parameters
 * @returns {Promise<Object>} { acquired: boolean, reason?: string, waitTimeMs: number }
 */
async function waitForConcurrencySlot(req, res, apiKeyId, queueOptions) {
  const {
    concurrencyLimit,
    requestId,
    leaseSeconds,
    timeoutMs,
    pollIntervalMs,
    maxPollIntervalMs,
    backoffFactor,
    jitterRatio,
    maxRedisFailCount: configMaxRedisFailCount
  } = queueOptions

  let clientDisconnected = false
  // Tracks whether a slot was claimed inside the polling loop (for cleanup on exceptions).
  // Workflow:
  // 1. When incrConcurrency succeeds and count <= limit, set internalSlotAcquired = true
  // 2. After stats are recorded, clear the flag and return (ownership transfers to caller)
  // 3. If an exception occurs between steps 1-2, the finally block sees the flag and releases the slot
  let internalSlotAcquired = false

  // Listen for client-disconnect events.
  // ⚠️ Important: listen on the socket, NOT on req!
  // Reason: for POST requests, once body-parser drains the request body, req (the
  // IncomingMessage readable stream) fires 'close' immediately — but that does NOT mean
  // the client disconnected; it is still waiting for the response.
  // The socket's 'close' event is the real connection-closed signal.
  const { socket } = req
  const onSocketClose = () => {
    clientDisconnected = true
    logger.debug(
      `🔌 [Queue] Socket closed during queue wait for API key ${apiKeyId}, requestId: ${requestId}`
    )
  }

  if (socket) {
    socket.once('close', onSocketClose)
  }

  // Edge case: the socket may already be destroyed before the listener was registered.
  if (socket?.destroyed) {
    clientDisconnected = true
  }

  const startTime = Date.now()
  let pollInterval = pollIntervalMs
  let redisFailCount = 0
  // Prefer the configured value; fall back to the module default.
  const maxRedisFailCount = configMaxRedisFailCount || QUEUE_POLLING_CONFIG.maxRedisFailCount

  try {
    while (Date.now() - startTime < timeoutMs) {
      // Detect client disconnect (double check: event flag + live socket state).
      // socket.destroyed is a synchronous check, catching disconnects even if the
      // event handler has not run yet.
      if (clientDisconnected || socket?.destroyed) {
        redis
          .incrConcurrencyQueueStats(apiKeyId, 'cancelled')
          .catch((e) => logger.warn('Failed to record cancelled stat:', e))
        return {
          acquired: false,
          reason: 'client_disconnected',
          waitTimeMs: Date.now() - startTime
        }
      }

      // Try to claim a slot (acquire first, check after).
      try {
        const count = await redis.incrConcurrency(apiKeyId, requestId, leaseSeconds)
        redisFailCount = 0 // reset the failure counter

        if (count <= concurrencyLimit) {
          // Slot acquired!
          const waitTimeMs = Date.now() - startTime

          // Slot ownership transfer:
          // 1. The slot is now held via incrConcurrency
          // 2. Set internalSlotAcquired = true first so the finally block can clean up on exception
          // 3. After the stats calls, clear the flag and return — ownership moves to the caller
          // 4. The caller (authenticateApiKey) releases the slot when the request ends

          // Mark the slot as held (enables cleanup in the finally block on exception).
          internalSlotAcquired = true

          // Record stats (non-blocking, fire-and-forget).
          // ⚠️ Design notes:
          // - Deliberately NOT awaited: stats recording must not block request handling
          // - Each promise has its own .catch() so one failure cannot affect the others
          // - The outer .catch() is defensive, covering Promise.all itself
          // - It is safe for these to complete/fail after the function returns (log-only)
          // - Losing stats is acceptable; it does not affect core business logic
          Promise.all([
            redis
              .recordQueueWaitTime(apiKeyId, waitTimeMs)
              .catch((e) => logger.warn('Failed to record queue wait time:', e)),
            redis
              .recordGlobalQueueWaitTime(waitTimeMs)
              .catch((e) => logger.warn('Failed to record global wait time:', e)),
            redis
              .incrConcurrencyQueueStats(apiKeyId, 'success')
              .catch((e) => logger.warn('Failed to increment success stats:', e))
          ]).catch((e) => logger.warn('Failed to record queue stats batch:', e))

          // Clear the flag before returning (caller now owns and must release the slot).
          internalSlotAcquired = false
          return { acquired: true, waitTimeMs }
        }

        // Over the limit: release the claimed slot and keep waiting.
        try {
          await redis.decrConcurrency(apiKeyId, requestId)
        } catch (decrError) {
          // Release failed — warn and continue polling.
          // The next incrConcurrency naturally overwrites the entry for the same requestId.
          logger.warn(
            `Failed to release slot during polling for ${apiKeyId}, will retry:`,
            decrError
          )
        }
      } catch (redisError) {
        redisFailCount++
        logger.error(
          `Redis error in queue polling (${redisFailCount}/${maxRedisFailCount}):`,
          redisError
        )

        if (redisFailCount >= maxRedisFailCount) {
          // Too many consecutive Redis failures — give up queueing.
          return {
            acquired: false,
            reason: 'redis_error',
            waitTimeMs: Date.now() - startTime
          }
        }
      }

      // Exponential-backoff wait.
      await sleep(pollInterval)

      // Compute the next polling interval (exponential backoff + jitter).
      // 1. Apply the backoff factor.
      let nextInterval = pollInterval * backoffFactor
      // 2. Add jitter to prevent thundering herd (random offset in ±jitterRatio).
      //    Range: [-jitterRatio, +jitterRatio]; e.g. jitterRatio=0.2 gives ±20%.
      //    This is intentional: negative jitter slightly shortens, positive slightly lengthens,
      //    spreading the waiters' poll times so they don't hit Redis simultaneously.
      const jitter = nextInterval * jitterRatio * (Math.random() * 2 - 1)
      nextInterval = nextInterval + jitter
      // 3. Clamp to a sane range: minimum 1ms, maximum maxPollIntervalMs.
      //    Math.max(1, ...) guarantees a positive interval even with negative jitter.
      pollInterval = Math.max(1, Math.min(nextInterval, maxPollIntervalMs))
    }

    // Timed out waiting for a slot.
    redis
      .incrConcurrencyQueueStats(apiKeyId, 'timeout')
      .catch((e) => logger.warn('Failed to record timeout stat:', e))
    return { acquired: false, reason: 'timeout', waitTimeMs: Date.now() - startTime }
  } finally {
    // Guaranteed cleanup:
    // 1. Decrement the queue counter (incremented by the caller; decremented here).
    try {
      await redis.decrConcurrencyQueue(apiKeyId)
    } catch (cleanupError) {
      // Cleanup failure may leak the counter, but the key's TTL bounds the damage.
      logger.error(
        `Failed to decrement queue count in finally block for ${apiKeyId}:`,
        cleanupError
      )
    }

    // 2. If a slot was claimed internally but we did not return normally
    //    (exception path), release the orphaned slot.
    if (internalSlotAcquired) {
      try {
        await redis.decrConcurrency(apiKeyId, requestId)
        logger.warn(
          `⚠️ Released orphaned concurrency slot in finally block for ${apiKeyId}, requestId: ${requestId}`
        )
      } catch (slotCleanupError) {
        logger.error(
          `Failed to release orphaned concurrency slot for ${apiKeyId}:`,
          slotCleanupError
        )
      }
    }

    // Remove the socket listener to avoid leaks on keep-alive connections.
    if (socket) {
      socket.removeListener('close', onSocketClose)
    }
  }
}
|
||||
|
||||
// 🔑 API Key验证中间件(优化版)
|
||||
const authenticateApiKey = async (req, res, next) => {
|
||||
const startTime = Date.now()
|
||||
let authErrored = false
|
||||
let concurrencyCleanup = null
|
||||
let hasConcurrencySlot = false
|
||||
|
||||
try {
|
||||
// 安全提取API Key,支持多种格式(包括Gemini CLI支持)
|
||||
@@ -265,39 +575,346 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
}
|
||||
const requestId = uuidv4()
|
||||
|
||||
// ⚠️ 优化后的 Connection: close 设置策略
|
||||
// 问题背景:HTTP Keep-Alive 使多个请求共用同一个 TCP 连接
|
||||
// 当第一个请求正在处理,第二个请求进入排队时,它们共用同一个 socket
|
||||
// 如果客户端超时关闭连接,两个请求都会受影响
|
||||
// 优化方案:只有在请求实际进入排队时才设置 Connection: close
|
||||
// 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销
|
||||
// 详见 design.md Decision 2: Connection: close 设置时机
|
||||
// 注意:Connection: close 将在下方代码实际进入排队时设置(第 637 行左右)
|
||||
|
||||
// ============================================================
|
||||
// 🔒 并发槽位状态管理说明
|
||||
// ============================================================
|
||||
// 此函数中有两个关键状态变量:
|
||||
// - hasConcurrencySlot: 当前是否持有并发槽位
|
||||
// - concurrencyCleanup: 错误时调用的清理函数
|
||||
//
|
||||
// 状态转换流程:
|
||||
// 1. incrConcurrency 成功 → hasConcurrencySlot=true, 设置临时清理函数
|
||||
// 2. 若超限 → 释放槽位,hasConcurrencySlot=false, concurrencyCleanup=null
|
||||
// 3. 若排队成功 → hasConcurrencySlot=true, 升级为完整清理函数(含 interval 清理)
|
||||
// 4. 请求结束(res.close/req.close)→ 调用 decrementConcurrency 释放
|
||||
// 5. 认证错误 → finally 块调用 concurrencyCleanup 释放
|
||||
//
|
||||
// 为什么需要两种清理函数?
|
||||
// - 临时清理:在排队/认证过程中出错时使用,只释放槽位
|
||||
// - 完整清理:请求正常开始后使用,还需清理 leaseRenewInterval
|
||||
// ============================================================
|
||||
const setTemporaryConcurrencyCleanup = () => {
|
||||
concurrencyCleanup = async () => {
|
||||
if (!hasConcurrencySlot) {
|
||||
return
|
||||
}
|
||||
hasConcurrencySlot = false
|
||||
try {
|
||||
await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`Failed to decrement concurrency after auth error for key ${validation.keyData.id}:`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const currentConcurrency = await redis.incrConcurrency(
|
||||
validation.keyData.id,
|
||||
requestId,
|
||||
leaseSeconds
|
||||
)
|
||||
hasConcurrencySlot = true
|
||||
setTemporaryConcurrencyCleanup()
|
||||
logger.api(
|
||||
`📈 Incremented concurrency for key: ${validation.keyData.id} (${validation.keyData.name}), current: ${currentConcurrency}, limit: ${concurrencyLimit}`
|
||||
)
|
||||
|
||||
if (currentConcurrency > concurrencyLimit) {
|
||||
// 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏)
|
||||
// 1. 先释放刚占用的槽位
|
||||
try {
|
||||
const newCount = await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
logger.api(
|
||||
`📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}`
|
||||
)
|
||||
await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`,
|
||||
error
|
||||
)
|
||||
}
|
||||
logger.security(
|
||||
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
||||
validation.keyData.name
|
||||
}), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}`
|
||||
hasConcurrencySlot = false
|
||||
concurrencyCleanup = null
|
||||
|
||||
// 2. 获取排队配置
|
||||
const queueConfig = await claudeRelayConfigService.getConfig()
|
||||
|
||||
// 3. 排队功能未启用,直接返回 429(保持现有行为)
|
||||
if (!queueConfig.concurrentRequestQueueEnabled) {
|
||||
logger.security(
|
||||
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
||||
validation.keyData.name
|
||||
}), current: ${currentConcurrency - 1}, limit: ${concurrencyLimit}`
|
||||
)
|
||||
// 建议客户端在短暂延迟后重试(并发场景下通常很快会有槽位释放)
|
||||
res.set('Retry-After', '1')
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency limit exceeded',
|
||||
message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`,
|
||||
currentConcurrency: currentConcurrency - 1,
|
||||
concurrencyLimit
|
||||
})
|
||||
}
|
||||
|
||||
// 4. 计算最大排队数
|
||||
const maxQueueSize = Math.max(
|
||||
concurrencyLimit * queueConfig.concurrentRequestQueueMaxSizeMultiplier,
|
||||
queueConfig.concurrentRequestQueueMaxSize
|
||||
)
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency limit exceeded',
|
||||
message: `Too many concurrent requests. Limit: ${concurrencyLimit} concurrent requests`,
|
||||
currentConcurrency: currentConcurrency - 1,
|
||||
concurrencyLimit
|
||||
})
|
||||
|
||||
// 4.5 排队健康检查:过载时快速失败
|
||||
// 详见 design.md Decision 7: 排队健康检查与快速失败
|
||||
const overloadCheck = await shouldRejectDueToOverload(
|
||||
validation.keyData.id,
|
||||
queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
queueConfig,
|
||||
maxQueueSize
|
||||
)
|
||||
if (overloadCheck.reject) {
|
||||
// 使用健康检查返回的当前排队数,避免重复调用 Redis
|
||||
const currentQueueCount = overloadCheck.currentQueueCount || 0
|
||||
logger.api(
|
||||
`🚨 Queue overloaded for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`P90=${overloadCheck.estimatedWaitMs}ms, timeout=${overloadCheck.timeoutMs}ms, ` +
|
||||
`threshold=${overloadCheck.threshold}, samples=${overloadCheck.sampleCount}, ` +
|
||||
`concurrency=${concurrencyLimit}, queue=${currentQueueCount}/${maxQueueSize}`
|
||||
)
|
||||
// 记录被拒绝的过载统计
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'rejected_overload')
|
||||
.catch((e) => logger.warn('Failed to record rejected_overload stat:', e))
|
||||
// 返回 429 + Retry-After,让客户端稍后重试
|
||||
const retryAfterSeconds = 30
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Queue overloaded',
|
||||
message: `Queue is overloaded. Estimated wait time (${overloadCheck.estimatedWaitMs}ms) exceeds threshold. Limit: ${concurrencyLimit} concurrent requests, queue: ${currentQueueCount}/${maxQueueSize}. Please retry later.`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
queueCount: currentQueueCount,
|
||||
maxQueueSize,
|
||||
estimatedWaitMs: overloadCheck.estimatedWaitMs,
|
||||
timeoutMs: overloadCheck.timeoutMs,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 5. 尝试进入排队(原子操作:先增加再检查,避免竞态条件)
|
||||
let queueIncremented = false
|
||||
try {
|
||||
const newQueueCount = await redis.incrConcurrencyQueue(
|
||||
validation.keyData.id,
|
||||
queueConfig.concurrentRequestQueueTimeoutMs
|
||||
)
|
||||
queueIncremented = true
|
||||
|
||||
if (newQueueCount > maxQueueSize) {
|
||||
// 超过最大排队数,立即释放并返回 429
|
||||
await redis.decrConcurrencyQueue(validation.keyData.id)
|
||||
queueIncremented = false
|
||||
logger.api(
|
||||
`🚦 Concurrency queue full for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`queue: ${newQueueCount - 1}, maxQueue: ${maxQueueSize}`
|
||||
)
|
||||
// 队列已满,建议客户端在排队超时时间后重试
|
||||
const retryAfterSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000)
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Concurrency queue full',
|
||||
message: `Too many requests waiting in queue. Limit: ${concurrencyLimit} concurrent requests, queue: ${newQueueCount - 1}/${maxQueueSize}, timeout: ${retryAfterSeconds}s`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
queueCount: newQueueCount - 1,
|
||||
maxQueueSize,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 6. 已成功进入排队,记录统计并开始等待槽位
|
||||
logger.api(
|
||||
`⏳ Request entering queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`queue position: ${newQueueCount}`
|
||||
)
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'entered')
|
||||
.catch((e) => logger.warn('Failed to record entered stat:', e))
|
||||
|
||||
// ⚠️ 仅在请求实际进入排队时设置 Connection: close
|
||||
// 详见 design.md Decision 2: Connection: close 设置时机
|
||||
// 未排队的请求保持 Keep-Alive,避免不必要的 TCP 握手开销
|
||||
if (!res.headersSent) {
|
||||
res.setHeader('Connection', 'close')
|
||||
logger.api(
|
||||
`🔌 [Queue] Set Connection: close for queued request, key: ${validation.keyData.id}`
|
||||
)
|
||||
}
|
||||
|
||||
// ⚠️ 记录排队开始时的 socket 标识,用于排队完成后验证
|
||||
// 问题背景:HTTP Keep-Alive 连接复用时,长时间排队可能导致 socket 被其他请求使用
|
||||
// 验证方法:使用 UUID token + socket 对象引用双重验证
|
||||
// 详见 design.md Decision 1: Socket 身份验证机制
|
||||
req._crService = req._crService || {}
|
||||
req._crService.queueToken = uuidv4()
|
||||
req._crService.originalSocket = req.socket
|
||||
req._crService.startTime = Date.now()
|
||||
const savedToken = req._crService.queueToken
|
||||
const savedSocket = req._crService.originalSocket
|
||||
|
||||
// ⚠️ 重要:在调用前将 queueIncremented 设为 false
|
||||
// 因为 waitForConcurrencySlot 的 finally 块会负责清理排队计数
|
||||
// 如果在调用后设置,当 waitForConcurrencySlot 抛出异常时
|
||||
// 外层 catch 块会重复减少计数(finally 已经减过一次)
|
||||
queueIncremented = false
|
||||
|
||||
const slot = await waitForConcurrencySlot(req, res, validation.keyData.id, {
|
||||
concurrencyLimit,
|
||||
requestId,
|
||||
leaseSeconds,
|
||||
timeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
pollIntervalMs: QUEUE_POLLING_CONFIG.pollIntervalMs,
|
||||
maxPollIntervalMs: QUEUE_POLLING_CONFIG.maxPollIntervalMs,
|
||||
backoffFactor: QUEUE_POLLING_CONFIG.backoffFactor,
|
||||
jitterRatio: QUEUE_POLLING_CONFIG.jitterRatio,
|
||||
maxRedisFailCount: queueConfig.concurrentRequestQueueMaxRedisFailCount
|
||||
})
|
||||
|
||||
// 7. 处理排队结果
|
||||
if (!slot.acquired) {
|
||||
if (slot.reason === 'client_disconnected') {
|
||||
// 客户端已断开,不返回响应(连接已关闭)
|
||||
logger.api(
|
||||
`🔌 Client disconnected while queuing for key: ${validation.keyData.id} (${validation.keyData.name})`
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if (slot.reason === 'redis_error') {
|
||||
// Redis 连续失败,返回 503
|
||||
logger.error(
|
||||
`❌ Redis error during queue wait for key: ${validation.keyData.id} (${validation.keyData.name})`
|
||||
)
|
||||
return res.status(503).json({
|
||||
error: 'Service temporarily unavailable',
|
||||
message: 'Failed to acquire concurrency slot due to internal error'
|
||||
})
|
||||
}
|
||||
// 排队超时(使用 api 级别,与其他排队日志保持一致)
|
||||
logger.api(
|
||||
`⏰ Queue timeout for key: ${validation.keyData.id} (${validation.keyData.name}), waited: ${slot.waitTimeMs}ms`
|
||||
)
|
||||
// 已等待超时,建议客户端稍后重试
|
||||
// ⚠️ Retry-After 策略优化:
|
||||
// - 请求已经等了完整的 timeout 时间,说明系统负载较高
|
||||
// - 过早重试(如固定 5 秒)会加剧拥塞,导致更多超时
|
||||
// - 合理策略:使用 timeout 时间的一半作为重试间隔
|
||||
// - 最小值 5 秒,最大值 30 秒,避免极端情况
|
||||
const timeoutSeconds = Math.ceil(queueConfig.concurrentRequestQueueTimeoutMs / 1000)
|
||||
const retryAfterSeconds = Math.max(5, Math.min(30, Math.ceil(timeoutSeconds / 2)))
|
||||
res.set('Retry-After', String(retryAfterSeconds))
|
||||
return res.status(429).json({
|
||||
error: 'Queue timeout',
|
||||
message: `Request timed out waiting for concurrency slot. Limit: ${concurrencyLimit} concurrent requests, maxQueue: ${maxQueueSize}, Queue timeout: ${timeoutSeconds}s, waited: ${slot.waitTimeMs}ms`,
|
||||
currentConcurrency: concurrencyLimit,
|
||||
concurrencyLimit,
|
||||
maxQueueSize,
|
||||
queueTimeoutMs: queueConfig.concurrentRequestQueueTimeoutMs,
|
||||
waitTimeMs: slot.waitTimeMs,
|
||||
retryAfterSeconds
|
||||
})
|
||||
}
|
||||
|
||||
// 8. 排队成功,slot.acquired 表示已在 waitForConcurrencySlot 中获取到槽位
|
||||
logger.api(
|
||||
`✅ Queue wait completed for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms`
|
||||
)
|
||||
hasConcurrencySlot = true
|
||||
setTemporaryConcurrencyCleanup()
|
||||
|
||||
// 9. ⚠️ 关键检查:排队等待结束后,验证客户端是否还在等待响应
|
||||
// 长时间排队后,客户端可能在应用层已放弃(如 Claude Code 的超时机制),
|
||||
// 但 TCP 连接仍然存活。此时继续处理请求是浪费资源。
|
||||
// 注意:如果发送了心跳,headersSent 会是 true,但这是正常的
|
||||
const postQueueSocket = req.socket
|
||||
// 只检查连接是否真正断开(destroyed/writableEnded/socketDestroyed)
|
||||
// headersSent 在心跳场景下是正常的,不应该作为放弃的依据
|
||||
if (res.destroyed || res.writableEnded || postQueueSocket?.destroyed) {
|
||||
logger.warn(
|
||||
`⚠️ Client no longer waiting after queue for key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms | destroyed: ${res.destroyed}, ` +
|
||||
`writableEnded: ${res.writableEnded}, socketDestroyed: ${postQueueSocket?.destroyed}`
|
||||
)
|
||||
// 释放刚获取的槽位
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) => logger.error('Failed to release slot after client abandoned:', e))
|
||||
// 不返回响应(客户端已不在等待)
|
||||
return
|
||||
}
|
||||
|
||||
// 10. ⚠️ 关键检查:验证 socket 身份是否改变
|
||||
// HTTP Keep-Alive 连接复用可能导致排队期间 socket 被其他请求使用
|
||||
// 验证方法:UUID token + socket 对象引用双重验证
|
||||
// 详见 design.md Decision 1: Socket 身份验证机制
|
||||
const queueData = req._crService
|
||||
const socketIdentityChanged =
|
||||
!queueData ||
|
||||
queueData.queueToken !== savedToken ||
|
||||
queueData.originalSocket !== savedSocket
|
||||
|
||||
if (socketIdentityChanged) {
|
||||
logger.error(
|
||||
`❌ [Queue] Socket identity changed during queue wait! ` +
|
||||
`key: ${validation.keyData.id} (${validation.keyData.name}), ` +
|
||||
`waited: ${slot.waitTimeMs}ms | ` +
|
||||
`tokenMatch: ${queueData?.queueToken === savedToken}, ` +
|
||||
`socketMatch: ${queueData?.originalSocket === savedSocket}`
|
||||
)
|
||||
// 释放刚获取的槽位
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) => logger.error('Failed to release slot after socket identity change:', e))
|
||||
// 记录 socket_changed 统计
|
||||
redis
|
||||
.incrConcurrencyQueueStats(validation.keyData.id, 'socket_changed')
|
||||
.catch((e) => logger.warn('Failed to record socket_changed stat:', e))
|
||||
// 不返回响应(socket 已被其他请求使用)
|
||||
return
|
||||
}
|
||||
} catch (queueError) {
|
||||
// 异常时清理资源,防止泄漏
|
||||
// 1. 清理排队计数(如果还没被 waitForConcurrencySlot 的 finally 清理)
|
||||
if (queueIncremented) {
|
||||
await redis
|
||||
.decrConcurrencyQueue(validation.keyData.id)
|
||||
.catch((e) => logger.error('Failed to cleanup queue count after error:', e))
|
||||
}
|
||||
|
||||
// 2. 防御性清理:如果 waitForConcurrencySlot 内部获取了槽位但在返回前异常
|
||||
// 虽然这种情况极少发生(统计记录的异常会被内部捕获),但为了安全起见
|
||||
// 尝试释放可能已获取的槽位。decrConcurrency 使用 ZREM,即使成员不存在也安全
|
||||
if (hasConcurrencySlot) {
|
||||
hasConcurrencySlot = false
|
||||
await redis
|
||||
.decrConcurrency(validation.keyData.id, requestId)
|
||||
.catch((e) =>
|
||||
logger.error('Failed to cleanup concurrency slot after queue error:', e)
|
||||
)
|
||||
}
|
||||
|
||||
throw queueError
|
||||
}
|
||||
}
|
||||
|
||||
const renewIntervalMs =
|
||||
@@ -358,6 +975,7 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
const decrementConcurrency = async () => {
|
||||
if (!concurrencyDecremented) {
|
||||
concurrencyDecremented = true
|
||||
hasConcurrencySlot = false
|
||||
if (leaseRenewInterval) {
|
||||
clearInterval(leaseRenewInterval)
|
||||
leaseRenewInterval = null
|
||||
@@ -372,6 +990,11 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
}
|
||||
}
|
||||
}
|
||||
// 升级为完整清理函数(包含 leaseRenewInterval 清理逻辑)
|
||||
// 此时请求已通过认证,后续由 res.close/req.close 事件触发清理
|
||||
if (hasConcurrencySlot) {
|
||||
concurrencyCleanup = decrementConcurrency
|
||||
}
|
||||
|
||||
// 监听最可靠的事件(避免重复监听)
|
||||
// res.on('close') 是最可靠的,会在连接关闭时触发
|
||||
@@ -697,6 +1320,7 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
|
||||
return next()
|
||||
} catch (error) {
|
||||
authErrored = true
|
||||
const authDuration = Date.now() - startTime
|
||||
logger.error(`❌ Authentication middleware error (${authDuration}ms):`, {
|
||||
error: error.message,
|
||||
@@ -710,6 +1334,14 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
error: 'Authentication error',
|
||||
message: 'Internal server error during authentication'
|
||||
})
|
||||
} finally {
|
||||
if (authErrored && typeof concurrencyCleanup === 'function') {
|
||||
try {
|
||||
await concurrencyCleanup()
|
||||
} catch (cleanupError) {
|
||||
logger.error('Failed to cleanup concurrency after auth error:', cleanupError)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,18 @@ function getWeekStringInTimezone(date = new Date()) {
|
||||
return `${year}-W${String(weekNumber).padStart(2, '0')}`
|
||||
}
|
||||
|
||||
// 并发队列相关常量
|
||||
const QUEUE_STATS_TTL_SECONDS = 86400 * 7 // 统计计数保留 7 天
|
||||
const WAIT_TIME_TTL_SECONDS = 86400 // 等待时间样本保留 1 天(滚动窗口,无需长期保留)
|
||||
// 等待时间样本数配置(提高统计置信度)
|
||||
// - 每 API Key 从 100 提高到 500:提供更稳定的 P99 估计
|
||||
// - 全局从 500 提高到 2000:支持更高精度的 P99.9 分析
|
||||
// - 内存开销约 12-20KB(Redis quicklist 每元素 1-10 字节),可接受
|
||||
// 详见 design.md Decision 5: 等待时间统计样本数
|
||||
const WAIT_TIME_SAMPLES_PER_KEY = 500 // 每个 API Key 保留的等待时间样本数
|
||||
const WAIT_TIME_SAMPLES_GLOBAL = 2000 // 全局保留的等待时间样本数
|
||||
const QUEUE_TTL_BUFFER_SECONDS = 30 // 排队计数器TTL缓冲时间
|
||||
|
||||
class RedisClient {
|
||||
constructor() {
|
||||
this.client = null
|
||||
@@ -2769,4 +2781,380 @@ redisClient.scanUserMessageQueueLocks = async function () {
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// 🚦 API Key 并发请求排队方法
|
||||
// ============================================
|
||||
|
||||
/**
 * Increment the queue counter (Lua script guarantees atomicity).
 *
 * INCR and EXPIRE run atomically so a process crash between the two cannot
 * leave a counter without a TTL (which would leak it forever).
 *
 * @param {string} apiKeyId - API Key ID
 * @param {number} [timeoutMs=60000] - queue wait timeout (ms), used to derive the TTL
 * @returns {Promise<number>} the queue count after incrementing
 * @throws propagates any Redis error to the caller
 */
redisClient.incrConcurrencyQueue = async function (apiKeyId, timeoutMs = 60000) {
  const key = `concurrency:queue:${apiKeyId}`
  try {
    // TTL = wait timeout + buffer, so the key cannot expire while requests are still waiting.
    const ttlSeconds = Math.ceil(timeoutMs / 1000) + QUEUE_TTL_BUFFER_SECONDS
    const script = `
      local count = redis.call('INCR', KEYS[1])
      redis.call('EXPIRE', KEYS[1], ARGV[1])
      return count
    `
    const count = await this.client.eval(script, 1, key, String(ttlSeconds))
    logger.database(
      `🚦 Incremented queue count for key ${apiKeyId}: ${count} (TTL: ${ttlSeconds}s)`
    )
    // Explicit radix: never rely on parseInt's implicit base detection.
    return Number.parseInt(count, 10)
  } catch (error) {
    logger.error(`Failed to increment concurrency queue for ${apiKeyId}:`, error)
    throw error
  }
}
|
||||
|
||||
/**
 * Decrement the queue counter (Lua script guarantees atomicity).
 *
 * DECR and DEL run atomically; when the count reaches zero (or goes negative,
 * e.g. after a crash left a stale counter) the key is deleted outright so no
 * residue survives.
 *
 * @param {string} apiKeyId - API Key ID
 * @returns {Promise<number>} the queue count after decrementing (floored at 0)
 * @throws propagates any Redis error to the caller
 */
redisClient.decrConcurrencyQueue = async function (apiKeyId) {
  const key = `concurrency:queue:${apiKeyId}`
  try {
    const script = `
      local count = redis.call('DECR', KEYS[1])
      if count <= 0 then
        redis.call('DEL', KEYS[1])
        return 0
      end
      return count
    `
    const count = await this.client.eval(script, 1, key)
    // Explicit radix: never rely on parseInt's implicit base detection.
    const result = Number.parseInt(count, 10)
    if (result === 0) {
      logger.database(`🚦 Queue count for key ${apiKeyId} is 0, removed key`)
    } else {
      logger.database(`🚦 Decremented queue count for key ${apiKeyId}: ${result}`)
    }
    return result
  } catch (error) {
    logger.error(`Failed to decrement concurrency queue for ${apiKeyId}:`, error)
    throw error
  }
}
|
||||
|
||||
/**
 * Get the current queue count for an API key.
 * @param {string} apiKeyId - API Key ID
 * @returns {Promise<number>} Current queued-request count (0 when the key is absent or on error)
 */
redisClient.getConcurrencyQueueCount = async function (apiKeyId) {
  const key = `concurrency:queue:${apiKeyId}`
  try {
    const count = await this.client.get(key)
    // GET returns null for a missing key — handle that explicitly instead of
    // relying on `parseInt(null || 0)` coercion, and always pass a radix.
    return count === null ? 0 : parseInt(count, 10)
  } catch (error) {
    logger.error(`Failed to get concurrency queue count for ${apiKeyId}:`, error)
    return 0
  }
}
|
||||
|
||||
/**
 * Delete the queue counter for a single API key.
 * @param {string} apiKeyId - API Key ID
 * @returns {Promise<boolean>} true when the delete succeeded, false on error
 */
redisClient.clearConcurrencyQueue = async function (apiKeyId) {
  const queueKey = `concurrency:queue:${apiKeyId}`
  try {
    await this.client.del(queueKey)
    logger.database(`🚦 Cleared queue count for key ${apiKeyId}`)
    return true
  } catch (error) {
    // Best-effort: report failure via return value instead of throwing.
    logger.error(`Failed to clear concurrency queue for ${apiKeyId}:`, error)
    return false
  }
}
|
||||
|
||||
/**
 * Scan Redis for all queue counter keys and return the API Key IDs they belong to.
 * Stats hashes and wait-time lists under the same prefix are filtered out.
 * @returns {Promise<string[]>} API Key ID list (empty on error)
 */
redisClient.scanConcurrencyQueueKeys = async function () {
  const PREFIX = 'concurrency:queue:'
  const MAX_ITERATIONS = 1000
  const apiKeyIds = []
  let cursor = '0'
  let iterations = 0

  try {
    do {
      const [nextCursor, batch] = await this.client.scan(
        cursor,
        'MATCH',
        'concurrency:queue:*',
        'COUNT',
        100
      )
      cursor = nextCursor
      iterations += 1

      for (const key of batch) {
        // Skip the stats and wait-time keys that share the prefix
        const isStatsKey = key.startsWith('concurrency:queue:stats:')
        const isWaitTimesKey = key.startsWith('concurrency:queue:wait_times:')
        if (!isStatsKey && !isWaitTimesKey) {
          apiKeyIds.push(key.slice(PREFIX.length))
        }
      }

      // Safety valve: never loop forever even if the cursor misbehaves
      if (iterations >= MAX_ITERATIONS) {
        logger.warn(
          `🚦 Concurrency queue: SCAN reached max iterations (${MAX_ITERATIONS}), stopping early`,
          { foundQueues: apiKeyIds.length }
        )
        break
      }
    } while (cursor !== '0')

    return apiKeyIds
  } catch (error) {
    logger.error('Failed to scan concurrency queue keys:', error)
    return []
  }
}
|
||||
|
||||
/**
 * Delete every queue counter key (used on service restart).
 * Stats hashes and wait-time lists are intentionally preserved.
 * @returns {Promise<number>} Number of counters deleted (0 on error)
 */
redisClient.clearAllConcurrencyQueues = async function () {
  const MAX_ITERATIONS = 1000
  let cleared = 0
  let cursor = '0'
  let iterations = 0

  try {
    do {
      const [nextCursor, batch] = await this.client.scan(
        cursor,
        'MATCH',
        'concurrency:queue:*',
        'COUNT',
        100
      )
      cursor = nextCursor
      iterations += 1

      // Only the plain counters are deleted; stats / wait_times keys survive
      const counterKeys = batch.filter(
        (key) =>
          !key.startsWith('concurrency:queue:stats:') &&
          !key.startsWith('concurrency:queue:wait_times:')
      )
      if (counterKeys.length > 0) {
        await this.client.del(...counterKeys)
        cleared += counterKeys.length
      }
      // The iteration cap guards against a misbehaving cursor
    } while (cursor !== '0' && iterations < MAX_ITERATIONS)

    if (cleared > 0) {
      logger.info(`🚦 Cleared ${cleared} concurrency queue counter(s) on startup`)
    }
    return cleared
  } catch (error) {
    logger.error('Failed to clear all concurrency queues:', error)
    return 0
  }
}
|
||||
|
||||
/**
 * Increment one queue statistics counter field (atomic via Lua script).
 * @param {string} apiKeyId - API Key ID
 * @param {string} field - Stats field (entered/success/timeout/cancelled)
 * @returns {Promise<number>} Counter value after incrementing (0 on error)
 */
redisClient.incrConcurrencyQueueStats = async function (apiKeyId, field) {
  const statsKey = `concurrency:queue:stats:${apiKeyId}`
  try {
    // HINCRBY and EXPIRE run in one Lua script so a crash in between can
    // never leave the stats hash without a TTL (memory leak).
    const luaScript = `
      local count = redis.call('HINCRBY', KEYS[1], ARGV[1], 1)
      redis.call('EXPIRE', KEYS[1], ARGV[2])
      return count
    `
    const rawCount = await this.client.eval(
      luaScript,
      1,
      statsKey,
      field,
      String(QUEUE_STATS_TTL_SECONDS)
    )
    return parseInt(rawCount)
  } catch (error) {
    logger.error(`Failed to increment queue stats ${field} for ${apiKeyId}:`, error)
    return 0
  }
}
|
||||
|
||||
/**
 * Get the queue statistics hash for an API key.
 * @param {string} apiKeyId - API Key ID
 * @returns {Promise<Object>} Stats object with all counter fields (all zeros on error)
 */
redisClient.getConcurrencyQueueStats = async function (apiKeyId) {
  const key = `concurrency:queue:stats:${apiKeyId}`
  // Parse a hash field as a base-10 integer; missing fields default to 0.
  // (Original code called parseInt without a radix on every field.)
  const toCount = (value) => parseInt(value ?? '0', 10)
  try {
    const stats = await this.client.hgetall(key)
    return {
      entered: toCount(stats?.entered),
      success: toCount(stats?.success),
      timeout: toCount(stats?.timeout),
      cancelled: toCount(stats?.cancelled),
      socket_changed: toCount(stats?.socket_changed),
      rejected_overload: toCount(stats?.rejected_overload)
    }
  } catch (error) {
    logger.error(`Failed to get queue stats for ${apiKeyId}:`, error)
    return {
      entered: 0,
      success: 0,
      timeout: 0,
      cancelled: 0,
      socket_changed: 0,
      rejected_overload: 0
    }
  }
}
|
||||
|
||||
/**
 * Record one queue wait-time sample for a specific API key.
 * Samples are stored per key in a capped Redis list.
 * @param {string} apiKeyId - API Key ID
 * @param {number} waitTimeMs - Wait time in milliseconds
 * @returns {Promise<void>}
 */
redisClient.recordQueueWaitTime = async function (apiKeyId, waitTimeMs) {
  const listKey = `concurrency:queue:wait_times:${apiKeyId}`
  try {
    // LPUSH + LTRIM + EXPIRE in one Lua script: atomic, and the TTL keeps
    // abandoned sample lists from leaking memory.
    const luaScript = `
      redis.call('LPUSH', KEYS[1], ARGV[1])
      redis.call('LTRIM', KEYS[1], 0, ARGV[2])
      redis.call('EXPIRE', KEYS[1], ARGV[3])
      return 1
    `
    const lastIndex = WAIT_TIME_SAMPLES_PER_KEY - 1
    await this.client.eval(luaScript, 1, listKey, waitTimeMs, lastIndex, WAIT_TIME_TTL_SECONDS)
  } catch (error) {
    // Metrics are best-effort: log and continue rather than failing the request.
    logger.error(`Failed to record queue wait time for ${apiKeyId}:`, error)
  }
}
|
||||
|
||||
/**
 * Record one queue wait-time sample into the global capped list.
 * @param {number} waitTimeMs - Wait time in milliseconds
 * @returns {Promise<void>}
 */
redisClient.recordGlobalQueueWaitTime = async function (waitTimeMs) {
  const listKey = 'concurrency:queue:wait_times:global'
  try {
    // LPUSH + LTRIM + EXPIRE in one Lua script: atomic, and the TTL keeps
    // the sample list from leaking memory.
    const luaScript = `
      redis.call('LPUSH', KEYS[1], ARGV[1])
      redis.call('LTRIM', KEYS[1], 0, ARGV[2])
      redis.call('EXPIRE', KEYS[1], ARGV[3])
      return 1
    `
    const lastIndex = WAIT_TIME_SAMPLES_GLOBAL - 1
    await this.client.eval(luaScript, 1, listKey, waitTimeMs, lastIndex, WAIT_TIME_TTL_SECONDS)
  } catch (error) {
    // Metrics are best-effort: log and continue rather than failing the request.
    logger.error('Failed to record global queue wait time:', error)
  }
}
|
||||
|
||||
/**
 * Fetch all recorded global wait-time samples.
 * @returns {Promise<number[]>} Wait times in milliseconds (empty on error)
 */
redisClient.getGlobalQueueWaitTimes = async function () {
  const listKey = 'concurrency:queue:wait_times:global'
  try {
    const rawSamples = await this.client.lrange(listKey, 0, -1)
    // LRANGE returns strings; convert each sample back to a number.
    return rawSamples.map((sample) => Number(sample))
  } catch (error) {
    logger.error('Failed to get global queue wait times:', error)
    return []
  }
}
|
||||
|
||||
/**
 * Fetch all recorded wait-time samples for one API key.
 * @param {string} apiKeyId - API Key ID
 * @returns {Promise<number[]>} Wait times in milliseconds (empty on error)
 */
redisClient.getQueueWaitTimes = async function (apiKeyId) {
  const listKey = `concurrency:queue:wait_times:${apiKeyId}`
  try {
    const rawSamples = await this.client.lrange(listKey, 0, -1)
    // LRANGE returns strings; convert each sample back to a number.
    return rawSamples.map((sample) => Number(sample))
  } catch (error) {
    logger.error(`Failed to get queue wait times for ${apiKeyId}:`, error)
    return []
  }
}
|
||||
|
||||
/**
 * Scan Redis for all queue statistics hashes and return their API Key IDs.
 * @returns {Promise<string[]>} API Key ID list (empty on error)
 */
redisClient.scanConcurrencyQueueStatsKeys = async function () {
  const PREFIX = 'concurrency:queue:stats:'
  const MAX_ITERATIONS = 1000
  const apiKeyIds = []
  let cursor = '0'
  let iterations = 0

  try {
    do {
      const [nextCursor, batch] = await this.client.scan(
        cursor,
        'MATCH',
        'concurrency:queue:stats:*',
        'COUNT',
        100
      )
      cursor = nextCursor
      iterations += 1
      for (const key of batch) {
        apiKeyIds.push(key.slice(PREFIX.length))
      }
      // The iteration cap guards against a misbehaving cursor
    } while (cursor !== '0' && iterations < MAX_ITERATIONS)

    return apiKeyIds
  } catch (error) {
    logger.error('Failed to scan concurrency queue stats keys:', error)
    return []
  }
}
|
||||
|
||||
module.exports = redisClient
|
||||
|
||||
@@ -43,7 +43,11 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => {
|
||||
sessionBindingTtlDays,
|
||||
userMessageQueueEnabled,
|
||||
userMessageQueueDelayMs,
|
||||
userMessageQueueTimeoutMs
|
||||
userMessageQueueTimeoutMs,
|
||||
concurrentRequestQueueEnabled,
|
||||
concurrentRequestQueueMaxSize,
|
||||
concurrentRequestQueueMaxSizeMultiplier,
|
||||
concurrentRequestQueueTimeoutMs
|
||||
} = req.body
|
||||
|
||||
// 验证输入
|
||||
@@ -110,6 +114,54 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
// 验证并发请求排队配置
|
||||
if (
|
||||
concurrentRequestQueueEnabled !== undefined &&
|
||||
typeof concurrentRequestQueueEnabled !== 'boolean'
|
||||
) {
|
||||
return res.status(400).json({ error: 'concurrentRequestQueueEnabled must be a boolean' })
|
||||
}
|
||||
|
||||
if (concurrentRequestQueueMaxSize !== undefined) {
|
||||
if (
|
||||
typeof concurrentRequestQueueMaxSize !== 'number' ||
|
||||
!Number.isInteger(concurrentRequestQueueMaxSize) ||
|
||||
concurrentRequestQueueMaxSize < 1 ||
|
||||
concurrentRequestQueueMaxSize > 100
|
||||
) {
|
||||
return res
|
||||
.status(400)
|
||||
.json({ error: 'concurrentRequestQueueMaxSize must be an integer between 1 and 100' })
|
||||
}
|
||||
}
|
||||
|
||||
if (concurrentRequestQueueMaxSizeMultiplier !== undefined) {
|
||||
// 使用 Number.isFinite() 同时排除 NaN、Infinity、-Infinity 和非数字类型
|
||||
if (
|
||||
!Number.isFinite(concurrentRequestQueueMaxSizeMultiplier) ||
|
||||
concurrentRequestQueueMaxSizeMultiplier < 0 ||
|
||||
concurrentRequestQueueMaxSizeMultiplier > 10
|
||||
) {
|
||||
return res.status(400).json({
|
||||
error: 'concurrentRequestQueueMaxSizeMultiplier must be a finite number between 0 and 10'
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (concurrentRequestQueueTimeoutMs !== undefined) {
|
||||
if (
|
||||
typeof concurrentRequestQueueTimeoutMs !== 'number' ||
|
||||
!Number.isInteger(concurrentRequestQueueTimeoutMs) ||
|
||||
concurrentRequestQueueTimeoutMs < 5000 ||
|
||||
concurrentRequestQueueTimeoutMs > 300000
|
||||
) {
|
||||
return res.status(400).json({
|
||||
error:
|
||||
'concurrentRequestQueueTimeoutMs must be an integer between 5000 and 300000 (5 seconds to 5 minutes)'
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const updateData = {}
|
||||
if (claudeCodeOnlyEnabled !== undefined) {
|
||||
updateData.claudeCodeOnlyEnabled = claudeCodeOnlyEnabled
|
||||
@@ -132,6 +184,18 @@ router.put('/claude-relay-config', authenticateAdmin, async (req, res) => {
|
||||
if (userMessageQueueTimeoutMs !== undefined) {
|
||||
updateData.userMessageQueueTimeoutMs = userMessageQueueTimeoutMs
|
||||
}
|
||||
if (concurrentRequestQueueEnabled !== undefined) {
|
||||
updateData.concurrentRequestQueueEnabled = concurrentRequestQueueEnabled
|
||||
}
|
||||
if (concurrentRequestQueueMaxSize !== undefined) {
|
||||
updateData.concurrentRequestQueueMaxSize = concurrentRequestQueueMaxSize
|
||||
}
|
||||
if (concurrentRequestQueueMaxSizeMultiplier !== undefined) {
|
||||
updateData.concurrentRequestQueueMaxSizeMultiplier = concurrentRequestQueueMaxSizeMultiplier
|
||||
}
|
||||
if (concurrentRequestQueueTimeoutMs !== undefined) {
|
||||
updateData.concurrentRequestQueueTimeoutMs = concurrentRequestQueueTimeoutMs
|
||||
}
|
||||
|
||||
const updatedConfig = await claudeRelayConfigService.updateConfig(
|
||||
updateData,
|
||||
|
||||
@@ -8,6 +8,7 @@ const router = express.Router()
|
||||
const redis = require('../../models/redis')
|
||||
const logger = require('../../utils/logger')
|
||||
const { authenticateAdmin } = require('../../middleware/auth')
|
||||
const { calculateWaitTimeStats } = require('../../utils/statsHelper')
|
||||
|
||||
/**
|
||||
* GET /admin/concurrency
|
||||
@@ -17,17 +18,29 @@ router.get('/concurrency', authenticateAdmin, async (req, res) => {
|
||||
try {
|
||||
const status = await redis.getAllConcurrencyStatus()
|
||||
|
||||
// 为每个 API Key 获取排队计数
|
||||
const statusWithQueue = await Promise.all(
|
||||
status.map(async (s) => {
|
||||
const queueCount = await redis.getConcurrencyQueueCount(s.apiKeyId)
|
||||
return {
|
||||
...s,
|
||||
queueCount
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
// 计算汇总统计
|
||||
const summary = {
|
||||
totalKeys: status.length,
|
||||
totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0),
|
||||
totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0)
|
||||
totalKeys: statusWithQueue.length,
|
||||
totalActiveRequests: statusWithQueue.reduce((sum, s) => sum + s.activeCount, 0),
|
||||
totalExpiredRequests: statusWithQueue.reduce((sum, s) => sum + s.expiredCount, 0),
|
||||
totalQueuedRequests: statusWithQueue.reduce((sum, s) => sum + s.queueCount, 0)
|
||||
}
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
summary,
|
||||
concurrencyStatus: status
|
||||
concurrencyStatus: statusWithQueue
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('❌ Failed to get concurrency status:', error)
|
||||
@@ -39,6 +52,156 @@ router.get('/concurrency', authenticateAdmin, async (req, res) => {
|
||||
}
|
||||
})
|
||||
|
||||
/**
 * GET /admin/concurrency-queue/stats
 * Return queue statistics: per-API-Key detail plus a global roll-up with
 * success/timeout/cancel rates and wait-time percentiles.
 */
router.get('/concurrency-queue/stats', authenticateAdmin, async (req, res) => {
  try {
    // These three reads are independent — fetch them in parallel instead of
    // awaiting each one sequentially.
    const [statsKeys, queueKeys, globalWaitTimes] = await Promise.all([
      redis.scanConcurrencyQueueStatsKeys(),
      redis.scanConcurrencyQueueKeys(),
      redis.getGlobalQueueWaitTimes()
    ])

    // Union of every API Key that has either stats or a live queue counter
    const allApiKeyIds = [...new Set([...statsKeys, ...queueKeys])]

    // Per-key detail: current queue depth, lifetime counters, wait-time stats
    const perKeyStats = await Promise.all(
      allApiKeyIds.map(async (apiKeyId) => {
        const [queueCount, stats, waitTimes] = await Promise.all([
          redis.getConcurrencyQueueCount(apiKeyId),
          redis.getConcurrencyQueueStats(apiKeyId),
          redis.getQueueWaitTimes(apiKeyId)
        ])

        return {
          apiKeyId,
          currentQueueCount: queueCount,
          stats,
          waitTimeStats: calculateWaitTimeStats(waitTimes)
        }
      })
    )

    const globalWaitTimeStats = calculateWaitTimeStats(globalWaitTimes)

    // Global roll-up across all keys
    const globalStats = {
      totalEntered: perKeyStats.reduce((sum, s) => sum + s.stats.entered, 0),
      totalSuccess: perKeyStats.reduce((sum, s) => sum + s.stats.success, 0),
      totalTimeout: perKeyStats.reduce((sum, s) => sum + s.stats.timeout, 0),
      totalCancelled: perKeyStats.reduce((sum, s) => sum + s.stats.cancelled, 0),
      totalSocketChanged: perKeyStats.reduce((sum, s) => sum + (s.stats.socket_changed || 0), 0),
      totalRejectedOverload: perKeyStats.reduce(
        (sum, s) => sum + (s.stats.rejected_overload || 0),
        0
      ),
      currentTotalQueued: perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0),
      // Queue resource utilization metrics
      peakQueueSize:
        perKeyStats.length > 0 ? Math.max(...perKeyStats.map((s) => s.currentQueueCount)) : 0,
      avgQueueSize:
        perKeyStats.length > 0
          ? Math.round(
              perKeyStats.reduce((sum, s) => sum + s.currentQueueCount, 0) / perKeyStats.length
            )
          : 0,
      activeApiKeys: perKeyStats.filter((s) => s.currentQueueCount > 0).length
    }

    // Derived percentage rates — only meaningful when anything entered the queue
    if (globalStats.totalEntered > 0) {
      globalStats.successRate = Math.round(
        (globalStats.totalSuccess / globalStats.totalEntered) * 100
      )
      globalStats.timeoutRate = Math.round(
        (globalStats.totalTimeout / globalStats.totalEntered) * 100
      )
      globalStats.cancelledRate = Math.round(
        (globalStats.totalCancelled / globalStats.totalEntered) * 100
      )
    }

    // Surface the key global wait-time percentiles on the roll-up object
    if (globalWaitTimeStats) {
      globalStats.avgWaitTimeMs = globalWaitTimeStats.avg
      globalStats.p50WaitTimeMs = globalWaitTimeStats.p50
      globalStats.p90WaitTimeMs = globalWaitTimeStats.p90
      globalStats.p99WaitTimeMs = globalWaitTimeStats.p99
      // Multi-instance sampling caveat (see design.md Decision 9):
      // the global P90 is for visualization/monitoring only, not for system
      // decisions — health checks use per-API-Key P90 (sampled per key).
      globalWaitTimeStats.globalP90ForVisualizationOnly = true
    }

    res.json({
      success: true,
      globalStats,
      globalWaitTimeStats,
      perKeyStats
    })
  } catch (error) {
    logger.error('❌ Failed to get queue stats:', error)
    res.status(500).json({
      success: false,
      error: 'Failed to get queue stats',
      message: error.message
    })
  }
})
|
||||
|
||||
/**
 * DELETE /admin/concurrency-queue/:apiKeyId
 * Clear the queued-request counter for a single API key.
 */
router.delete('/concurrency-queue/:apiKeyId', authenticateAdmin, async (req, res) => {
  const { apiKeyId } = req.params
  try {
    await redis.clearConcurrencyQueue(apiKeyId)

    // Audit log: queue clearing is a destructive admin action
    logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared queue for key ${apiKeyId}`)

    res.json({
      success: true,
      message: `Successfully cleared queue for API key ${apiKeyId}`
    })
  } catch (error) {
    logger.error(`❌ Failed to clear queue for ${req.params.apiKeyId}:`, error)
    res.status(500).json({
      success: false,
      error: 'Failed to clear queue',
      message: error.message
    })
  }
})
|
||||
|
||||
/**
 * DELETE /admin/concurrency-queue
 * Clear every queued-request counter across all API keys.
 */
router.delete('/concurrency-queue', authenticateAdmin, async (req, res) => {
  try {
    const cleared = await redis.clearAllConcurrencyQueues()

    // Audit log: clearing all queues is a destructive admin action
    logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} cleared ALL queues`)

    res.json({
      success: true,
      message: 'Successfully cleared all queues',
      cleared
    })
  } catch (error) {
    logger.error('❌ Failed to clear all queues:', error)
    res.status(500).json({
      success: false,
      error: 'Failed to clear all queues',
      message: error.message
    })
  }
})
|
||||
|
||||
/**
|
||||
* GET /admin/concurrency/:apiKeyId
|
||||
* 获取特定 API Key 的并发状态详情
|
||||
@@ -47,10 +210,14 @@ router.get('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => {
|
||||
try {
|
||||
const { apiKeyId } = req.params
|
||||
const status = await redis.getConcurrencyStatus(apiKeyId)
|
||||
const queueCount = await redis.getConcurrencyQueueCount(apiKeyId)
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
concurrencyStatus: status
|
||||
concurrencyStatus: {
|
||||
...status,
|
||||
queueCount
|
||||
}
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error)
|
||||
|
||||
@@ -190,12 +190,42 @@ async function handleMessagesRequest(req, res) {
|
||||
)
|
||||
|
||||
if (isStream) {
|
||||
// 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开)
|
||||
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
|
||||
logger.warn(
|
||||
`⚠️ Client disconnected before stream response could start for key: ${req.apiKey?.name || 'unknown'}`
|
||||
)
|
||||
return undefined
|
||||
}
|
||||
|
||||
// 流式响应 - 只使用官方真实usage数据
|
||||
res.setHeader('Content-Type', 'text/event-stream')
|
||||
res.setHeader('Cache-Control', 'no-cache')
|
||||
res.setHeader('Connection', 'keep-alive')
|
||||
res.setHeader('Access-Control-Allow-Origin', '*')
|
||||
res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲
|
||||
// ⚠️ 检查 headers 是否已发送(可能在排队心跳时已设置)
|
||||
if (!res.headersSent) {
|
||||
res.setHeader('Content-Type', 'text/event-stream')
|
||||
res.setHeader('Cache-Control', 'no-cache')
|
||||
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
|
||||
// 当并发队列功能启用时,auth.js 会设置 Connection: close 来禁用 Keep-Alive
|
||||
// 这里只在没有设置过 Connection 头时才设置 keep-alive
|
||||
const existingConnection = res.getHeader('Connection')
|
||||
if (!existingConnection) {
|
||||
res.setHeader('Connection', 'keep-alive')
|
||||
} else {
|
||||
logger.api(
|
||||
`🔌 [STREAM] Preserving existing Connection header: ${existingConnection} for key: ${req.apiKey?.name || 'unknown'}`
|
||||
)
|
||||
}
|
||||
res.setHeader('Access-Control-Allow-Origin', '*')
|
||||
res.setHeader('X-Accel-Buffering', 'no') // 禁用 Nginx 缓冲
|
||||
} else {
|
||||
logger.debug(
|
||||
`📤 [STREAM] Headers already sent, skipping setHeader for key: ${req.apiKey?.name || 'unknown'}`
|
||||
)
|
||||
}
|
||||
|
||||
// 禁用 Nagle 算法,确保数据立即发送
|
||||
if (res.socket && typeof res.socket.setNoDelay === 'function') {
|
||||
@@ -657,12 +687,61 @@ async function handleMessagesRequest(req, res) {
|
||||
}
|
||||
}, 1000) // 1秒后检查
|
||||
} else {
|
||||
// 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开)
|
||||
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
|
||||
logger.warn(
|
||||
`⚠️ Client disconnected before non-stream request could start for key: ${req.apiKey?.name || 'unknown'}`
|
||||
)
|
||||
return undefined
|
||||
}
|
||||
|
||||
// 非流式响应 - 只使用官方真实usage数据
|
||||
logger.info('📄 Starting non-streaming request', {
|
||||
apiKeyId: req.apiKey.id,
|
||||
apiKeyName: req.apiKey.name
|
||||
})
|
||||
|
||||
// 📊 监听 socket 事件以追踪连接状态变化
|
||||
const nonStreamSocket = res.socket
|
||||
let _clientClosedConnection = false
|
||||
let _socketCloseTime = null
|
||||
|
||||
if (nonStreamSocket) {
|
||||
const onSocketEnd = () => {
|
||||
_clientClosedConnection = true
|
||||
_socketCloseTime = Date.now()
|
||||
logger.warn(
|
||||
`⚠️ [NON-STREAM] Socket 'end' event - client sent FIN | key: ${req.apiKey?.name}, ` +
|
||||
`requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms`
|
||||
)
|
||||
}
|
||||
const onSocketClose = () => {
|
||||
_clientClosedConnection = true
|
||||
logger.warn(
|
||||
`⚠️ [NON-STREAM] Socket 'close' event | key: ${req.apiKey?.name}, ` +
|
||||
`requestId: ${req.requestId}, elapsed: ${Date.now() - startTime}ms, ` +
|
||||
`hadError: ${nonStreamSocket.destroyed}`
|
||||
)
|
||||
}
|
||||
const onSocketError = (err) => {
|
||||
logger.error(
|
||||
`❌ [NON-STREAM] Socket error | key: ${req.apiKey?.name}, ` +
|
||||
`requestId: ${req.requestId}, error: ${err.message}`
|
||||
)
|
||||
}
|
||||
|
||||
nonStreamSocket.once('end', onSocketEnd)
|
||||
nonStreamSocket.once('close', onSocketClose)
|
||||
nonStreamSocket.once('error', onSocketError)
|
||||
|
||||
// 清理监听器(在响应结束后)
|
||||
res.once('finish', () => {
|
||||
nonStreamSocket.removeListener('end', onSocketEnd)
|
||||
nonStreamSocket.removeListener('close', onSocketClose)
|
||||
nonStreamSocket.removeListener('error', onSocketError)
|
||||
})
|
||||
}
|
||||
|
||||
// 生成会话哈希用于sticky会话
|
||||
const sessionHash = sessionHelper.generateSessionHash(req.body)
|
||||
|
||||
@@ -867,6 +946,15 @@ async function handleMessagesRequest(req, res) {
|
||||
bodyLength: response.body ? response.body.length : 0
|
||||
})
|
||||
|
||||
// 🔍 检查客户端连接是否仍然有效
|
||||
// 在长时间请求过程中,客户端可能已经断开连接(超时、用户取消等)
|
||||
if (res.destroyed || res.socket?.destroyed || res.writableEnded) {
|
||||
logger.warn(
|
||||
`⚠️ Client disconnected before non-stream response could be sent for key: ${req.apiKey?.name || 'unknown'}`
|
||||
)
|
||||
return undefined
|
||||
}
|
||||
|
||||
res.status(response.statusCode)
|
||||
|
||||
// 设置响应头,避免 Content-Length 和 Transfer-Encoding 冲突
|
||||
@@ -932,10 +1020,12 @@ async function handleMessagesRequest(req, res) {
|
||||
logger.warn('⚠️ No usage data found in Claude API JSON response')
|
||||
}
|
||||
|
||||
// 使用 Express 内建的 res.json() 发送响应(简单可靠)
|
||||
res.json(jsonData)
|
||||
} catch (parseError) {
|
||||
logger.warn('⚠️ Failed to parse Claude API response as JSON:', parseError.message)
|
||||
logger.info('📄 Raw response body:', response.body)
|
||||
// 使用 Express 内建的 res.send() 发送响应(简单可靠)
|
||||
res.send(response.body)
|
||||
}
|
||||
|
||||
|
||||
@@ -243,10 +243,11 @@ class BedrockRelayService {
|
||||
isBackendError ? { backendError: queueResult.errorMessage } : {}
|
||||
)
|
||||
if (!res.headersSent) {
|
||||
const existingConnection = res.getHeader ? res.getHeader('Connection') : null
|
||||
res.writeHead(statusCode, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'x-user-message-queue-error': errorType
|
||||
})
|
||||
}
|
||||
@@ -309,10 +310,17 @@ class BedrockRelayService {
|
||||
}
|
||||
|
||||
// 设置SSE响应头
|
||||
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
|
||||
const existingConnection = res.getHeader ? res.getHeader('Connection') : null
|
||||
if (existingConnection) {
|
||||
logger.debug(
|
||||
`🔌 [Bedrock Stream] Preserving existing Connection header: ${existingConnection}`
|
||||
)
|
||||
}
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
|
||||
})
|
||||
|
||||
@@ -4,6 +4,7 @@ const logger = require('../utils/logger')
|
||||
const config = require('../../config/config')
|
||||
const { parseVendorPrefixedModel } = require('../utils/modelHelper')
|
||||
const userMessageQueueService = require('./userMessageQueueService')
|
||||
const { isStreamWritable } = require('../utils/streamHelper')
|
||||
|
||||
class CcrRelayService {
|
||||
constructor() {
|
||||
@@ -379,10 +380,13 @@ class CcrRelayService {
|
||||
isBackendError ? { backendError: queueResult.errorMessage } : {}
|
||||
)
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(statusCode, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'x-user-message-queue-error': errorType
|
||||
})
|
||||
}
|
||||
@@ -606,10 +610,13 @@ class CcrRelayService {
|
||||
|
||||
// 设置错误响应的状态码和响应头
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
const errorHeaders = {
|
||||
'Content-Type': response.headers['content-type'] || 'application/json',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive'
|
||||
Connection: existingConnection || 'keep-alive'
|
||||
}
|
||||
// 避免 Transfer-Encoding 冲突,让 Express 自动处理
|
||||
delete errorHeaders['Transfer-Encoding']
|
||||
@@ -619,13 +626,13 @@ class CcrRelayService {
|
||||
|
||||
// 直接透传错误数据,不进行包装
|
||||
response.data.on('data', (chunk) => {
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write(chunk)
|
||||
}
|
||||
})
|
||||
|
||||
response.data.on('end', () => {
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.end()
|
||||
}
|
||||
resolve() // 不抛出异常,正常完成流处理
|
||||
@@ -659,11 +666,20 @@ class CcrRelayService {
|
||||
})
|
||||
|
||||
// 设置响应头
|
||||
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
if (existingConnection) {
|
||||
logger.debug(
|
||||
`🔌 [CCR Stream] Preserving existing Connection header: ${existingConnection}`
|
||||
)
|
||||
}
|
||||
const headers = {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Headers': 'Cache-Control'
|
||||
}
|
||||
@@ -702,12 +718,17 @@ class CcrRelayService {
|
||||
}
|
||||
|
||||
// 写入到响应流
|
||||
if (outputLine && !responseStream.destroyed) {
|
||||
if (outputLine && isStreamWritable(responseStream)) {
|
||||
responseStream.write(`${outputLine}\n`)
|
||||
} else if (outputLine) {
|
||||
// 客户端连接已断开,记录警告
|
||||
logger.warn(
|
||||
`⚠️ [CCR] Client disconnected during stream, skipping data for account: ${accountId}`
|
||||
)
|
||||
}
|
||||
} else {
|
||||
// 空行也需要传递
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write('\n')
|
||||
}
|
||||
}
|
||||
@@ -718,10 +739,6 @@ class CcrRelayService {
|
||||
})
|
||||
|
||||
response.data.on('end', () => {
|
||||
if (!responseStream.destroyed) {
|
||||
responseStream.end()
|
||||
}
|
||||
|
||||
// 如果收集到使用统计数据,调用回调
|
||||
if (usageCallback && Object.keys(collectedUsage).length > 0) {
|
||||
try {
|
||||
@@ -733,12 +750,26 @@ class CcrRelayService {
|
||||
}
|
||||
}
|
||||
|
||||
resolve()
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 等待数据完全 flush 到客户端后再 resolve
|
||||
responseStream.end(() => {
|
||||
logger.debug(
|
||||
`🌊 CCR stream response completed and flushed | bytesWritten: ${responseStream.bytesWritten || 'unknown'}`
|
||||
)
|
||||
resolve()
|
||||
})
|
||||
} else {
|
||||
// 连接已断开,记录警告
|
||||
logger.warn(
|
||||
`⚠️ [CCR] Client disconnected before stream end, data may not have been received | account: ${accountId}`
|
||||
)
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
|
||||
response.data.on('error', (err) => {
|
||||
logger.error('❌ Stream data error:', err)
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.end()
|
||||
}
|
||||
reject(err)
|
||||
@@ -770,7 +801,7 @@ class CcrRelayService {
|
||||
}
|
||||
}
|
||||
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
|
||||
responseStream.end()
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ const {
|
||||
isAccountDisabledError
|
||||
} = require('../utils/errorSanitizer')
|
||||
const userMessageQueueService = require('./userMessageQueueService')
|
||||
const { isStreamWritable } = require('../utils/streamHelper')
|
||||
|
||||
class ClaudeConsoleRelayService {
|
||||
constructor() {
|
||||
@@ -517,10 +518,13 @@ class ClaudeConsoleRelayService {
|
||||
isBackendError ? { backendError: queueResult.errorMessage } : {}
|
||||
)
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(statusCode, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'x-user-message-queue-error': errorType
|
||||
})
|
||||
}
|
||||
@@ -878,7 +882,7 @@ class ClaudeConsoleRelayService {
|
||||
`🧹 [Stream] [SANITIZED] Error response to client: ${JSON.stringify(sanitizedError)}`
|
||||
)
|
||||
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write(JSON.stringify(sanitizedError))
|
||||
responseStream.end()
|
||||
}
|
||||
@@ -886,7 +890,7 @@ class ClaudeConsoleRelayService {
|
||||
const sanitizedText = sanitizeErrorMessage(errorDataForCheck)
|
||||
logger.error(`🧹 [Stream] [SANITIZED] Error response to client: ${sanitizedText}`)
|
||||
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write(sanitizedText)
|
||||
responseStream.end()
|
||||
}
|
||||
@@ -923,11 +927,22 @@ class ClaudeConsoleRelayService {
|
||||
})
|
||||
|
||||
// 设置响应头
|
||||
// ⚠️ 关键修复:尊重 auth.js 提前设置的 Connection: close
|
||||
// 当并发队列功能启用时,auth.js 会设置 Connection: close 来禁用 Keep-Alive
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
const connectionHeader = existingConnection || 'keep-alive'
|
||||
if (existingConnection) {
|
||||
logger.debug(
|
||||
`🔌 [Console Stream] Preserving existing Connection header: ${existingConnection}`
|
||||
)
|
||||
}
|
||||
responseStream.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: connectionHeader,
|
||||
'X-Accel-Buffering': 'no'
|
||||
})
|
||||
}
|
||||
@@ -953,20 +968,33 @@ class ClaudeConsoleRelayService {
|
||||
buffer = lines.pop() || ''
|
||||
|
||||
// 转发数据并解析usage
|
||||
if (lines.length > 0 && !responseStream.destroyed) {
|
||||
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '')
|
||||
if (lines.length > 0) {
|
||||
// 检查流是否可写(客户端连接是否有效)
|
||||
if (isStreamWritable(responseStream)) {
|
||||
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '')
|
||||
|
||||
// 应用流转换器如果有
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(linesToForward)
|
||||
if (transformed) {
|
||||
responseStream.write(transformed)
|
||||
// 应用流转换器如果有
|
||||
let dataToWrite = linesToForward
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(linesToForward)
|
||||
if (transformed) {
|
||||
dataToWrite = transformed
|
||||
} else {
|
||||
dataToWrite = null
|
||||
}
|
||||
}
|
||||
|
||||
if (dataToWrite) {
|
||||
responseStream.write(dataToWrite)
|
||||
}
|
||||
} else {
|
||||
responseStream.write(linesToForward)
|
||||
// 客户端连接已断开,记录警告(但仍继续解析usage)
|
||||
logger.warn(
|
||||
`⚠️ [Console] Client disconnected during stream, skipping ${lines.length} lines for account: ${account?.name || accountId}`
|
||||
)
|
||||
}
|
||||
|
||||
// 解析SSE数据寻找usage信息
|
||||
// 解析SSE数据寻找usage信息(无论连接状态如何)
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('data:')) {
|
||||
const jsonStr = line.slice(5).trimStart()
|
||||
@@ -1074,7 +1102,7 @@ class ClaudeConsoleRelayService {
|
||||
`❌ Error processing Claude Console stream data (Account: ${account?.name || accountId}):`,
|
||||
error
|
||||
)
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 如果有 streamTransformer(如测试请求),使用前端期望的格式
|
||||
if (streamTransformer) {
|
||||
responseStream.write(
|
||||
@@ -1097,7 +1125,7 @@ class ClaudeConsoleRelayService {
|
||||
response.data.on('end', () => {
|
||||
try {
|
||||
// 处理缓冲区中剩余的数据
|
||||
if (buffer.trim() && !responseStream.destroyed) {
|
||||
if (buffer.trim() && isStreamWritable(responseStream)) {
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(buffer)
|
||||
if (transformed) {
|
||||
@@ -1146,12 +1174,33 @@ class ClaudeConsoleRelayService {
|
||||
}
|
||||
|
||||
// 确保流正确结束
|
||||
if (!responseStream.destroyed) {
|
||||
responseStream.end()
|
||||
}
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 📊 诊断日志:流结束前状态
|
||||
logger.info(
|
||||
`📤 [STREAM] Ending response | destroyed: ${responseStream.destroyed}, ` +
|
||||
`socketDestroyed: ${responseStream.socket?.destroyed}, ` +
|
||||
`socketBytesWritten: ${responseStream.socket?.bytesWritten || 0}`
|
||||
)
|
||||
|
||||
logger.debug('🌊 Claude Console Claude stream response completed')
|
||||
resolve()
|
||||
// 禁用 Nagle 算法确保数据立即发送
|
||||
if (responseStream.socket && !responseStream.socket.destroyed) {
|
||||
responseStream.socket.setNoDelay(true)
|
||||
}
|
||||
|
||||
// 等待数据完全 flush 到客户端后再 resolve
|
||||
responseStream.end(() => {
|
||||
logger.info(
|
||||
`✅ [STREAM] Response ended and flushed | socketBytesWritten: ${responseStream.socket?.bytesWritten || 'unknown'}`
|
||||
)
|
||||
resolve()
|
||||
})
|
||||
} else {
|
||||
// 连接已断开,记录警告
|
||||
logger.warn(
|
||||
`⚠️ [Console] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}`
|
||||
)
|
||||
resolve()
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('❌ Error processing stream end:', error)
|
||||
reject(error)
|
||||
@@ -1163,7 +1212,7 @@ class ClaudeConsoleRelayService {
|
||||
`❌ Claude Console stream error (Account: ${account?.name || accountId}):`,
|
||||
error
|
||||
)
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 如果有 streamTransformer(如测试请求),使用前端期望的格式
|
||||
if (streamTransformer) {
|
||||
responseStream.write(
|
||||
@@ -1211,14 +1260,17 @@ class ClaudeConsoleRelayService {
|
||||
|
||||
// 发送错误响应
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(error.response?.status || 500, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive'
|
||||
Connection: existingConnection || 'keep-alive'
|
||||
})
|
||||
}
|
||||
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 如果有 streamTransformer(如测试请求),使用前端期望的格式
|
||||
if (streamTransformer) {
|
||||
responseStream.write(
|
||||
@@ -1388,7 +1440,7 @@ class ClaudeConsoleRelayService {
|
||||
'Cache-Control': 'no-cache'
|
||||
})
|
||||
}
|
||||
if (!responseStream.destroyed && !responseStream.writableEnded) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write(
|
||||
`data: ${JSON.stringify({ type: 'test_complete', success: false, error: error.message })}\n\n`
|
||||
)
|
||||
|
||||
@@ -20,6 +20,15 @@ const DEFAULT_CONFIG = {
|
||||
userMessageQueueDelayMs: 200, // 请求间隔(毫秒)
|
||||
userMessageQueueTimeoutMs: 5000, // 队列等待超时(毫秒),优化后锁持有时间短无需长等待
|
||||
userMessageQueueLockTtlMs: 5000, // 锁TTL(毫秒),请求发送后立即释放无需长TTL
|
||||
// 并发请求排队配置
|
||||
concurrentRequestQueueEnabled: false, // 是否启用并发请求排队(默认关闭)
|
||||
concurrentRequestQueueMaxSize: 3, // 固定最小排队数(默认3)
|
||||
concurrentRequestQueueMaxSizeMultiplier: 0, // 并发数的倍数(默认0,仅使用固定值)
|
||||
concurrentRequestQueueTimeoutMs: 10000, // 排队超时(毫秒,默认10秒)
|
||||
concurrentRequestQueueMaxRedisFailCount: 5, // 连续 Redis 失败阈值(默认5次)
|
||||
// 排队健康检查配置
|
||||
concurrentRequestQueueHealthCheckEnabled: true, // 是否启用排队健康检查(默认开启)
|
||||
concurrentRequestQueueHealthThreshold: 0.8, // 健康检查阈值(P90 >= 超时 × 阈值时拒绝新请求)
|
||||
updatedAt: null,
|
||||
updatedBy: null
|
||||
}
|
||||
@@ -105,7 +114,8 @@ class ClaudeRelayConfigService {
|
||||
|
||||
logger.info(`✅ Claude relay config updated by ${updatedBy}:`, {
|
||||
claudeCodeOnlyEnabled: updatedConfig.claudeCodeOnlyEnabled,
|
||||
globalSessionBindingEnabled: updatedConfig.globalSessionBindingEnabled
|
||||
globalSessionBindingEnabled: updatedConfig.globalSessionBindingEnabled,
|
||||
concurrentRequestQueueEnabled: updatedConfig.concurrentRequestQueueEnabled
|
||||
})
|
||||
|
||||
return updatedConfig
|
||||
|
||||
@@ -16,6 +16,7 @@ const { formatDateWithTimezone } = require('../utils/dateHelper')
|
||||
const requestIdentityService = require('./requestIdentityService')
|
||||
const { createClaudeTestPayload } = require('../utils/testPayloadHelper')
|
||||
const userMessageQueueService = require('./userMessageQueueService')
|
||||
const { isStreamWritable } = require('../utils/streamHelper')
|
||||
|
||||
class ClaudeRelayService {
|
||||
constructor() {
|
||||
@@ -1057,6 +1058,8 @@ class ClaudeRelayService {
|
||||
|
||||
logger.info(`🔗 指纹是这个: ${headers['User-Agent']}`)
|
||||
|
||||
logger.info(`🔗 指纹是这个: ${headers['User-Agent']}`)
|
||||
|
||||
// 根据模型和客户端传递的 anthropic-beta 动态设置 header
|
||||
const modelId = requestPayload?.model || body?.model
|
||||
const clientBetaHeader = clientHeaders?.['anthropic-beta']
|
||||
@@ -1338,10 +1341,13 @@ class ClaudeRelayService {
|
||||
isBackendError ? { backendError: queueResult.errorMessage } : {}
|
||||
)
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(statusCode, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'x-user-message-queue-error': errorType
|
||||
})
|
||||
}
|
||||
@@ -1699,7 +1705,7 @@ class ClaudeRelayService {
|
||||
}
|
||||
})()
|
||||
}
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 解析 Claude API 返回的错误详情
|
||||
let errorMessage = `Claude API error: ${res.statusCode}`
|
||||
try {
|
||||
@@ -1764,16 +1770,23 @@ class ClaudeRelayService {
|
||||
buffer = lines.pop() || '' // 保留最后的不完整行
|
||||
|
||||
// 转发已处理的完整行到客户端
|
||||
if (lines.length > 0 && !responseStream.destroyed) {
|
||||
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '')
|
||||
// 如果有流转换器,应用转换
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(linesToForward)
|
||||
if (transformed) {
|
||||
responseStream.write(transformed)
|
||||
if (lines.length > 0) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
const linesToForward = lines.join('\n') + (lines.length > 0 ? '\n' : '')
|
||||
// 如果有流转换器,应用转换
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(linesToForward)
|
||||
if (transformed) {
|
||||
responseStream.write(transformed)
|
||||
}
|
||||
} else {
|
||||
responseStream.write(linesToForward)
|
||||
}
|
||||
} else {
|
||||
responseStream.write(linesToForward)
|
||||
// 客户端连接已断开,记录警告(但仍继续解析usage)
|
||||
logger.warn(
|
||||
`⚠️ [Official] Client disconnected during stream, skipping ${lines.length} lines for account: ${accountId}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1878,7 +1891,7 @@ class ClaudeRelayService {
|
||||
} catch (error) {
|
||||
logger.error('❌ Error processing stream data:', error)
|
||||
// 发送错误但不破坏流,让它自然结束
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.write('event: error\n')
|
||||
responseStream.write(
|
||||
`data: ${JSON.stringify({
|
||||
@@ -1894,7 +1907,7 @@ class ClaudeRelayService {
|
||||
res.on('end', async () => {
|
||||
try {
|
||||
// 处理缓冲区中剩余的数据
|
||||
if (buffer.trim() && !responseStream.destroyed) {
|
||||
if (buffer.trim() && isStreamWritable(responseStream)) {
|
||||
if (streamTransformer) {
|
||||
const transformed = streamTransformer(buffer)
|
||||
if (transformed) {
|
||||
@@ -1906,8 +1919,16 @@ class ClaudeRelayService {
|
||||
}
|
||||
|
||||
// 确保流正确结束
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
responseStream.end()
|
||||
logger.debug(
|
||||
`🌊 Stream end called | bytesWritten: ${responseStream.bytesWritten || 'unknown'}`
|
||||
)
|
||||
} else {
|
||||
// 连接已断开,记录警告
|
||||
logger.warn(
|
||||
`⚠️ [Official] Client disconnected before stream end, data may not have been received | account: ${account?.name || accountId}`
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('❌ Error processing stream end:', error)
|
||||
@@ -2105,14 +2126,17 @@ class ClaudeRelayService {
|
||||
}
|
||||
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(statusCode, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive'
|
||||
Connection: existingConnection || 'keep-alive'
|
||||
})
|
||||
}
|
||||
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 发送 SSE 错误事件
|
||||
responseStream.write('event: error\n')
|
||||
responseStream.write(
|
||||
@@ -2132,13 +2156,16 @@ class ClaudeRelayService {
|
||||
logger.error(`❌ Claude stream request timeout | Account: ${account?.name || accountId}`)
|
||||
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(504, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive'
|
||||
Connection: existingConnection || 'keep-alive'
|
||||
})
|
||||
}
|
||||
if (!responseStream.destroyed) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
// 发送 SSE 错误事件
|
||||
responseStream.write('event: error\n')
|
||||
responseStream.write(
|
||||
@@ -2453,10 +2480,13 @@ class ClaudeRelayService {
|
||||
|
||||
// 设置响应头
|
||||
if (!responseStream.headersSent) {
|
||||
const existingConnection = responseStream.getHeader
|
||||
? responseStream.getHeader('Connection')
|
||||
: null
|
||||
responseStream.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
Connection: existingConnection || 'keep-alive',
|
||||
'X-Accel-Buffering': 'no'
|
||||
})
|
||||
}
|
||||
@@ -2484,7 +2514,7 @@ class ClaudeRelayService {
|
||||
} catch (error) {
|
||||
logger.error(`❌ Test account connection failed:`, error)
|
||||
// 发送错误事件给前端
|
||||
if (!responseStream.destroyed && !responseStream.writableEnded) {
|
||||
if (isStreamWritable(responseStream)) {
|
||||
try {
|
||||
const errorMsg = error.message || '测试失败'
|
||||
responseStream.write(`data: ${JSON.stringify({ type: 'error', error: errorMsg })}\n\n`)
|
||||
|
||||
105
src/utils/statsHelper.js
Normal file
105
src/utils/statsHelper.js
Normal file
@@ -0,0 +1,105 @@
|
||||
/**
|
||||
* 统计计算工具函数
|
||||
* 提供百分位数计算、等待时间统计等通用统计功能
|
||||
*/
|
||||
|
||||
/**
 * Compute a percentile from an ascending-sorted numeric array using the
 * nearest-rank method: index = ceil(percentile / 100 * n) - 1.
 *
 * Edge cases:
 * - empty array        -> 0
 * - percentile <= 0    -> smallest element
 * - percentile >= 100  -> largest element
 * - e.g. n = 2, P50    -> first element (nearest-rank rounds down)
 *
 * @param {number[]} sortedArray - Array already sorted in ascending order
 * @param {number} percentile - Percentile to extract (0-100)
 * @returns {number} The value at the requested percentile
 */
function getPercentile(sortedArray, percentile) {
  const size = sortedArray.length

  // Degenerate inputs: nothing to rank, or a single candidate.
  if (size === 0) {
    return 0
  }
  if (size === 1) {
    return sortedArray[0]
  }

  // Clamp out-of-range percentiles to the array extremes.
  if (percentile <= 0) {
    return sortedArray[0]
  }
  if (percentile >= 100) {
    return sortedArray[size - 1]
  }

  // Nearest-rank: 1-based rank ceil(p/100 * n), converted to 0-based index.
  const rank = Math.ceil((percentile / 100) * size)
  return sortedArray[rank - 1]
}

/**
 * Build a distribution summary for a list of wait times.
 *
 * @param {number[]} waitTimes - Wait-time samples in any order (not mutated)
 * @returns {Object|null} null for a missing/empty input, otherwise:
 *   - sampleCount / count: number of samples (count kept for backward compat)
 *   - min, max, avg: extremes and rounded mean
 *   - p50, p90, p99: nearest-rank percentiles
 *   - sampleSizeWarning, p90Unreliable: set when fewer than 10 samples
 *   - p99Unreliable: set when fewer than 100 samples
 *
 * Reliability flags are advisory: the percentile values are still returned
 * even when flagged, so callers can decide how much to trust them.
 */
function calculateWaitTimeStats(waitTimes) {
  if (!waitTimes || waitTimes.length === 0) {
    return null
  }

  // Work on a sorted copy so the caller's array is left untouched.
  const sorted = waitTimes.slice().sort((x, y) => x - y)
  const sampleCount = sorted.length
  const total = sorted.reduce((acc, value) => acc + value, 0)

  const stats = {
    sampleCount, // always present so callers can judge reliability
    count: sampleCount, // legacy alias
    min: sorted[0],
    max: sorted[sampleCount - 1],
    avg: Math.round(total / sampleCount),
    p50: getPercentile(sorted, 50),
    p90: getPercentile(sorted, 90),
    p99: getPercentile(sorted, 99)
  }

  // Fewer than 10 samples: even P90 cannot be resolved meaningfully.
  if (sampleCount < 10) {
    stats.sampleSizeWarning = 'Results may be inaccurate due to small sample size'
    stats.p90Unreliable = true
  }

  // Fewer than 100 samples: P99 needs at least 100 samples to be meaningful.
  if (sampleCount < 100) {
    stats.p99Unreliable = true
  }

  return stats
}
|
||||
|
||||
// Public API of the stats helper: the raw percentile primitive and the
// aggregate wait-time statistics builder that uses it.
module.exports = {
  getPercentile,
  calculateWaitTimeStats
}
|
||||
36
src/utils/streamHelper.js
Normal file
36
src/utils/streamHelper.js
Normal file
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Stream Helper Utilities
|
||||
* 流处理辅助工具函数
|
||||
*/
|
||||
|
||||
/**
 * Check whether an HTTP response stream can still accept writes
 * (i.e. the client connection appears to be alive and not finished).
 *
 * Returns false when any of the following holds:
 * - the stream reference is missing
 * - the stream itself has been destroyed
 * - the underlying socket has been destroyed
 * - the writable side has already been ended
 *
 * @param {import('http').ServerResponse} stream - HTTP response stream
 * @returns {boolean} true when the stream is still safe to write to
 */
function isStreamWritable(stream) {
  // Collapse every "connection is gone" condition into one expression;
  // optional chaining keeps the socket probe safe when there is no socket.
  const unusable =
    !stream || stream.destroyed || stream.socket?.destroyed || stream.writableEnded

  return !unusable
}
|
||||
|
||||
// Public API of the stream helper: connection-liveness probe used by the
// relay services before writing SSE data to a client response.
module.exports = {
  isStreamWritable
}
|
||||
Reference in New Issue
Block a user