feat(queue): 优化用户消息队列锁释放时机

将队列锁释放时机从"请求完成后"提前到"请求发送后",因为 Claude API
限流(RPM)基于请求发送时刻计算,无需等待响应完成。

主要变更:
- 移除锁续租机制(startLockRenewal、refreshUserMessageLock)
- 所有 relay 服务在请求发送成功后立即释放锁
- 流式请求通过 onResponseStart 回调在收到响应头时释放
- 调整默认配置:timeoutMs 60s→5s,lockTtlMs 120s→5s
- 新增 USER_MESSAGE_QUEUE_LOCK_TTL_MS 环境变量支持
This commit is contained in:
QTom
2025-12-10 01:26:00 +08:00
committed by QTom
parent cb94a4260e
commit 3b9c96dff8
11 changed files with 251 additions and 314 deletions

View File

@@ -151,7 +151,6 @@ class ClaudeRelayService {
let upstreamRequest = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
@@ -255,10 +254,6 @@ class ClaudeRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}`
)
@@ -339,6 +334,23 @@ class ClaudeRelayService {
options
)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for account ${selectedAccountId}:`,
releaseError.message
)
}
}
response.accountId = accountId
response.accountType = accountType
@@ -608,13 +620,13 @@ class ClaudeRelayService {
)
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for account ${selectedAccountId}:`,
@@ -1245,7 +1257,6 @@ class ClaudeRelayService {
) {
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
@@ -1350,10 +1361,6 @@ class ClaudeRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}`
)
@@ -1425,19 +1432,36 @@ class ClaudeRelayService {
sessionHash,
streamTransformer,
options,
isDedicatedOfficialAccount
isDedicatedOfficialAccount,
// 📬 新增回调:在收到响应头时释放队列锁
async () => {
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for stream account ${selectedAccountId}:`,
releaseError.message
)
}
}
}
)
} catch (error) {
logger.error(`❌ Claude stream relay with usage capture failed:`, error)
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`,
@@ -1461,7 +1485,8 @@ class ClaudeRelayService {
sessionHash,
streamTransformer = null,
requestOptions = {},
isDedicatedOfficialAccount = false
isDedicatedOfficialAccount = false,
onResponseStart = null // 📬 新增:收到响应头时的回调,用于提前释放队列锁
) {
// 获取账户信息用于统一 User-Agent
const account = await claudeAccountService.getAccount(accountId)
@@ -1707,6 +1732,16 @@ class ClaudeRelayService {
return
}
// 📬 收到成功响应头HTTP 200立即调用回调释放队列锁
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
if (onResponseStart && typeof onResponseStart === 'function') {
try {
await onResponseStart()
} catch (callbackError) {
logger.error('❌ Error in onResponseStart callback:', callbackError.message)
}
}
let buffer = ''
const allUsageData = [] // 收集所有的usage事件
let currentUsageData = {} // 当前正在收集的usage数据