feat: 优化并发控制和移除冗余限制功能

主要改进:
1. 改进并发控制机制
   - 使用 once 代替 on 避免重复监听
   - 监听多个事件确保可靠性(close、finish)
   - 支持客户端断开时立即释放并发槽位

2. 支持非流式请求的客户端断开处理
   - 客户端断开时立即中断上游请求
   - 避免资源浪费和不必要的 API 调用

3. 移除 requestLimit(请求数限制)功能
   - 移除配置和验证逻辑
   - 保留请求统计用于监控分析

4. 移除速率限制(Rate Limit)功能
   - 移除 RATE_LIMIT_* 配置
   - 简化中间件逻辑
   - 避免与并发控制重复

现在系统仅保留:
- Token 使用量限制
- 并发数限制(更精确的资源控制)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-07-16 14:40:37 +08:00
parent f9bc2ddb23
commit 567e3b25aa
9 changed files with 62 additions and 108 deletions

View File

@@ -29,11 +29,6 @@ MAX_PROXY_RETRIES=3
# 📈 使用限制 # 📈 使用限制
DEFAULT_TOKEN_LIMIT=1000000 DEFAULT_TOKEN_LIMIT=1000000
DEFAULT_REQUEST_LIMIT=1000
# 🚦 速率限制
RATE_LIMIT_WINDOW=60000
RATE_LIMIT_MAX_REQUESTS=100
# 📝 日志配置 # 📝 日志配置
LOG_LEVEL=info LOG_LEVEL=info

View File

@@ -498,12 +498,6 @@ async function createApiKey() {
message: 'Token 限制 (0=无限制):', message: 'Token 限制 (0=无限制):',
default: 1000000 default: 1000000
}, },
{
type: 'number',
name: 'requestLimit',
message: '请求限制 (0=无限制):',
default: 1000
}
]); ]);
const spinner = ora('正在创建 API Key...').start(); const spinner = ora('正在创建 API Key...').start();

View File

@@ -47,14 +47,7 @@ const config = {
// 📈 使用限制 // 📈 使用限制
limits: { limits: {
defaultTokenLimit: parseInt(process.env.DEFAULT_TOKEN_LIMIT) || 1000000, defaultTokenLimit: parseInt(process.env.DEFAULT_TOKEN_LIMIT) || 1000000
defaultRequestLimit: parseInt(process.env.DEFAULT_REQUEST_LIMIT) || 1000
},
// 🚦 速率限制
rateLimit: {
windowMs: parseInt(process.env.RATE_LIMIT_WINDOW) || 60000,
maxRequests: parseInt(process.env.RATE_LIMIT_MAX_REQUESTS) || 100
}, },
// 📝 日志配置 // 📝 日志配置

View File

@@ -42,24 +42,6 @@ const authenticateApiKey = async (req, res, next) => {
}); });
} }
// 检查速率限制(优化:只在验证成功后检查)
const rateLimitResult = await apiKeyService.checkRateLimit(validation.keyData.id);
if (!rateLimitResult.allowed) {
logger.security(`🚦 Rate limit exceeded for key: ${validation.keyData.id} (${validation.keyData.name})`);
return res.status(429).json({
error: 'Rate limit exceeded',
message: `Too many requests. Limit: ${rateLimitResult.limit} requests per minute`,
resetTime: rateLimitResult.resetTime,
retryAfter: rateLimitResult.resetTime
});
}
// 设置标准速率限制响应头
res.setHeader('X-RateLimit-Limit', rateLimitResult.limit);
res.setHeader('X-RateLimit-Remaining', Math.max(0, rateLimitResult.limit - rateLimitResult.current));
res.setHeader('X-RateLimit-Reset', rateLimitResult.resetTime);
res.setHeader('X-RateLimit-Policy', `${rateLimitResult.limit};w=60`);
// 检查并发限制 // 检查并发限制
const concurrencyLimit = validation.keyData.concurrencyLimit || 0; const concurrencyLimit = validation.keyData.concurrencyLimit || 0;
@@ -94,21 +76,29 @@ const authenticateApiKey = async (req, res, next) => {
} }
}; };
// 监听多个事件以确保在各种情况下都能正确减少计数 // 监听最可靠的事件(避免重复监听)
res.on('finish', decrementConcurrency); // res.on('close') 是最可靠的,会在连接关闭时触发
res.on('error', decrementConcurrency); res.once('close', () => {
req.on('close', () => { logger.api(`🔌 Response closed for key: ${validation.keyData.id} (${validation.keyData.name})`);
decrementConcurrency();
});
// req.on('close') 作为备用,处理请求端断开
req.once('close', () => {
logger.api(`🔌 Request closed for key: ${validation.keyData.id} (${validation.keyData.name})`); logger.api(`🔌 Request closed for key: ${validation.keyData.id} (${validation.keyData.name})`);
decrementConcurrency(); decrementConcurrency();
}); });
req.on('aborted', () => {
logger.api(`⚠️ Request aborted for key: ${validation.keyData.id} (${validation.keyData.name})`); // res.on('finish') 处理正常完成的情况
res.once('finish', () => {
logger.api(`✅ Response finished for key: ${validation.keyData.id} (${validation.keyData.name})`);
decrementConcurrency(); decrementConcurrency();
}); });
// 存储并发信息到请求对象,便于后续处理 // 存储并发信息到请求对象,便于后续处理
req.concurrencyInfo = { req.concurrencyInfo = {
apiKeyId: validation.keyData.id, apiKeyId: validation.keyData.id,
apiKeyName: validation.keyData.name,
decrementConcurrency decrementConcurrency
}; };
} }
@@ -118,7 +108,6 @@ const authenticateApiKey = async (req, res, next) => {
id: validation.keyData.id, id: validation.keyData.id,
name: validation.keyData.name, name: validation.keyData.name,
tokenLimit: validation.keyData.tokenLimit, tokenLimit: validation.keyData.tokenLimit,
requestLimit: validation.keyData.requestLimit,
claudeAccountId: validation.keyData.claudeAccountId, claudeAccountId: validation.keyData.claudeAccountId,
concurrencyLimit: validation.keyData.concurrencyLimit concurrencyLimit: validation.keyData.concurrencyLimit
}; };
@@ -272,10 +261,8 @@ const corsMiddleware = (req, res, next) => {
].join(', ')); ].join(', '));
res.header('Access-Control-Expose-Headers', [ res.header('Access-Control-Expose-Headers', [
'X-RateLimit-Limit', 'X-Request-ID',
'X-RateLimit-Remaining', 'Content-Type'
'X-RateLimit-Reset',
'X-RateLimit-Policy'
].join(', ')); ].join(', '));
res.header('Access-Control-Max-Age', '86400'); // 24小时预检缓存 res.header('Access-Control-Max-Age', '86400'); // 24小时预检缓存

