mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:38:14 +00:00
Merge pull request #789 from DaydreamCoding/feature/user-message-queue-optimize [skip ci]
feat(queue): 优化用户消息队列锁释放时机
This commit is contained in:
@@ -186,9 +186,10 @@ npm run service:stop # 停止服务
|
||||
- `CLAUDE_OVERLOAD_HANDLING_MINUTES`: Claude 529错误处理持续时间(分钟,0表示禁用)
|
||||
- `STICKY_SESSION_TTL_HOURS`: 粘性会话TTL(小时,默认1)
|
||||
- `STICKY_SESSION_RENEWAL_THRESHOLD_MINUTES`: 粘性会话续期阈值(分钟,默认0)
|
||||
- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认true)
|
||||
- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认false)
|
||||
- `USER_MESSAGE_QUEUE_DELAY_MS`: 用户消息请求间隔(毫秒,默认200)
|
||||
- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认30000)
|
||||
- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认5000,锁持有时间短无需长等待)
|
||||
- `USER_MESSAGE_QUEUE_LOCK_TTL_MS`: 锁TTL(毫秒,默认5000,请求发送后立即释放无需长TTL)
|
||||
- `METRICS_WINDOW`: 实时指标统计窗口(分钟,1-60,默认5)
|
||||
- `MAX_API_KEYS_PER_USER`: 每用户最大API Key数量(默认1)
|
||||
- `ALLOW_USER_DELETE_API_KEYS`: 允许用户删除自己的API Keys(默认false)
|
||||
@@ -341,7 +342,7 @@ npm run setup # 自动生成密钥并创建管理员账户
|
||||
11. **速率限制未清理**: rateLimitCleanupService每5分钟自动清理过期限流状态
|
||||
12. **成本统计不准确**: 运行 `npm run init:costs` 初始化成本数据,检查pricingService是否正确加载模型价格
|
||||
13. **缓存命中率低**: 查看缓存监控统计,调整LRU缓存大小配置
|
||||
14. **用户消息队列超时**: 检查 `USER_MESSAGE_QUEUE_TIMEOUT_MS` 配置是否合理,查看日志中的 `queue_timeout` 错误,可通过 Web 界面或 `USER_MESSAGE_QUEUE_ENABLED=false` 禁用此功能
|
||||
14. **用户消息队列超时**: 优化后锁持有时间已从分钟级降到毫秒级(请求发送后立即释放),默认 `USER_MESSAGE_QUEUE_TIMEOUT_MS=5000` 已足够。如仍有超时,检查网络延迟或禁用此功能(`USER_MESSAGE_QUEUE_ENABLED=false`)
|
||||
|
||||
### 调试工具
|
||||
|
||||
|
||||
@@ -206,11 +206,12 @@ const config = {
|
||||
},
|
||||
|
||||
// 📬 用户消息队列配置
|
||||
// 优化说明:锁在请求发送成功后立即释放(而非请求完成后),因为 Claude API 限流基于请求发送时刻计算
|
||||
userMessageQueue: {
|
||||
enabled: process.env.USER_MESSAGE_QUEUE_ENABLED === 'true', // 默认关闭
|
||||
delayMs: parseInt(process.env.USER_MESSAGE_QUEUE_DELAY_MS) || 100, // 请求间隔(毫秒)
|
||||
timeoutMs: parseInt(process.env.USER_MESSAGE_QUEUE_TIMEOUT_MS) || 60000, // 队列等待超时(毫秒)
|
||||
lockTtlMs: 120000 // 锁租约TTL(毫秒),会在请求期间自动续租以防死锁
|
||||
delayMs: parseInt(process.env.USER_MESSAGE_QUEUE_DELAY_MS) || 200, // 请求间隔(毫秒)
|
||||
timeoutMs: parseInt(process.env.USER_MESSAGE_QUEUE_TIMEOUT_MS) || 5000, // 队列等待超时(毫秒),锁持有时间短,无需长等待
|
||||
lockTtlMs: parseInt(process.env.USER_MESSAGE_QUEUE_LOCK_TTL_MS) || 5000 // 锁TTL(毫秒),5秒足以覆盖请求发送
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -669,10 +669,9 @@ class Application {
|
||||
logger.error('❌ Error stopping rate limit cleanup service:', error)
|
||||
}
|
||||
|
||||
// 停止用户消息队列清理服务和续租定时器
|
||||
// 停止用户消息队列清理服务
|
||||
try {
|
||||
const userMessageQueueService = require('./services/userMessageQueueService')
|
||||
userMessageQueueService.stopAllRenewalTimers()
|
||||
userMessageQueueService.stopCleanupTask()
|
||||
logger.info('📬 User message queue service stopped')
|
||||
} catch (error) {
|
||||
|
||||
@@ -2626,38 +2626,6 @@ redisClient.acquireUserMessageLock = async function (accountId, requestId, lockT
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 续租用户消息队列锁(仅锁持有者可续租)
|
||||
* @param {string} accountId - 账户ID
|
||||
* @param {string} requestId - 请求ID
|
||||
* @param {number} lockTtlMs - 锁 TTL(毫秒)
|
||||
* @returns {Promise<boolean>} 是否续租成功(只有锁持有者才能续租)
|
||||
*/
|
||||
redisClient.refreshUserMessageLock = async function (accountId, requestId, lockTtlMs) {
|
||||
const lockKey = `user_msg_queue_lock:${accountId}`
|
||||
|
||||
const script = `
|
||||
local lockKey = KEYS[1]
|
||||
local requestId = ARGV[1]
|
||||
local lockTtl = tonumber(ARGV[2])
|
||||
|
||||
local currentLock = redis.call('GET', lockKey)
|
||||
if currentLock == requestId then
|
||||
redis.call('PEXPIRE', lockKey, lockTtl)
|
||||
return 1
|
||||
end
|
||||
return 0
|
||||
`
|
||||
|
||||
try {
|
||||
const result = await this.client.eval(script, 1, lockKey, requestId, lockTtlMs)
|
||||
return result === 1
|
||||
} catch (error) {
|
||||
logger.error(`Failed to refresh user message lock for account ${accountId}:`, error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 释放用户消息队列锁并记录完成时间
|
||||
* @param {string} accountId - 账户ID
|
||||
|
||||
@@ -73,7 +73,6 @@ class BedrockRelayService {
|
||||
const accountId = bedrockAccount?.id
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理
|
||||
@@ -127,9 +126,8 @@ class BedrockRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for Bedrock account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -154,6 +152,23 @@ class BedrockRelayService {
|
||||
const response = await client.send(command)
|
||||
const duration = Date.now() - startTime
|
||||
|
||||
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
|
||||
// 因为限流基于请求发送时刻计算(RPM),不是请求完成时刻
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for Bedrock account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for Bedrock account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析响应
|
||||
const responseBody = JSON.parse(new TextDecoder().decode(response.body))
|
||||
const claudeResponse = this._convertFromBedrockFormat(responseBody)
|
||||
@@ -171,13 +186,13 @@ class BedrockRelayService {
|
||||
logger.error('❌ Bedrock非流式请求失败:', error)
|
||||
throw this._handleBedrockError(error)
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for Bedrock account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for Bedrock account ${accountId}:`,
|
||||
@@ -193,7 +208,6 @@ class BedrockRelayService {
|
||||
const accountId = bedrockAccount?.id
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理
|
||||
@@ -252,9 +266,8 @@ class BedrockRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for Bedrock account ${accountId} (stream), requestId: ${queueRequestId}`
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -278,6 +291,23 @@ class BedrockRelayService {
|
||||
const startTime = Date.now()
|
||||
const response = await client.send(command)
|
||||
|
||||
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
|
||||
// 因为限流基于请求发送时刻计算(RPM),不是请求完成时刻
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for Bedrock stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for Bedrock stream account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 设置SSE响应头
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'text/event-stream',
|
||||
@@ -339,13 +369,13 @@ class BedrockRelayService {
|
||||
|
||||
throw this._handleBedrockError(error)
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for Bedrock stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for Bedrock stream account ${accountId}:`,
|
||||
|
||||
@@ -24,7 +24,6 @@ class CcrRelayService {
|
||||
let account = null
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理
|
||||
@@ -78,9 +77,8 @@ class CcrRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for CCR account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -224,6 +222,23 @@ class CcrRelayService {
|
||||
)
|
||||
const response = await axios(requestConfig)
|
||||
|
||||
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
|
||||
// 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for CCR account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for CCR account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 移除监听器(请求成功完成)
|
||||
if (clientRequest) {
|
||||
clientRequest.removeListener('close', handleClientDisconnect)
|
||||
@@ -296,13 +311,13 @@ class CcrRelayService {
|
||||
|
||||
throw error
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for CCR account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for CCR account ${accountId}:`,
|
||||
@@ -327,7 +342,6 @@ class CcrRelayService {
|
||||
let account = null
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理
|
||||
@@ -388,9 +402,8 @@ class CcrRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for CCR account ${accountId} (stream), requestId: ${queueRequestId}`
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -442,7 +455,24 @@ class CcrRelayService {
|
||||
accountId,
|
||||
usageCallback,
|
||||
streamTransformer,
|
||||
options
|
||||
options,
|
||||
// 📬 回调:在收到响应头时释放队列锁
|
||||
async () => {
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for CCR stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for CCR stream account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
// 更新最后使用时间
|
||||
@@ -451,13 +481,13 @@ class CcrRelayService {
|
||||
logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error)
|
||||
throw error
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for CCR stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for CCR stream account ${accountId}:`,
|
||||
@@ -478,7 +508,8 @@ class CcrRelayService {
|
||||
accountId,
|
||||
usageCallback,
|
||||
streamTransformer = null,
|
||||
requestOptions = {}
|
||||
requestOptions = {},
|
||||
onResponseHeaderReceived = null
|
||||
) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let aborted = false
|
||||
@@ -541,8 +572,11 @@ class CcrRelayService {
|
||||
// 发送请求
|
||||
const request = axios(requestConfig)
|
||||
|
||||
// 注意:使用 .then(async ...) 模式处理响应
|
||||
// - 内部的 releaseQueueLock 有独立的 try-catch,不会导致未捕获异常
|
||||
// - queueLockAcquired = false 的赋值会在 finally 执行前完成(JS 单线程保证)
|
||||
request
|
||||
.then((response) => {
|
||||
.then(async (response) => {
|
||||
logger.debug(`🌊 CCR stream response status: ${response.status}`)
|
||||
|
||||
// 错误响应处理
|
||||
@@ -592,6 +626,19 @@ class CcrRelayService {
|
||||
return
|
||||
}
|
||||
|
||||
// 📬 收到成功响应头(HTTP 200),调用回调释放队列锁
|
||||
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
|
||||
if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') {
|
||||
try {
|
||||
await onResponseHeaderReceived()
|
||||
} catch (callbackError) {
|
||||
logger.error(
|
||||
`❌ Failed to execute onResponseHeaderReceived callback for CCR stream account ${accountId}:`,
|
||||
callbackError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 成功响应,检查并移除错误状态
|
||||
ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
|
||||
if (isRateLimited) {
|
||||
|
||||
@@ -32,7 +32,6 @@ class ClaudeConsoleRelayService {
|
||||
let concurrencyAcquired = false
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
|
||||
@@ -87,10 +86,6 @@ class ClaudeConsoleRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
)
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
@@ -269,6 +264,23 @@ class ClaudeConsoleRelayService {
|
||||
)
|
||||
const response = await axios(requestConfig)
|
||||
|
||||
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
|
||||
// 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for console account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for console account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 移除监听器(请求成功完成)
|
||||
if (clientRequest) {
|
||||
clientRequest.removeListener('close', handleClientDisconnect)
|
||||
@@ -433,13 +445,13 @@ class ClaudeConsoleRelayService {
|
||||
}
|
||||
}
|
||||
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for console account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for account ${accountId}:`,
|
||||
@@ -467,7 +479,6 @@ class ClaudeConsoleRelayService {
|
||||
let leaseRefreshInterval = null // 租约刷新定时器
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
|
||||
try {
|
||||
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
|
||||
@@ -522,10 +533,6 @@ class ClaudeConsoleRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
)
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}`
|
||||
)
|
||||
@@ -629,7 +636,24 @@ class ClaudeConsoleRelayService {
|
||||
accountId,
|
||||
usageCallback,
|
||||
streamTransformer,
|
||||
options
|
||||
options,
|
||||
// 📬 回调:在收到响应头时释放队列锁
|
||||
async () => {
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for console stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for console stream account ${accountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
// 更新最后使用时间
|
||||
@@ -664,13 +688,13 @@ class ClaudeConsoleRelayService {
|
||||
}
|
||||
}
|
||||
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && accountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for console stream account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for stream account ${accountId}:`,
|
||||
@@ -691,7 +715,8 @@ class ClaudeConsoleRelayService {
|
||||
accountId,
|
||||
usageCallback,
|
||||
streamTransformer = null,
|
||||
requestOptions = {}
|
||||
requestOptions = {},
|
||||
onResponseHeaderReceived = null
|
||||
) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let aborted = false
|
||||
@@ -754,8 +779,11 @@ class ClaudeConsoleRelayService {
|
||||
// 发送请求
|
||||
const request = axios(requestConfig)
|
||||
|
||||
// 注意:使用 .then(async ...) 模式处理响应
|
||||
// - 内部的 releaseQueueLock 有独立的 try-catch,不会导致未捕获异常
|
||||
// - queueLockAcquired = false 的赋值会在 finally 执行前完成(JS 单线程保证)
|
||||
request
|
||||
.then((response) => {
|
||||
.then(async (response) => {
|
||||
logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`)
|
||||
|
||||
// 错误响应处理
|
||||
@@ -862,6 +890,19 @@ class ClaudeConsoleRelayService {
|
||||
return
|
||||
}
|
||||
|
||||
// 📬 收到成功响应头(HTTP 200),调用回调释放队列锁
|
||||
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
|
||||
if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') {
|
||||
try {
|
||||
await onResponseHeaderReceived()
|
||||
} catch (callbackError) {
|
||||
logger.error(
|
||||
`❌ Failed to execute onResponseHeaderReceived callback for console stream account ${accountId}:`,
|
||||
callbackError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 成功响应,检查并移除错误状态
|
||||
claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
|
||||
if (isRateLimited) {
|
||||
|
||||
@@ -17,8 +17,9 @@ const DEFAULT_CONFIG = {
|
||||
sessionBindingTtlDays: 30, // 会话绑定 TTL(天),默认30天
|
||||
// 用户消息队列配置
|
||||
userMessageQueueEnabled: false, // 是否启用用户消息队列(默认关闭)
|
||||
userMessageQueueDelayMs: 100, // 请求间隔(毫秒)
|
||||
userMessageQueueTimeoutMs: 60000, // 队列超时(毫秒)
|
||||
userMessageQueueDelayMs: 200, // 请求间隔(毫秒)
|
||||
userMessageQueueTimeoutMs: 5000, // 队列等待超时(毫秒),优化后锁持有时间短无需长等待
|
||||
userMessageQueueLockTtlMs: 5000, // 锁TTL(毫秒),请求发送后立即释放无需长TTL
|
||||
updatedAt: null,
|
||||
updatedBy: null
|
||||
}
|
||||
@@ -320,11 +321,11 @@ class ClaudeRelayConfigService {
|
||||
|
||||
/**
|
||||
* 验证新会话请求
|
||||
* @param {Object} requestBody - 请求体
|
||||
* @param {Object} _requestBody - 请求体(预留参数,当前未使用)
|
||||
* @param {string} originalSessionId - 原始会话ID
|
||||
* @returns {Promise<Object>} { valid: boolean, error?: string, binding?: object, isNewSession?: boolean }
|
||||
*/
|
||||
async validateNewSession(requestBody, originalSessionId) {
|
||||
async validateNewSession(_requestBody, originalSessionId) {
|
||||
const cfg = await this.getConfig()
|
||||
|
||||
if (!cfg.globalSessionBindingEnabled) {
|
||||
|
||||
@@ -151,7 +151,6 @@ class ClaudeRelayService {
|
||||
let upstreamRequest = null
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
let selectedAccountId = null
|
||||
|
||||
try {
|
||||
@@ -255,10 +254,6 @@ class ClaudeRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
)
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
@@ -339,6 +334,23 @@ class ClaudeRelayService {
|
||||
options
|
||||
)
|
||||
|
||||
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
|
||||
// 因为 Claude API 限流基于请求发送时刻计算(RPM),不是请求完成时刻
|
||||
if (queueLockAcquired && queueRequestId && selectedAccountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for account ${selectedAccountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for account ${selectedAccountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
response.accountId = accountId
|
||||
response.accountType = accountType
|
||||
|
||||
@@ -608,13 +620,13 @@ class ClaudeRelayService {
|
||||
)
|
||||
throw error
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && selectedAccountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for account ${selectedAccountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for account ${selectedAccountId}:`,
|
||||
@@ -1245,7 +1257,6 @@ class ClaudeRelayService {
|
||||
) {
|
||||
let queueLockAcquired = false
|
||||
let queueRequestId = null
|
||||
let queueLockRenewalStopper = null
|
||||
let selectedAccountId = null
|
||||
|
||||
try {
|
||||
@@ -1350,10 +1361,6 @@ class ClaudeRelayService {
|
||||
if (queueResult.acquired && !queueResult.skipped) {
|
||||
queueLockAcquired = true
|
||||
queueRequestId = queueResult.requestId
|
||||
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
|
||||
accountId,
|
||||
queueRequestId
|
||||
)
|
||||
logger.debug(
|
||||
`📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}`
|
||||
)
|
||||
@@ -1425,19 +1432,36 @@ class ClaudeRelayService {
|
||||
sessionHash,
|
||||
streamTransformer,
|
||||
options,
|
||||
isDedicatedOfficialAccount
|
||||
isDedicatedOfficialAccount,
|
||||
// 📬 新增回调:在收到响应头时释放队列锁
|
||||
async () => {
|
||||
if (queueLockAcquired && queueRequestId && selectedAccountId) {
|
||||
try {
|
||||
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
|
||||
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
|
||||
logger.debug(
|
||||
`📬 User message queue lock released early for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock early for stream account ${selectedAccountId}:`,
|
||||
releaseError.message
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error(`❌ Claude stream relay with usage capture failed:`, error)
|
||||
throw error
|
||||
} finally {
|
||||
// 📬 释放用户消息队列锁
|
||||
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
|
||||
if (queueLockAcquired && queueRequestId && selectedAccountId) {
|
||||
try {
|
||||
if (queueLockRenewalStopper) {
|
||||
queueLockRenewalStopper()
|
||||
}
|
||||
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
|
||||
logger.debug(
|
||||
`📬 User message queue lock released in finally for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
|
||||
)
|
||||
} catch (releaseError) {
|
||||
logger.error(
|
||||
`❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`,
|
||||
@@ -1461,7 +1485,8 @@ class ClaudeRelayService {
|
||||
sessionHash,
|
||||
streamTransformer = null,
|
||||
requestOptions = {},
|
||||
isDedicatedOfficialAccount = false
|
||||
isDedicatedOfficialAccount = false,
|
||||
onResponseStart = null // 📬 新增:收到响应头时的回调,用于提前释放队列锁
|
||||
) {
|
||||
// 获取账户信息用于统一 User-Agent
|
||||
const account = await claudeAccountService.getAccount(accountId)
|
||||
@@ -1707,6 +1732,16 @@ class ClaudeRelayService {
|
||||
return
|
||||
}
|
||||
|
||||
// 📬 收到成功响应头(HTTP 200),立即调用回调释放队列锁
|
||||
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
|
||||
if (onResponseStart && typeof onResponseStart === 'function') {
|
||||
try {
|
||||
await onResponseStart()
|
||||
} catch (callbackError) {
|
||||
logger.error('❌ Error in onResponseStart callback:', callbackError.message)
|
||||
}
|
||||
}
|
||||
|
||||
let buffer = ''
|
||||
const allUsageData = [] // 收集所有的usage事件
|
||||
let currentUsageData = {} // 当前正在收集的usage数据
|
||||
|
||||
@@ -14,9 +14,6 @@ const logger = require('../utils/logger')
|
||||
// 清理任务间隔
|
||||
const CLEANUP_INTERVAL_MS = 60000 // 1分钟
|
||||
|
||||
// 锁续租最大持续时间(从配置读取,与 REQUEST_TIMEOUT 保持一致)
|
||||
const MAX_RENEWAL_DURATION_MS = config.requestTimeout || 10 * 60 * 1000
|
||||
|
||||
// 轮询等待配置
|
||||
const POLL_INTERVAL_BASE_MS = 50 // 基础轮询间隔
|
||||
const POLL_INTERVAL_MAX_MS = 500 // 最大轮询间隔
|
||||
@@ -25,8 +22,6 @@ const POLL_BACKOFF_FACTOR = 1.5 // 退避因子
|
||||
class UserMessageQueueService {
|
||||
constructor() {
|
||||
this.cleanupTimer = null
|
||||
// 跟踪活跃的续租定时器,用于服务关闭时清理
|
||||
this.activeRenewalTimers = new Map()
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -74,12 +69,13 @@ class UserMessageQueueService {
|
||||
*/
|
||||
async getConfig() {
|
||||
// 默认配置(防止 config.userMessageQueue 未定义)
|
||||
// 注意:优化后的默认值 - 锁持有时间从分钟级降到毫秒级,无需长等待
|
||||
const queueConfig = config.userMessageQueue || {}
|
||||
const defaults = {
|
||||
enabled: queueConfig.enabled ?? false,
|
||||
delayMs: queueConfig.delayMs ?? 100,
|
||||
timeoutMs: queueConfig.timeoutMs ?? 60000,
|
||||
lockTtlMs: queueConfig.lockTtlMs ?? 120000
|
||||
delayMs: queueConfig.delayMs ?? 200,
|
||||
timeoutMs: queueConfig.timeoutMs ?? 5000, // 从 60000 降到 5000,因为锁持有时间短
|
||||
lockTtlMs: queueConfig.lockTtlMs ?? 5000 // 从 120000 降到 5000,5秒足以覆盖请求发送
|
||||
}
|
||||
|
||||
// 尝试从 claudeRelayConfigService 获取 Web 界面配置
|
||||
@@ -100,7 +96,10 @@ class UserMessageQueueService {
|
||||
webConfig.userMessageQueueTimeoutMs !== undefined
|
||||
? webConfig.userMessageQueueTimeoutMs
|
||||
: defaults.timeoutMs,
|
||||
lockTtlMs: defaults.lockTtlMs
|
||||
lockTtlMs:
|
||||
webConfig.userMessageQueueLockTtlMs !== undefined
|
||||
? webConfig.userMessageQueueLockTtlMs
|
||||
: defaults.lockTtlMs
|
||||
}
|
||||
} catch {
|
||||
// 回退到环境变量配置
|
||||
@@ -232,83 +231,6 @@ class UserMessageQueueService {
|
||||
return released
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动锁续租(防止长连接超过TTL导致锁丢失)
|
||||
* @param {string} accountId - 账户ID
|
||||
* @param {string} requestId - 请求ID
|
||||
* @returns {Promise<Function>} 停止续租的函数
|
||||
*/
|
||||
async startLockRenewal(accountId, requestId) {
|
||||
const cfg = await this.getConfig()
|
||||
if (!cfg.enabled || !accountId || !requestId) {
|
||||
return () => {}
|
||||
}
|
||||
|
||||
const intervalMs = Math.max(10000, Math.floor(cfg.lockTtlMs / 2)) // 约一半TTL刷新一次
|
||||
const maxRenewals = Math.ceil(MAX_RENEWAL_DURATION_MS / intervalMs) // 最大续租次数
|
||||
const startTime = Date.now()
|
||||
const timerKey = `${accountId}:${requestId}`
|
||||
|
||||
let stopped = false
|
||||
let renewalCount = 0
|
||||
|
||||
const stopRenewal = () => {
|
||||
if (!stopped) {
|
||||
clearInterval(timer)
|
||||
stopped = true
|
||||
this.activeRenewalTimers.delete(timerKey)
|
||||
}
|
||||
}
|
||||
|
||||
const timer = setInterval(async () => {
|
||||
if (stopped) {
|
||||
return
|
||||
}
|
||||
|
||||
renewalCount++
|
||||
|
||||
// 检查是否超过最大续租次数或最大持续时间
|
||||
if (renewalCount > maxRenewals || Date.now() - startTime > MAX_RENEWAL_DURATION_MS) {
|
||||
logger.warn(`📬 User message queue: max renewal duration exceeded, stopping renewal`, {
|
||||
accountId,
|
||||
requestId,
|
||||
renewalCount,
|
||||
durationMs: Date.now() - startTime
|
||||
})
|
||||
stopRenewal()
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const refreshed = await redis.refreshUserMessageLock(accountId, requestId, cfg.lockTtlMs)
|
||||
if (!refreshed) {
|
||||
// 锁可能已被释放或超时,停止续租
|
||||
logger.warn(
|
||||
`📬 User message queue: failed to refresh lock (possibly lost), stop renewal`,
|
||||
{
|
||||
accountId,
|
||||
requestId,
|
||||
renewalCount
|
||||
}
|
||||
)
|
||||
stopRenewal()
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('📬 User message queue: lock renewal error:', error)
|
||||
}
|
||||
}, intervalMs)
|
||||
|
||||
// 避免阻止进程退出
|
||||
if (typeof timer.unref === 'function') {
|
||||
timer.unref()
|
||||
}
|
||||
|
||||
// 跟踪活跃的定时器
|
||||
this.activeRenewalTimers.set(timerKey, { timer, stopRenewal, accountId, requestId, startTime })
|
||||
|
||||
return stopRenewal
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取队列统计信息
|
||||
* @param {string} accountId - 账户ID
|
||||
@@ -385,32 +307,6 @@ class UserMessageQueueService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止所有活跃的锁续租定时器(服务关闭时调用)
|
||||
*/
|
||||
stopAllRenewalTimers() {
|
||||
const count = this.activeRenewalTimers.size
|
||||
if (count > 0) {
|
||||
for (const [key, { stopRenewal }] of this.activeRenewalTimers) {
|
||||
try {
|
||||
stopRenewal()
|
||||
} catch (error) {
|
||||
logger.error(`📬 User message queue: failed to stop renewal timer ${key}:`, error)
|
||||
}
|
||||
}
|
||||
this.activeRenewalTimers.clear()
|
||||
logger.info(`📬 User message queue: stopped ${count} active renewal timer(s)`)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取活跃续租定时器数量(用于监控)
|
||||
* @returns {number}
|
||||
*/
|
||||
getActiveRenewalCount() {
|
||||
return this.activeRenewalTimers.size
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理孤儿锁
|
||||
* 检测异常情况:锁存在但没有设置过期时间(lockTtlRaw === -1)
|
||||
|
||||
@@ -179,88 +179,6 @@ describe('UserMessageQueueService', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('startLockRenewal', () => {
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers()
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
jest.useRealTimers()
|
||||
jest.restoreAllMocks()
|
||||
})
|
||||
|
||||
it('should periodically refresh lock while enabled', async () => {
|
||||
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
|
||||
enabled: true,
|
||||
delayMs: 200,
|
||||
timeoutMs: 30000,
|
||||
lockTtlMs: 120000
|
||||
})
|
||||
const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
|
||||
|
||||
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
|
||||
|
||||
jest.advanceTimersByTime(60000) // 半个TTL
|
||||
await Promise.resolve()
|
||||
|
||||
expect(refreshSpy).toHaveBeenCalledWith('acct-1', 'req-1', 120000)
|
||||
|
||||
stop()
|
||||
})
|
||||
|
||||
it('should no-op when queue disabled', async () => {
|
||||
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
|
||||
enabled: false,
|
||||
delayMs: 200,
|
||||
timeoutMs: 30000,
|
||||
lockTtlMs: 120000
|
||||
})
|
||||
const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
|
||||
|
||||
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
|
||||
jest.advanceTimersByTime(120000)
|
||||
await Promise.resolve()
|
||||
|
||||
expect(refreshSpy).not.toHaveBeenCalled()
|
||||
stop()
|
||||
})
|
||||
|
||||
it('should track active renewal timer', async () => {
|
||||
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
|
||||
enabled: true,
|
||||
delayMs: 200,
|
||||
timeoutMs: 30000,
|
||||
lockTtlMs: 120000
|
||||
})
|
||||
jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
|
||||
|
||||
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
|
||||
|
||||
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
|
||||
expect(userMessageQueueService.getActiveRenewalCount()).toBe(1)
|
||||
|
||||
stop()
|
||||
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
|
||||
})
|
||||
|
||||
it('should stop all renewal timers on service shutdown', async () => {
|
||||
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
|
||||
enabled: true,
|
||||
delayMs: 200,
|
||||
timeoutMs: 30000,
|
||||
lockTtlMs: 120000
|
||||
})
|
||||
jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
|
||||
|
||||
await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
|
||||
await userMessageQueueService.startLockRenewal('acct-2', 'req-2')
|
||||
expect(userMessageQueueService.getActiveRenewalCount()).toBe(2)
|
||||
|
||||
userMessageQueueService.stopAllRenewalTimers()
|
||||
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe('acquireQueueLock', () => {
|
||||
afterEach(() => {
|
||||
jest.restoreAllMocks()
|
||||
|
||||
Reference in New Issue
Block a user