Compare commits

...

15 Commits

Author SHA1 Message Date
github-actions[bot]
c4d923c46f chore: sync VERSION file with release v1.1.232 [skip ci] 2025-12-11 02:46:31 +00:00
Wesley Liddick
fa9f9146a2 Merge pull request #793 from qq790716890/main
fix:修复codex统计token问题
2025-12-10 21:46:15 -05:00
Wesley Liddick
dc9409a5a6 Merge pull request #788 from atoz03/main [skip ci]
fix: 账户列表默认显示限额/限流账号并加固加载健壮性
2025-12-10 21:44:55 -05:00
LZY
51aa8dc381 fix:修复codex统计token问题 2025-12-10 22:56:25 +08:00
github-actions[bot]
5061f4d9fd chore: sync VERSION file with release v1.1.231 [skip ci] 2025-12-10 12:11:39 +00:00
Wesley Liddick
4337af06d4 Merge pull request #791 from DaydreamCoding/feature/log-opt
fix: improve logging for client disconnections in relay services
2025-12-10 07:11:24 -05:00
Wesley Liddick
d226d57325 Merge pull request #790 from DaydreamCoding/patch-4 [skip ci]
fix(security): add authenticateAdmin middleware to concurrency routes
2025-12-10 07:11:07 -05:00
Wesley Liddick
9f92c58640 Merge pull request #789 from DaydreamCoding/feature/user-message-queue-optimize [skip ci]
feat(queue): 优化用户消息队列锁释放时机
2025-12-10 07:10:38 -05:00
QTom
8901994644 fix: improve logging for client disconnections in relay services
当客户端主动断开连接时,改为使用 INFO 级别记录而不是 ERROR 级别,
因为这是正常情况而非错误。

- ccrRelayService: 区分客户端断开与实际错误
- claudeConsoleRelayService: 区分客户端断开与实际错误
- claudeRelayService: 区分客户端断开与实际错误
- droidRelayService: 区分客户端断开与实际错误
2025-12-10 14:18:44 +08:00
QTom
e3ca555df7 fix(security): add authenticateAdmin middleware to concurrency routes
fix(security): add authenticateAdmin middleware to concurrency routes

All concurrency management endpoints were missing authentication,
allowing unauthenticated access to view and clear concurrency data.
2025-12-10 13:59:25 +08:00
QTom
3b9c96dff8 feat(queue): 优化用户消息队列锁释放时机
将队列锁释放时机从"请求完成后"提前到"请求发送后",因为 Claude API
限流(RPM)基于请求发送时刻计算,无需等待响应完成。

主要变更:
- 移除锁续租机制(startLockRenewal、refreshUserMessageLock)
- 所有 relay 服务在请求发送成功后立即释放锁
- 流式请求通过 onResponseStart 回调在收到响应头时释放
- 调整默认配置:timeoutMs 60s→5s,lockTtlMs 120s→5s
- 新增 USER_MESSAGE_QUEUE_LOCK_TTL_MS 环境变量支持
2025-12-10 01:26:00 +08:00
github-actions[bot]
cb94a4260e chore: sync VERSION file with release v1.1.230 [skip ci] 2025-12-09 10:59:05 +00:00
Wesley Liddick
ac9499aa6d Merge pull request #787 from DaydreamCoding/feature/user-message-queue-fix
feat: 修复 userMessageQueue 配置缺失导致的 500 错误
2025-12-09 05:58:35 -05:00
atoz03
fc25840f95 fix: 账户列表默认显示限额/限流账号并加固加载健壮性
- 将账户页状态筛选默认值从 normal 改为 all,额度满/限流/异常账号默认可见
  - appendAccounts 使用 Array.isArray 兜底接口响应,避免空/异常数据导致“加载账户失败”
  - 便于在额度耗尽场景查看并处理账号
2025-12-09 18:49:57 +08:00
QTom
b409adf9d8 feat: 修复 userMessageQueue 配置缺失导致的 500 错误
- 在 config.example.js 添加缺失的 userMessageQueue 配置段
  - 在 userMessageQueueService.js 添加防御性代码,当配置未定义时使用默认值

  修复 #783 合并后新用户安装报错:
  Cannot read properties of undefined (reading 'enabled')
2025-12-09 18:41:13 +08:00
16 changed files with 319 additions and 344 deletions

View File