View File

@@ -482,22 +482,6 @@ class RedisClient {
return await this.client.del(key); return await this.client.del(key);
} }
// 🚦 速率限制
async checkRateLimit(identifier, limit = 100, window = 60) {
const key = `ratelimit:${identifier}`;
const current = await this.client.incr(key);
if (current === 1) {
await this.client.expire(key, window);
}
return {
allowed: current <= limit,
current,
limit,
resetTime: await this.client.ttl(key)
};
}
// 📈 系统统计 // 📈 系统统计
async getSystemStats() { async getSystemStats() {

View File

@@ -30,7 +30,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => {
name, name,
description, description,
tokenLimit, tokenLimit,
requestLimit,
expiresAt, expiresAt,
claudeAccountId, claudeAccountId,
concurrencyLimit concurrencyLimit
@@ -53,9 +52,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => {
return res.status(400).json({ error: 'Token limit must be a non-negative integer' }); return res.status(400).json({ error: 'Token limit must be a non-negative integer' });
} }
if (requestLimit && (!Number.isInteger(Number(requestLimit)) || Number(requestLimit) < 0)) {
return res.status(400).json({ error: 'Request limit must be a non-negative integer' });
}
if (concurrencyLimit !== undefined && concurrencyLimit !== null && concurrencyLimit !== '' && (!Number.isInteger(Number(concurrencyLimit)) || Number(concurrencyLimit) < 0)) { if (concurrencyLimit !== undefined && concurrencyLimit !== null && concurrencyLimit !== '' && (!Number.isInteger(Number(concurrencyLimit)) || Number(concurrencyLimit) < 0)) {
return res.status(400).json({ error: 'Concurrency limit must be a non-negative integer' }); return res.status(400).json({ error: 'Concurrency limit must be a non-negative integer' });
@@ -65,7 +61,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => {
name, name,
description, description,
tokenLimit, tokenLimit,
requestLimit,
expiresAt, expiresAt,
claudeAccountId, claudeAccountId,
concurrencyLimit concurrencyLimit

View File

@@ -45,20 +45,7 @@ router.post('/v1/messages', authenticateApiKey, async (req, res) => {
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Origin', '*');
// 流式响应添加客户端断开检测,确保并发计数正确减少 // 流式响应不需要额外处理,中间件已经设置了监听器
if (req.concurrencyInfo) {
// 添加响应关闭事件监听器
res.on('close', () => {
logger.api(`🔌 Stream response closed for key: ${req.apiKey.id} (${req.apiKey.name}), triggering concurrency decrement`);
req.concurrencyInfo.decrementConcurrency();
});
// 添加错误事件监听器
res.on('error', (error) => {
logger.api(`⚠️ Stream response error for key: ${req.apiKey.id} (${req.apiKey.name}): ${error.message}`);
req.concurrencyInfo.decrementConcurrency();
});
}
let usageDataCaptured = false; let usageDataCaptured = false;
@@ -99,7 +86,7 @@ router.post('/v1/messages', authenticateApiKey, async (req, res) => {
apiKeyName: req.apiKey.name apiKeyName: req.apiKey.name
}); });
const response = await claudeRelayService.relayRequest(req.body, req.apiKey); const response = await claudeRelayService.relayRequest(req.body, req.apiKey, req, res);
logger.info('📡 Claude API response received', { logger.info('📡 Claude API response received', {
statusCode: response.statusCode, statusCode: response.statusCode,
@@ -201,7 +188,6 @@ router.get('/v1/key-info', authenticateApiKey, async (req, res) => {
id: req.apiKey.id, id: req.apiKey.id,
name: req.apiKey.name, name: req.apiKey.name,
tokenLimit: req.apiKey.tokenLimit, tokenLimit: req.apiKey.tokenLimit,
requestLimit: req.apiKey.requestLimit,
usage usage
}, },
timestamp: new Date().toISOString() timestamp: new Date().toISOString()
@@ -224,7 +210,7 @@ router.get('/v1/usage', authenticateApiKey, async (req, res) => {
usage, usage,
limits: { limits: {
tokens: req.apiKey.tokenLimit, tokens: req.apiKey.tokenLimit,
requests: req.apiKey.requestLimit requests: 0 // 请求限制已移除
}, },
timestamp: new Date().toISOString() timestamp: new Date().toISOString()
}); });

View File

@@ -15,7 +15,6 @@ class ApiKeyService {
name = 'Unnamed Key', name = 'Unnamed Key',
description = '', description = '',
tokenLimit = config.limits.defaultTokenLimit, tokenLimit = config.limits.defaultTokenLimit,
requestLimit = config.limits.defaultRequestLimit,
expiresAt = null, expiresAt = null,
claudeAccountId = null, claudeAccountId = null,
isActive = true, isActive = true,
@@ -33,7 +32,6 @@ class ApiKeyService {
description, description,
apiKey: hashedKey, apiKey: hashedKey,
tokenLimit: String(tokenLimit ?? 0), tokenLimit: String(tokenLimit ?? 0),
requestLimit: String(requestLimit ?? 0),
concurrencyLimit: String(concurrencyLimit ?? 0), concurrencyLimit: String(concurrencyLimit ?? 0),
isActive: String(isActive), isActive: String(isActive),
claudeAccountId: claudeAccountId || '', claudeAccountId: claudeAccountId || '',
@@ -54,7 +52,6 @@ class ApiKeyService {
name: keyData.name, name: keyData.name,
description: keyData.description, description: keyData.description,
tokenLimit: parseInt(keyData.tokenLimit), tokenLimit: parseInt(keyData.tokenLimit),
requestLimit: parseInt(keyData.requestLimit),
concurrencyLimit: parseInt(keyData.concurrencyLimit), concurrencyLimit: parseInt(keyData.concurrencyLimit),
isActive: keyData.isActive === 'true', isActive: keyData.isActive === 'true',
claudeAccountId: keyData.claudeAccountId, claudeAccountId: keyData.claudeAccountId,
@@ -94,15 +91,11 @@ class ApiKeyService {
// 检查使用限制 // 检查使用限制
const usage = await redis.getUsageStats(keyData.id); const usage = await redis.getUsageStats(keyData.id);
const tokenLimit = parseInt(keyData.tokenLimit); const tokenLimit = parseInt(keyData.tokenLimit);
const requestLimit = parseInt(keyData.requestLimit);
if (tokenLimit > 0 && usage.total.tokens >= tokenLimit) { if (tokenLimit > 0 && usage.total.tokens >= tokenLimit) {
return { valid: false, error: 'Token limit exceeded' }; return { valid: false, error: 'Token limit exceeded' };
} }
if (requestLimit > 0 && usage.total.requests >= requestLimit) {
return { valid: false, error: 'Request limit exceeded' };
}
// 更新最后使用时间(优化:只在实际API调用时更新,而不是验证时) // 更新最后使用时间(优化:只在实际API调用时更新,而不是验证时)
// 注意:lastUsedAt的更新已移至recordUsage方法中 // 注意:lastUsedAt的更新已移至recordUsage方法中
@@ -116,8 +109,7 @@ class ApiKeyService {
name: keyData.name, name: keyData.name,
claudeAccountId: keyData.claudeAccountId, claudeAccountId: keyData.claudeAccountId,
tokenLimit: parseInt(keyData.tokenLimit), tokenLimit: parseInt(keyData.tokenLimit),
requestLimit: parseInt(keyData.requestLimit), concurrencyLimit: parseInt(keyData.concurrencyLimit || 0),
concurrencyLimit: parseInt(keyData.concurrencyLimit || 0),
usage usage
} }
}; };
@@ -136,7 +128,6 @@ class ApiKeyService {
for (const key of apiKeys) { for (const key of apiKeys) {
key.usage = await redis.getUsageStats(key.id); key.usage = await redis.getUsageStats(key.id);
key.tokenLimit = parseInt(key.tokenLimit); key.tokenLimit = parseInt(key.tokenLimit);
key.requestLimit = parseInt(key.requestLimit);
key.concurrencyLimit = parseInt(key.concurrencyLimit || 0); key.concurrencyLimit = parseInt(key.concurrencyLimit || 0);
key.currentConcurrency = await redis.getConcurrency(key.id); key.currentConcurrency = await redis.getConcurrency(key.id);
key.isActive = key.isActive === 'true'; key.isActive = key.isActive === 'true';
@@ -159,7 +150,7 @@ class ApiKeyService {
} }
// 允许更新的字段 // 允许更新的字段
const allowedUpdates = ['name', 'description', 'tokenLimit', 'requestLimit', 'concurrencyLimit', 'isActive', 'claudeAccountId', 'expiresAt']; const allowedUpdates = ['name', 'description', 'tokenLimit', 'concurrencyLimit', 'isActive', 'claudeAccountId', 'expiresAt'];
const updatedData = { ...keyData }; const updatedData = { ...keyData };
for (const [field, value] of Object.entries(updates)) { for (const [field, value] of Object.entries(updates)) {
@@ -240,13 +231,6 @@ class ApiKeyService {
return await redis.getUsageStats(keyId); return await redis.getUsageStats(keyId);
} }
// 🚦 检查速率限制
async checkRateLimit(keyId, limit = null) {
const rateLimit = limit || config.rateLimit.maxRequests;
const window = Math.floor(config.rateLimit.windowMs / 1000);
return await redis.checkRateLimit(`apikey:${keyId}`, rateLimit, window);
}
// 🧹 清理过期的API Keys // 🧹 清理过期的API Keys
async cleanupExpiredKeys() { async cleanupExpiredKeys() {

View File

@@ -15,7 +15,9 @@ class ClaudeRelayService {
} }
// 🚀 转发请求到Claude API // 🚀 转发请求到Claude API
async relayRequest(requestBody, apiKeyData) { async relayRequest(requestBody, apiKeyData, clientRequest, clientResponse) {
let upstreamRequest = null;
try { try {
// 生成会话哈希用于sticky会话 // 生成会话哈希用于sticky会话
const sessionHash = sessionHelper.generateSessionHash(requestBody); const sessionHash = sessionHelper.generateSessionHash(requestBody);
@@ -34,8 +36,37 @@ class ClaudeRelayService {
// 获取代理配置 // 获取代理配置
const proxyAgent = await this._getProxyAgent(accountId); const proxyAgent = await this._getProxyAgent(accountId);
// 发送请求到Claude API // 设置客户端断开监听器
const response = await this._makeClaudeRequest(processedBody, accessToken, proxyAgent); const handleClientDisconnect = () => {
logger.info('🔌 Client disconnected, aborting upstream request');
if (upstreamRequest && !upstreamRequest.destroyed) {
upstreamRequest.destroy();
}
};
// 监听客户端断开事件
if (clientRequest) {
clientRequest.once('close', handleClientDisconnect);
}
if (clientResponse) {
clientResponse.once('close', handleClientDisconnect);
}
// 发送请求到Claude API,传入回调以获取请求对象
const response = await this._makeClaudeRequest(
processedBody,
accessToken,
proxyAgent,
(req) => { upstreamRequest = req; }
);
// 移除监听器(请求成功完成)
if (clientRequest) {
clientRequest.removeListener('close', handleClientDisconnect);
}
if (clientResponse) {
clientResponse.removeListener('close', handleClientDisconnect);
}
// 记录成功的API调用 // 记录成功的API调用
const inputTokens = requestBody.messages ? const inputTokens = requestBody.messages ?
@@ -160,7 +191,7 @@ class ClaudeRelayService {
} }
// 🔗 发送请求到Claude API // 🔗 发送请求到Claude API
async _makeClaudeRequest(body, accessToken, proxyAgent) { async _makeClaudeRequest(body, accessToken, proxyAgent, onRequest) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const url = new URL(this.claudeApiUrl); const url = new URL(this.claudeApiUrl);
@@ -207,6 +238,11 @@ class ClaudeRelayService {
} }
}); });
}); });
// 如果提供了 onRequest 回调,传递请求对象
if (onRequest && typeof onRequest === 'function') {
onRequest(req);
}
req.on('error', (error) => { req.on('error', (error) => {
logger.error('❌ Claude API request error:', error); logger.error('❌ Claude API request error:', error);