fix: 修复并发计数器在请求异常时不能正确减少的问题

- 增强事件监听机制
  - 监听 req 的 close 和 aborted 事件
  - 监听 res 的 finish 和 error 事件
  - 使用标志位确保只减少一次计数

- 改进日志记录
  - 增加并发计数增减的详细日志
  - 记录请求关闭和中断事件

- 确保计数器安全性
  - 使用 Lua 脚本原子操作防止负数
  - 优化 Redis 操作逻辑

- 增强管理界面
  - API Keys 列表显示当前并发数
  - 并发数超过 0 时用橙色标记
  - 显示当前并发数/限制数格式

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-07-16 10:21:17 +08:00
parent e2c9f26135
commit 08f93e9872
4 changed files with 62 additions and 18 deletions

View File

@@ -65,6 +65,7 @@ const authenticateApiKey = async (req, res, next) => {
const concurrencyLimit = validation.keyData.concurrencyLimit || 0; const concurrencyLimit = validation.keyData.concurrencyLimit || 0;
if (concurrencyLimit > 0) { if (concurrencyLimit > 0) {
const currentConcurrency = await redis.incrConcurrency(validation.keyData.id); const currentConcurrency = await redis.incrConcurrency(validation.keyData.id);
logger.api(`📈 Incremented concurrency for key: ${validation.keyData.id} (${validation.keyData.name}), current: ${currentConcurrency}, limit: ${concurrencyLimit}`);
if (currentConcurrency > concurrencyLimit) { if (currentConcurrency > concurrencyLimit) {
// 如果超过限制,立即减少计数 // 如果超过限制,立即减少计数
@@ -78,19 +79,38 @@ const authenticateApiKey = async (req, res, next) => {
}); });
} }
// 在响应结束时减少并发计数 // 使用标志位确保只减少一次
res.on('finish', () => { let concurrencyDecremented = false;
redis.decrConcurrency(validation.keyData.id).catch(error => {
logger.error('Failed to decrement concurrency:', error); const decrementConcurrency = async () => {
}); if (!concurrencyDecremented) {
concurrencyDecremented = true;
try {
const newCount = await redis.decrConcurrency(validation.keyData.id);
logger.api(`📉 Decremented concurrency for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}`);
} catch (error) {
logger.error(`Failed to decrement concurrency for key ${validation.keyData.id}:`, error);
}
}
};
// 监听多个事件以确保在各种情况下都能正确减少计数
res.on('finish', decrementConcurrency);
res.on('error', decrementConcurrency);
req.on('close', () => {
logger.api(`🔌 Request closed for key: ${validation.keyData.id} (${validation.keyData.name})`);
decrementConcurrency();
});
req.on('aborted', () => {
logger.api(`⚠️ Request aborted for key: ${validation.keyData.id} (${validation.keyData.name})`);
decrementConcurrency();
}); });
// 在响应错误时也减少并发计数 // 存储并发信息到请求对象,便于后续处理
res.on('error', () => { req.concurrencyInfo = {
redis.decrConcurrency(validation.keyData.id).catch(error => { apiKeyId: validation.keyData.id,
logger.error('Failed to decrement concurrency on error:', error); decrementConcurrency
}); };
});
} }
// 将验证信息添加到请求对象(只包含必要信息) // 将验证信息添加到请求对象(只包含必要信息)

View File

@@ -724,6 +724,7 @@ class RedisClient {
// 设置过期时间为5分钟防止计数器永远不清零 // 设置过期时间为5分钟防止计数器永远不清零
await this.client.expire(key, 300); await this.client.expire(key, 300);
logger.database(`🔢 Incremented concurrency for key ${apiKeyId}: ${count}`);
return count; return count;
} catch (error) { } catch (error) {
logger.error('❌ Failed to increment concurrency:', error); logger.error('❌ Failed to increment concurrency:', error);
@@ -735,14 +736,28 @@ class RedisClient {
async decrConcurrency(apiKeyId) { async decrConcurrency(apiKeyId) {
try { try {
const key = `concurrency:${apiKeyId}`; const key = `concurrency:${apiKeyId}`;
const count = await this.client.decr(key);
// 如果计数降到0或以下删除键 // 使用Lua脚本确保原子性操作防止计数器变成负数
if (count <= 0) { const luaScript = `
await this.client.del(key); local key = KEYS[1]
return 0; local current = tonumber(redis.call('get', key) or "0")
}
if current <= 0 then
redis.call('del', key)
return 0
else
local new_value = redis.call('decr', key)
if new_value <= 0 then
redis.call('del', key)
return 0
else
return new_value
end
end
`;
const count = await this.client.eval(luaScript, 1, key);
logger.database(`🔢 Decremented concurrency for key ${apiKeyId}: ${count}`);
return count; return count;
} catch (error) { } catch (error) {
logger.error('❌ Failed to decrement concurrency:', error); logger.error('❌ Failed to decrement concurrency:', error);

View File

@@ -132,12 +132,13 @@ class ApiKeyService {
try { try {
const apiKeys = await redis.getAllApiKeys(); const apiKeys = await redis.getAllApiKeys();
// 为每个key添加使用统计 // 为每个key添加使用统计和当前并发数
for (const key of apiKeys) { for (const key of apiKeys) {
key.usage = await redis.getUsageStats(key.id); key.usage = await redis.getUsageStats(key.id);
key.tokenLimit = parseInt(key.tokenLimit); key.tokenLimit = parseInt(key.tokenLimit);
key.requestLimit = parseInt(key.requestLimit); key.requestLimit = parseInt(key.requestLimit);
key.concurrencyLimit = parseInt(key.concurrencyLimit || 0); key.concurrencyLimit = parseInt(key.concurrencyLimit || 0);
key.currentConcurrency = await redis.getConcurrency(key.id);
key.isActive = key.isActive === 'true'; key.isActive = key.isActive === 'true';
delete key.apiKey; // 不返回哈希后的key delete key.apiKey; // 不返回哈希后的key
} }

View File

@@ -430,6 +430,14 @@
<span class="text-gray-600">并发限制:</span> <span class="text-gray-600">并发限制:</span>
<span class="font-medium text-purple-600">{{ key.concurrencyLimit > 0 ? key.concurrencyLimit : '无限制' }}</span> <span class="font-medium text-purple-600">{{ key.concurrencyLimit > 0 ? key.concurrencyLimit : '无限制' }}</span>
</div> </div>
<!-- 当前并发数 -->
<div class="flex justify-between text-sm">
<span class="text-gray-600">当前并发:</span>
<span :class="['font-medium', key.currentConcurrency > 0 ? 'text-orange-600' : 'text-gray-600']">
{{ key.currentConcurrency || 0 }}
<span v-if="key.concurrencyLimit > 0" class="text-xs text-gray-500">/ {{ key.concurrencyLimit }}</span>
</span>
</div>
<!-- 输入/输出Token --> <!-- 输入/输出Token -->
<div class="flex justify-between text-xs text-gray-500"> <div class="flex justify-between text-xs text-gray-500">
<span>输入: {{ formatNumber((key.usage && key.usage.total && key.usage.total.inputTokens) || 0) }}</span> <span>输入: {{ formatNumber((key.usage && key.usage.total && key.usage.total.inputTokens) || 0) }}</span>