@@ -186,9 +186,10 @@ npm run service:stop # 停止服务
- `CLAUDE_OVERLOAD_HANDLING_MINUTES`: Claude 529错误处理持续时间分钟0表示禁用
- `STICKY_SESSION_TTL_HOURS`: 粘性会话TTL小时默认1
- `STICKY_SESSION_RENEWAL_THRESHOLD_MINUTES`: 粘性会话续期阈值分钟默认0
- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认true
- `USER_MESSAGE_QUEUE_ENABLED`: 启用用户消息串行队列(默认false
- `USER_MESSAGE_QUEUE_DELAY_MS`: 用户消息请求间隔毫秒默认200
- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认30000
- `USER_MESSAGE_QUEUE_TIMEOUT_MS`: 队列等待超时(毫秒,默认5000,锁持有时间短无需长等待
- `USER_MESSAGE_QUEUE_LOCK_TTL_MS`: 锁TTL毫秒默认5000请求发送后立即释放无需长TTL
- `METRICS_WINDOW`: 实时指标统计窗口分钟1-60默认5
- `MAX_API_KEYS_PER_USER`: 每用户最大API Key数量默认1
- `ALLOW_USER_DELETE_API_KEYS`: 允许用户删除自己的API Keys默认false
@@ -341,7 +342,7 @@ npm run setup # 自动生成密钥并创建管理员账户
11. **速率限制未清理**: rateLimitCleanupService每5分钟自动清理过期限流状态
12. **成本统计不准确**: 运行 `npm run init:costs` 初始化成本数据检查pricingService是否正确加载模型价格
13. **缓存命中率低**: 查看缓存监控统计调整LRU缓存大小配置
14. **用户消息队列超时**: 检查 `USER_MESSAGE_QUEUE_TIMEOUT_MS` 配置是否合理,查看日志中的 `queue_timeout` 错误,可通过 Web 界面或 `USER_MESSAGE_QUEUE_ENABLED=false` 禁用此功能
14. **用户消息队列超时**: 优化后锁持有时间已从分钟级降到毫秒级(请求发送后立即释放),默认 `USER_MESSAGE_QUEUE_TIMEOUT_MS=5000` 已足够。如仍有超时,检查网络延迟或禁用此功能(`USER_MESSAGE_QUEUE_ENABLED=false`
### 调试工具

View File

@@ -1 +1 @@
1.1.229
1.1.232

View File

@@ -203,6 +203,15 @@ const config = {
development: {
debug: process.env.DEBUG === 'true',
hotReload: process.env.HOT_RELOAD === 'true'
},
// 📬 用户消息队列配置
// 优化说明:锁在请求发送成功后立即释放(而非请求完成后),因为 Claude API 限流基于请求发送时刻计算
userMessageQueue: {
enabled: process.env.USER_MESSAGE_QUEUE_ENABLED === 'true', // 默认关闭
delayMs: parseInt(process.env.USER_MESSAGE_QUEUE_DELAY_MS) || 200, // 请求间隔(毫秒)
timeoutMs: parseInt(process.env.USER_MESSAGE_QUEUE_TIMEOUT_MS) || 5000, // 队列等待超时(毫秒),锁持有时间短,无需长等待
lockTtlMs: parseInt(process.env.USER_MESSAGE_QUEUE_LOCK_TTL_MS) || 5000 // 锁TTL毫秒5秒足以覆盖请求发送
}
}

View File

@@ -669,10 +669,9 @@ class Application {
logger.error('❌ Error stopping rate limit cleanup service:', error)
}
// 停止用户消息队列清理服务和续租定时器
// 停止用户消息队列清理服务
try {
const userMessageQueueService = require('./services/userMessageQueueService')
userMessageQueueService.stopAllRenewalTimers()
userMessageQueueService.stopCleanupTask()
logger.info('📬 User message queue service stopped')
} catch (error) {

View File

@@ -2626,38 +2626,6 @@ redisClient.acquireUserMessageLock = async function (accountId, requestId, lockT
}
}
/**
* 续租用户消息队列锁(仅锁持有者可续租)
* @param {string} accountId - 账户ID
* @param {string} requestId - 请求ID
* @param {number} lockTtlMs - 锁 TTL毫秒
* @returns {Promise<boolean>} 是否续租成功(只有锁持有者才能续租)
*/
redisClient.refreshUserMessageLock = async function (accountId, requestId, lockTtlMs) {
const lockKey = `user_msg_queue_lock:${accountId}`
const script = `
local lockKey = KEYS[1]
local requestId = ARGV[1]
local lockTtl = tonumber(ARGV[2])
local currentLock = redis.call('GET', lockKey)
if currentLock == requestId then
redis.call('PEXPIRE', lockKey, lockTtl)
return 1
end
return 0
`
try {
const result = await this.client.eval(script, 1, lockKey, requestId, lockTtlMs)
return result === 1
} catch (error) {
logger.error(`Failed to refresh user message lock for account ${accountId}:`, error)
return false
}
}
/**
* 释放用户消息队列锁并记录完成时间
* @param {string} accountId - 账户ID

View File

@@ -7,12 +7,13 @@ const express = require('express')
const router = express.Router()
const redis = require('../../models/redis')
const logger = require('../../utils/logger')
const { authenticateAdmin } = require('../../middleware/auth')
/**
* GET /admin/concurrency
* 获取所有并发状态
*/
router.get('/concurrency', async (req, res) => {
router.get('/concurrency', authenticateAdmin, async (req, res) => {
try {
const status = await redis.getAllConcurrencyStatus()
@@ -42,7 +43,7 @@ router.get('/concurrency', async (req, res) => {
* GET /admin/concurrency/:apiKeyId
* 获取特定 API Key 的并发状态详情
*/
router.get('/concurrency/:apiKeyId', async (req, res) => {
router.get('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
const status = await redis.getConcurrencyStatus(apiKeyId)
@@ -65,7 +66,7 @@ router.get('/concurrency/:apiKeyId', async (req, res) => {
* DELETE /admin/concurrency/:apiKeyId
* 强制清理特定 API Key 的并发计数
*/
router.delete('/concurrency/:apiKeyId', async (req, res) => {
router.delete('/concurrency/:apiKeyId', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.params
const result = await redis.forceClearConcurrency(apiKeyId)
@@ -93,7 +94,7 @@ router.delete('/concurrency/:apiKeyId', async (req, res) => {
* DELETE /admin/concurrency
* 强制清理所有并发计数
*/
router.delete('/concurrency', async (req, res) => {
router.delete('/concurrency', authenticateAdmin, async (req, res) => {
try {
const result = await redis.forceClearAllConcurrency()
@@ -118,7 +119,7 @@ router.delete('/concurrency', async (req, res) => {
* POST /admin/concurrency/cleanup
* 清理过期的并发条目(不影响活跃请求)
*/
router.post('/concurrency/cleanup', async (req, res) => {
router.post('/concurrency/cleanup', authenticateAdmin, async (req, res) => {
try {
const { apiKeyId } = req.body
const result = await redis.cleanupExpiredConcurrency(apiKeyId || null)

View File

@@ -73,7 +73,6 @@ class BedrockRelayService {
const accountId = bedrockAccount?.id
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
@@ -127,9 +126,8 @@ class BedrockRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
logger.debug(
`📬 User message queue lock acquired for Bedrock account ${accountId}, requestId: ${queueRequestId}`
)
}
}
@@ -154,6 +152,23 @@ class BedrockRelayService {
const response = await client.send(command)
const duration = Date.now() - startTime
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for Bedrock account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for Bedrock account ${accountId}:`,
releaseError.message
)
}
}
// 解析响应
const responseBody = JSON.parse(new TextDecoder().decode(response.body))
const claudeResponse = this._convertFromBedrockFormat(responseBody)
@@ -171,13 +186,13 @@ class BedrockRelayService {
logger.error('❌ Bedrock非流式请求失败:', error)
throw this._handleBedrockError(error)
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for Bedrock account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for Bedrock account ${accountId}:`,
@@ -193,7 +208,6 @@ class BedrockRelayService {
const accountId = bedrockAccount?.id
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
@@ -252,9 +266,8 @@ class BedrockRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
logger.debug(
`📬 User message queue lock acquired for Bedrock account ${accountId} (stream), requestId: ${queueRequestId}`
)
}
}
@@ -278,6 +291,23 @@ class BedrockRelayService {
const startTime = Date.now()
const response = await client.send(command)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for Bedrock stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for Bedrock stream account ${accountId}:`,
releaseError.message
)
}
}
// 设置SSE响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
@@ -339,13 +369,13 @@ class BedrockRelayService {
throw this._handleBedrockError(error)
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for Bedrock stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for Bedrock stream account ${accountId}:`,

View File

@@ -24,7 +24,6 @@ class CcrRelayService {
let account = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
@@ -78,9 +77,8 @@ class CcrRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
logger.debug(
`📬 User message queue lock acquired for CCR account ${accountId}, requestId: ${queueRequestId}`
)
}
}
@@ -224,6 +222,23 @@ class CcrRelayService {
)
const response = await axios(requestConfig)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for CCR account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for CCR account ${accountId}:`,
releaseError.message
)
}
}
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect)
@@ -296,13 +311,13 @@ class CcrRelayService {
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for CCR account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for CCR account ${accountId}:`,
@@ -327,7 +342,6 @@ class CcrRelayService {
let account = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理
@@ -388,9 +402,8 @@ class CcrRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
logger.debug(
`📬 User message queue lock acquired for CCR account ${accountId} (stream), requestId: ${queueRequestId}`
)
}
}
@@ -442,22 +455,46 @@ class CcrRelayService {
accountId,
usageCallback,
streamTransformer,
options
options,
// 📬 回调:在收到响应头时释放队列锁
async () => {
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for CCR stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for CCR stream account ${accountId}:`,
releaseError.message
)
}
}
}
)
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
} catch (error) {
logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error)
// 客户端主动断开连接是正常情况,使用 INFO 级别
if (error.message === 'Client disconnected') {
logger.info(
`🔌 CCR stream relay ended: Client disconnected (Account: ${account?.name || accountId})`
)
} else {
logger.error(`❌ CCR stream relay failed (Account: ${account?.name || accountId}):`, error)
}
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for CCR stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for CCR stream account ${accountId}:`,
@@ -478,7 +515,8 @@ class CcrRelayService {
accountId,
usageCallback,
streamTransformer = null,
requestOptions = {}
requestOptions = {},
onResponseHeaderReceived = null
) {
return new Promise((resolve, reject) => {
let aborted = false
@@ -541,8 +579,11 @@ class CcrRelayService {
// 发送请求
const request = axios(requestConfig)
// 注意:使用 .then(async ...) 模式处理响应
// - 内部的 releaseQueueLock 有独立的 try-catch不会导致未捕获异常
// - queueLockAcquired = false 的赋值会在 finally 执行前完成JS 单线程保证)
request
.then((response) => {
.then(async (response) => {
logger.debug(`🌊 CCR stream response status: ${response.status}`)
// 错误响应处理
@@ -592,6 +633,19 @@ class CcrRelayService {
return
}
// 📬 收到成功响应头HTTP 200调用回调释放队列锁
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') {
try {
await onResponseHeaderReceived()
} catch (callbackError) {
logger.error(
`❌ Failed to execute onResponseHeaderReceived callback for CCR stream account ${accountId}:`,
callbackError.message
)
}
}
// 成功响应,检查并移除错误状态
ccrAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
if (isRateLimited) {

View File

@@ -32,7 +32,6 @@ class ClaudeConsoleRelayService {
let concurrencyAcquired = false
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
@@ -87,10 +86,6 @@ class ClaudeConsoleRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for console account ${accountId}, requestId: ${queueRequestId}`
)
@@ -269,6 +264,23 @@ class ClaudeConsoleRelayService {
)
const response = await axios(requestConfig)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for console account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for console account ${accountId}:`,
releaseError.message
)
}
}
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect)
@@ -433,13 +445,13 @@ class ClaudeConsoleRelayService {
}
}
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for console account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for account ${accountId}:`,
@@ -467,7 +479,6 @@ class ClaudeConsoleRelayService {
let leaseRefreshInterval = null // 租约刷新定时器
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
try {
// 📬 用户消息队列处理:如果是用户消息请求,需要获取队列锁
@@ -522,10 +533,6 @@ class ClaudeConsoleRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for console account ${accountId} (stream), requestId: ${queueRequestId}`
)
@@ -629,16 +636,40 @@ class ClaudeConsoleRelayService {
accountId,
usageCallback,
streamTransformer,
options
options,
// 📬 回调:在收到响应头时释放队列锁
async () => {
if (queueLockAcquired && queueRequestId && accountId) {
try {
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for console stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for console stream account ${accountId}:`,
releaseError.message
)
}
}
}
)
// 更新最后使用时间
await this._updateLastUsedTime(accountId)
} catch (error) {
logger.error(
`❌ Claude Console stream relay failed (Account: ${account?.name || accountId}):`,
error
)
// 客户端主动断开连接是正常情况,使用 INFO 级别
if (error.message === 'Client disconnected') {
logger.info(
`🔌 Claude Console stream relay ended: Client disconnected (Account: ${account?.name || accountId})`
)
} else {
logger.error(
`❌ Claude Console stream relay failed (Account: ${account?.name || accountId}):`,
error
)
}
throw error
} finally {
// 🛑 清理租约刷新定时器
@@ -664,13 +695,13 @@ class ClaudeConsoleRelayService {
}
}
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
if (queueLockAcquired && queueRequestId && accountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(accountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for console stream account ${accountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for stream account ${accountId}:`,
@@ -691,7 +722,8 @@ class ClaudeConsoleRelayService {
accountId,
usageCallback,
streamTransformer = null,
requestOptions = {}
requestOptions = {},
onResponseHeaderReceived = null
) {
return new Promise((resolve, reject) => {
let aborted = false
@@ -754,8 +786,11 @@ class ClaudeConsoleRelayService {
// 发送请求
const request = axios(requestConfig)
// 注意:使用 .then(async ...) 模式处理响应
// - 内部的 releaseQueueLock 有独立的 try-catch不会导致未捕获异常
// - queueLockAcquired = false 的赋值会在 finally 执行前完成JS 单线程保证)
request
.then((response) => {
.then(async (response) => {
logger.debug(`🌊 Claude Console Claude stream response status: ${response.status}`)
// 错误响应处理
@@ -862,6 +897,19 @@ class ClaudeConsoleRelayService {
return
}
// 📬 收到成功响应头HTTP 200调用回调释放队列锁
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
if (onResponseHeaderReceived && typeof onResponseHeaderReceived === 'function') {
try {
await onResponseHeaderReceived()
} catch (callbackError) {
logger.error(
`❌ Failed to execute onResponseHeaderReceived callback for console stream account ${accountId}:`,
callbackError.message
)
}
}
// 成功响应,检查并移除错误状态
claudeConsoleAccountService.isAccountRateLimited(accountId).then((isRateLimited) => {
if (isRateLimited) {

View File

@@ -17,8 +17,9 @@ const DEFAULT_CONFIG = {
sessionBindingTtlDays: 30, // 会话绑定 TTL默认30天
// 用户消息队列配置
userMessageQueueEnabled: false, // 是否启用用户消息队列(默认关闭)
userMessageQueueDelayMs: 100, // 请求间隔(毫秒)
userMessageQueueTimeoutMs: 60000, // 队列超时(毫秒)
userMessageQueueDelayMs: 200, // 请求间隔(毫秒)
userMessageQueueTimeoutMs: 5000, // 队列等待超时(毫秒),优化后锁持有时间短无需长等待
userMessageQueueLockTtlMs: 5000, // 锁TTL毫秒请求发送后立即释放无需长TTL
updatedAt: null,
updatedBy: null
}
@@ -320,11 +321,11 @@ class ClaudeRelayConfigService {
/**
* 验证新会话请求
* @param {Object} requestBody - 请求体
* @param {Object} _requestBody - 请求体(预留参数,当前未使用)
* @param {string} originalSessionId - 原始会话ID
* @returns {Promise<Object>} { valid: boolean, error?: string, binding?: object, isNewSession?: boolean }
*/
async validateNewSession(requestBody, originalSessionId) {
async validateNewSession(_requestBody, originalSessionId) {
const cfg = await this.getConfig()
if (!cfg.globalSessionBindingEnabled) {

View File

@@ -151,7 +151,6 @@ class ClaudeRelayService {
let upstreamRequest = null
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
@@ -255,10 +254,6 @@ class ClaudeRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId}, requestId: ${queueRequestId}`
)
@@ -339,6 +334,23 @@ class ClaudeRelayService {
options
)
// 📬 请求已发送成功,立即释放队列锁(无需等待响应处理完成)
// 因为 Claude API 限流基于请求发送时刻计算RPM不是请求完成时刻
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for account ${selectedAccountId}:`,
releaseError.message
)
}
}
response.accountId = accountId
response.accountType = accountType
@@ -608,13 +620,13 @@ class ClaudeRelayService {
)
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放)
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for account ${selectedAccountId}:`,
@@ -1245,7 +1257,6 @@ class ClaudeRelayService {
) {
let queueLockAcquired = false
let queueRequestId = null
let queueLockRenewalStopper = null
let selectedAccountId = null
try {
@@ -1350,10 +1361,6 @@ class ClaudeRelayService {
if (queueResult.acquired && !queueResult.skipped) {
queueLockAcquired = true
queueRequestId = queueResult.requestId
queueLockRenewalStopper = await userMessageQueueService.startLockRenewal(
accountId,
queueRequestId
)
logger.debug(
`📬 User message queue lock acquired for account ${accountId} (stream), requestId: ${queueRequestId}`
)
@@ -1425,19 +1432,41 @@ class ClaudeRelayService {
sessionHash,
streamTransformer,
options,
isDedicatedOfficialAccount
isDedicatedOfficialAccount,
// 📬 新增回调:在收到响应头时释放队列锁
async () => {
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
queueLockAcquired = false // 标记已释放,防止 finally 重复释放
logger.debug(
`📬 User message queue lock released early for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock early for stream account ${selectedAccountId}:`,
releaseError.message
)
}
}
}
)
} catch (error) {
logger.error(`❌ Claude stream relay with usage capture failed:`, error)
// 客户端主动断开连接是正常情况,使用 INFO 级别
if (error.message === 'Client disconnected') {
logger.info(`🔌 Claude stream relay ended: Client disconnected`)
} else {
logger.error(`❌ Claude stream relay with usage capture failed:`, error)
}
throw error
} finally {
// 📬 释放用户消息队列锁
// 📬 释放用户消息队列锁(兜底,正常情况下已在收到响应头后提前释放)
if (queueLockAcquired && queueRequestId && selectedAccountId) {
try {
if (queueLockRenewalStopper) {
queueLockRenewalStopper()
}
await userMessageQueueService.releaseQueueLock(selectedAccountId, queueRequestId)
logger.debug(
`📬 User message queue lock released in finally for stream account ${selectedAccountId}, requestId: ${queueRequestId}`
)
} catch (releaseError) {
logger.error(
`❌ Failed to release user message queue lock for stream account ${selectedAccountId}:`,
@@ -1461,7 +1490,8 @@ class ClaudeRelayService {
sessionHash,
streamTransformer = null,
requestOptions = {},
isDedicatedOfficialAccount = false
isDedicatedOfficialAccount = false,
onResponseStart = null // 📬 新增:收到响应头时的回调,用于提前释放队列锁
) {
// 获取账户信息用于统一 User-Agent
const account = await claudeAccountService.getAccount(accountId)
@@ -1707,6 +1737,16 @@ class ClaudeRelayService {
return
}
// 📬 收到成功响应头HTTP 200立即调用回调释放队列锁
// 此时请求已被 Claude API 接受并计入 RPM 配额,无需等待响应完成
if (onResponseStart && typeof onResponseStart === 'function') {
try {
await onResponseStart()
} catch (callbackError) {
logger.error('❌ Error in onResponseStart callback:', callbackError.message)
}
}
let buffer = ''
const allUsageData = [] // 收集所有的usage事件
let currentUsageData = {} // 当前正在收集的usage数据

View File

@@ -336,7 +336,12 @@ class DroidRelayService {
)
}
} catch (error) {
logger.error(`❌ Droid relay error: ${error.message}`, error)
// 客户端主动断开连接是正常情况,使用 INFO 级别
if (error.message === 'Client disconnected') {
logger.info(`🔌 Droid relay ended: Client disconnected`)
} else {
logger.error(`❌ Droid relay error: ${error.message}`, error)
}
const status = error?.response?.status
if (status >= 400 && status < 500) {

View File

@@ -426,9 +426,9 @@ class OpenAIResponsesRelayService {
const lines = data.split('\n')
for (const line of lines) {
if (line.startsWith('data: ')) {
if (line.startsWith('data:')) {
try {
const jsonStr = line.slice(6)
const jsonStr = line.slice(5).trim()
if (jsonStr === '[DONE]') {
continue
}

View File

@@ -14,9 +14,6 @@ const logger = require('../utils/logger')
// 清理任务间隔
const CLEANUP_INTERVAL_MS = 60000 // 1分钟
// 锁续租最大持续时间(从配置读取,与 REQUEST_TIMEOUT 保持一致)
const MAX_RENEWAL_DURATION_MS = config.requestTimeout || 10 * 60 * 1000
// 轮询等待配置
const POLL_INTERVAL_BASE_MS = 50 // 基础轮询间隔
const POLL_INTERVAL_MAX_MS = 500 // 最大轮询间隔
@@ -25,8 +22,6 @@ const POLL_BACKOFF_FACTOR = 1.5 // 退避因子
class UserMessageQueueService {
constructor() {
this.cleanupTimer = null
// 跟踪活跃的续租定时器,用于服务关闭时清理
this.activeRenewalTimers = new Map()
}
/**
@@ -73,6 +68,16 @@ class UserMessageQueueService {
* @returns {Promise<Object>} 配置对象
*/
async getConfig() {
// 默认配置(防止 config.userMessageQueue 未定义)
// 注意:优化后的默认值 - 锁持有时间从分钟级降到毫秒级,无需长等待
const queueConfig = config.userMessageQueue || {}
const defaults = {
enabled: queueConfig.enabled ?? false,
delayMs: queueConfig.delayMs ?? 200,
timeoutMs: queueConfig.timeoutMs ?? 5000, // 从 60000 降到 5000因为锁持有时间短
lockTtlMs: queueConfig.lockTtlMs ?? 5000 // 从 120000 降到 50005秒足以覆盖请求发送
}
// 尝试从 claudeRelayConfigService 获取 Web 界面配置
try {
const claudeRelayConfigService = require('./claudeRelayConfigService')
@@ -82,25 +87,23 @@ class UserMessageQueueService {
enabled:
webConfig.userMessageQueueEnabled !== undefined
? webConfig.userMessageQueueEnabled
: config.userMessageQueue.enabled,
: defaults.enabled,
delayMs:
webConfig.userMessageQueueDelayMs !== undefined
? webConfig.userMessageQueueDelayMs
: config.userMessageQueue.delayMs,
: defaults.delayMs,
timeoutMs:
webConfig.userMessageQueueTimeoutMs !== undefined
? webConfig.userMessageQueueTimeoutMs
: config.userMessageQueue.timeoutMs,
lockTtlMs: config.userMessageQueue.lockTtlMs
: defaults.timeoutMs,
lockTtlMs:
webConfig.userMessageQueueLockTtlMs !== undefined
? webConfig.userMessageQueueLockTtlMs
: defaults.lockTtlMs
}
} catch {
// 回退到环境变量配置
return {
enabled: config.userMessageQueue.enabled,
delayMs: config.userMessageQueue.delayMs,
timeoutMs: config.userMessageQueue.timeoutMs,
lockTtlMs: config.userMessageQueue.lockTtlMs
}
return defaults
}
}
@@ -228,83 +231,6 @@ class UserMessageQueueService {
return released
}
/**
* 启动锁续租防止长连接超过TTL导致锁丢失
* @param {string} accountId - 账户ID
* @param {string} requestId - 请求ID
* @returns {Promise<Function>} 停止续租的函数
*/
async startLockRenewal(accountId, requestId) {
const cfg = await this.getConfig()
if (!cfg.enabled || !accountId || !requestId) {
return () => {}
}
const intervalMs = Math.max(10000, Math.floor(cfg.lockTtlMs / 2)) // 约一半TTL刷新一次
const maxRenewals = Math.ceil(MAX_RENEWAL_DURATION_MS / intervalMs) // 最大续租次数
const startTime = Date.now()
const timerKey = `${accountId}:${requestId}`
let stopped = false
let renewalCount = 0
const stopRenewal = () => {
if (!stopped) {
clearInterval(timer)
stopped = true
this.activeRenewalTimers.delete(timerKey)
}
}
const timer = setInterval(async () => {
if (stopped) {
return
}
renewalCount++
// 检查是否超过最大续租次数或最大持续时间
if (renewalCount > maxRenewals || Date.now() - startTime > MAX_RENEWAL_DURATION_MS) {
logger.warn(`📬 User message queue: max renewal duration exceeded, stopping renewal`, {
accountId,
requestId,
renewalCount,
durationMs: Date.now() - startTime
})
stopRenewal()
return
}
try {
const refreshed = await redis.refreshUserMessageLock(accountId, requestId, cfg.lockTtlMs)
if (!refreshed) {
// 锁可能已被释放或超时,停止续租
logger.warn(
`📬 User message queue: failed to refresh lock (possibly lost), stop renewal`,
{
accountId,
requestId,
renewalCount
}
)
stopRenewal()
}
} catch (error) {
logger.error('📬 User message queue: lock renewal error:', error)
}
}, intervalMs)
// 避免阻止进程退出
if (typeof timer.unref === 'function') {
timer.unref()
}
// 跟踪活跃的定时器
this.activeRenewalTimers.set(timerKey, { timer, stopRenewal, accountId, requestId, startTime })
return stopRenewal
}
/**
* 获取队列统计信息
* @param {string} accountId - 账户ID
@@ -381,32 +307,6 @@ class UserMessageQueueService {
}
}
/**
* 停止所有活跃的锁续租定时器(服务关闭时调用)
*/
stopAllRenewalTimers() {
const count = this.activeRenewalTimers.size
if (count > 0) {
for (const [key, { stopRenewal }] of this.activeRenewalTimers) {
try {
stopRenewal()
} catch (error) {
logger.error(`📬 User message queue: failed to stop renewal timer ${key}:`, error)
}
}
this.activeRenewalTimers.clear()
logger.info(`📬 User message queue: stopped ${count} active renewal timer(s)`)
}
}
/**
* 获取活跃续租定时器数量(用于监控)
* @returns {number}
*/
getActiveRenewalCount() {
return this.activeRenewalTimers.size
}
/**
* 清理孤儿锁
* 检测异常情况锁存在但没有设置过期时间lockTtlRaw === -1

View File

@@ -179,88 +179,6 @@ describe('UserMessageQueueService', () => {
})
})
describe('startLockRenewal', () => {
beforeEach(() => {
jest.useFakeTimers()
})
afterEach(() => {
jest.useRealTimers()
jest.restoreAllMocks()
})
it('should periodically refresh lock while enabled', async () => {
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
enabled: true,
delayMs: 200,
timeoutMs: 30000,
lockTtlMs: 120000
})
const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
jest.advanceTimersByTime(60000) // 半个TTL
await Promise.resolve()
expect(refreshSpy).toHaveBeenCalledWith('acct-1', 'req-1', 120000)
stop()
})
it('should no-op when queue disabled', async () => {
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
enabled: false,
delayMs: 200,
timeoutMs: 30000,
lockTtlMs: 120000
})
const refreshSpy = jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
jest.advanceTimersByTime(120000)
await Promise.resolve()
expect(refreshSpy).not.toHaveBeenCalled()
stop()
})
it('should track active renewal timer', async () => {
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
enabled: true,
delayMs: 200,
timeoutMs: 30000,
lockTtlMs: 120000
})
jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
const stop = await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
expect(userMessageQueueService.getActiveRenewalCount()).toBe(1)
stop()
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
})
it('should stop all renewal timers on service shutdown', async () => {
jest.spyOn(userMessageQueueService, 'getConfig').mockResolvedValue({
enabled: true,
delayMs: 200,
timeoutMs: 30000,
lockTtlMs: 120000
})
jest.spyOn(redis, 'refreshUserMessageLock').mockResolvedValue(true)
await userMessageQueueService.startLockRenewal('acct-1', 'req-1')
await userMessageQueueService.startLockRenewal('acct-2', 'req-2')
expect(userMessageQueueService.getActiveRenewalCount()).toBe(2)
userMessageQueueService.stopAllRenewalTimers()
expect(userMessageQueueService.getActiveRenewalCount()).toBe(0)
})
})
describe('acquireQueueLock', () => {
afterEach(() => {
jest.restoreAllMocks()

View File

@@ -2049,7 +2049,7 @@ const bindingCounts = ref({}) // 轻量级绑定计数,用于显示"绑定: X
const accountGroups = ref([])
const groupFilter = ref('all')
const platformFilter = ref('all')
const statusFilter = ref('normal') // 状态过滤 (normal/rateLimited/other/all)
const statusFilter = ref('all') // 状态过滤 (normal/rateLimited/other/all)
const searchKeyword = ref('')
const PAGE_SIZE_STORAGE_KEY = 'accountsPageSize'
const getInitialPageSize = () => {
@@ -2804,11 +2804,12 @@ const loadAccounts = async (forceReload = false) => {
let openaiResponsesRaw = []
const appendAccounts = (platform, data) => {
if (!data || data.length === 0) return
const list = Array.isArray(data) ? data : []
if (list.length === 0) return
switch (platform) {
case 'claude': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.claudeAccountId?.[acc.id] || 0
return { ...acc, platform: 'claude', boundApiKeysCount }
})
@@ -2816,7 +2817,7 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'claude-console': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.claudeConsoleAccountId?.[acc.id] || 0
return { ...acc, platform: 'claude-console', boundApiKeysCount }
})
@@ -2824,12 +2825,12 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'bedrock': {
const items = data.map((acc) => ({ ...acc, platform: 'bedrock', boundApiKeysCount: 0 }))
const items = list.map((acc) => ({ ...acc, platform: 'bedrock', boundApiKeysCount: 0 }))
allAccounts.push(...items)
break
}
case 'gemini': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.geminiAccountId?.[acc.id] || 0
return { ...acc, platform: 'gemini', boundApiKeysCount }
})
@@ -2837,7 +2838,7 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'openai': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.openaiAccountId?.[acc.id] || 0
return { ...acc, platform: 'openai', boundApiKeysCount }
})
@@ -2845,7 +2846,7 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'azure_openai': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.azureOpenaiAccountId?.[acc.id] || 0
return { ...acc, platform: 'azure_openai', boundApiKeysCount }
})
@@ -2853,16 +2854,16 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'openai-responses': {
openaiResponsesRaw = data
openaiResponsesRaw = list
break
}
case 'ccr': {
const items = data.map((acc) => ({ ...acc, platform: 'ccr', boundApiKeysCount: 0 }))
const items = list.map((acc) => ({ ...acc, platform: 'ccr', boundApiKeysCount: 0 }))
allAccounts.push(...items)
break
}
case 'droid': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.droidAccountId?.[acc.id] || acc.boundApiKeysCount || 0
return { ...acc, platform: 'droid', boundApiKeysCount }
})
@@ -2870,7 +2871,7 @@ const loadAccounts = async (forceReload = false) => {
break
}
case 'gemini-api': {
const items = data.map((acc) => {
const items = list.map((acc) => {
const boundApiKeysCount = counts.geminiAccountId?.[`api:${acc.id}`] || 0
return { ...acc, platform: 'gemini-api', boundApiKeysCount }
})