diff --git a/.env.example b/.env.example index 4b3c3226..debac4de 100644 --- a/.env.example +++ b/.env.example @@ -29,11 +29,6 @@ MAX_PROXY_RETRIES=3 # 📈 使用限制 DEFAULT_TOKEN_LIMIT=1000000 -DEFAULT_REQUEST_LIMIT=1000 - -# 🚦 速率限制 -RATE_LIMIT_WINDOW=60000 -RATE_LIMIT_MAX_REQUESTS=100 # 📝 日志配置 LOG_LEVEL=info diff --git a/cli/index.js b/cli/index.js index a27002a1..996ae5d8 100644 --- a/cli/index.js +++ b/cli/index.js @@ -498,12 +498,6 @@ async function createApiKey() { message: 'Token 限制 (0=无限制):', default: 1000000 }, - { - type: 'number', - name: 'requestLimit', - message: '请求限制 (0=无限制):', - default: 1000 - } ]); const spinner = ora('正在创建 API Key...').start(); diff --git a/config/config.example.js b/config/config.example.js index b9725332..bfcb5e74 100644 --- a/config/config.example.js +++ b/config/config.example.js @@ -47,14 +47,7 @@ const config = { // 📈 使用限制 limits: { - defaultTokenLimit: parseInt(process.env.DEFAULT_TOKEN_LIMIT) || 1000000, - defaultRequestLimit: parseInt(process.env.DEFAULT_REQUEST_LIMIT) || 1000 - }, - - // 🚦 速率限制 - rateLimit: { - windowMs: parseInt(process.env.RATE_LIMIT_WINDOW) || 60000, - maxRequests: parseInt(process.env.RATE_LIMIT_MAX_REQUESTS) || 100 + defaultTokenLimit: parseInt(process.env.DEFAULT_TOKEN_LIMIT) || 1000000 }, // 📝 日志配置 diff --git a/src/middleware/auth.js b/src/middleware/auth.js index c2550f93..df86b110 100644 --- a/src/middleware/auth.js +++ b/src/middleware/auth.js @@ -42,24 +42,6 @@ const authenticateApiKey = async (req, res, next) => { }); } - // 检查速率限制(优化:只在验证成功后检查) - const rateLimitResult = await apiKeyService.checkRateLimit(validation.keyData.id); - - if (!rateLimitResult.allowed) { - logger.security(`🚦 Rate limit exceeded for key: ${validation.keyData.id} (${validation.keyData.name})`); - return res.status(429).json({ - error: 'Rate limit exceeded', - message: `Too many requests. Limit: ${rateLimitResult.limit} requests per minute`, - resetTime: rateLimitResult.resetTime, - retryAfter: rateLimitResult.resetTime - }); - } - - // 设置标准速率限制响应头 - res.setHeader('X-RateLimit-Limit', rateLimitResult.limit); - res.setHeader('X-RateLimit-Remaining', Math.max(0, rateLimitResult.limit - rateLimitResult.current)); - res.setHeader('X-RateLimit-Reset', rateLimitResult.resetTime); - res.setHeader('X-RateLimit-Policy', `${rateLimitResult.limit};w=60`); // 检查并发限制 const concurrencyLimit = validation.keyData.concurrencyLimit || 0; @@ -94,21 +76,29 @@ const authenticateApiKey = async (req, res, next) => { } }; - // 监听多个事件以确保在各种情况下都能正确减少计数 - res.on('finish', decrementConcurrency); - res.on('error', decrementConcurrency); - req.on('close', () => { + // 监听最可靠的事件(避免重复监听) + // res.on('close') 是最可靠的,会在连接关闭时触发 + res.once('close', () => { + logger.api(`🔌 Response closed for key: ${validation.keyData.id} (${validation.keyData.name})`); + decrementConcurrency(); + }); + + // req.on('close') 作为备用,处理请求端断开 + req.once('close', () => { logger.api(`🔌 Request closed for key: ${validation.keyData.id} (${validation.keyData.name})`); decrementConcurrency(); }); - req.on('aborted', () => { - logger.api(`⚠️ Request aborted for key: ${validation.keyData.id} (${validation.keyData.name})`); + + // res.on('finish') 处理正常完成的情况 + res.once('finish', () => { + logger.api(`✅ Response finished for key: ${validation.keyData.id} (${validation.keyData.name})`); decrementConcurrency(); }); // 存储并发信息到请求对象,便于后续处理 req.concurrencyInfo = { apiKeyId: validation.keyData.id, + apiKeyName: validation.keyData.name, decrementConcurrency }; } @@ -118,7 +108,6 @@ const authenticateApiKey = async (req, res, next) => { id: validation.keyData.id, name: validation.keyData.name, tokenLimit: validation.keyData.tokenLimit, - requestLimit: validation.keyData.requestLimit, claudeAccountId: validation.keyData.claudeAccountId, concurrencyLimit: validation.keyData.concurrencyLimit }; @@ -272,10 +261,8 @@ const corsMiddleware = (req, res, next) => { ].join(', ')); res.header('Access-Control-Expose-Headers', [ - 'X-RateLimit-Limit', - 'X-RateLimit-Remaining', - 'X-RateLimit-Reset', - 'X-RateLimit-Policy' + 'X-Request-ID', + 'Content-Type' ].join(', ')); res.header('Access-Control-Max-Age', '86400'); // 24小时预检缓存 diff --git a/src/models/redis.js b/src/models/redis.js index f2e25363..9d6655f2 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -482,22 +482,6 @@ class RedisClient { return await this.client.del(key); } - // 🚦 速率限制 - async checkRateLimit(identifier, limit = 100, window = 60) { - const key = `ratelimit:${identifier}`; - const current = await this.client.incr(key); - - if (current === 1) { - await this.client.expire(key, window); - } - - return { - allowed: current <= limit, - current, - limit, - resetTime: await this.client.ttl(key) - }; - } // 📈 系统统计 async getSystemStats() { diff --git a/src/routes/admin.js b/src/routes/admin.js index 5861cd85..cd8de6d5 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -30,7 +30,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => { name, description, tokenLimit, - requestLimit, expiresAt, claudeAccountId, concurrencyLimit @@ -53,9 +52,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => { return res.status(400).json({ error: 'Token limit must be a non-negative integer' }); } - if (requestLimit && (!Number.isInteger(Number(requestLimit)) || Number(requestLimit) < 0)) { - return res.status(400).json({ error: 'Request limit must be a non-negative integer' }); - } if (concurrencyLimit !== undefined && concurrencyLimit !== null && concurrencyLimit !== '' && (!Number.isInteger(Number(concurrencyLimit)) || Number(concurrencyLimit) < 0)) { return res.status(400).json({ error: 'Concurrency limit must be a non-negative integer' }); @@ -65,7 +61,6 @@ router.post('/api-keys', authenticateAdmin, async (req, res) => { name, description, tokenLimit, - requestLimit, expiresAt, claudeAccountId, concurrencyLimit diff --git a/src/routes/api.js b/src/routes/api.js index ab12b357..e826f339 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -45,20 +45,7 @@ router.post('/v1/messages', authenticateApiKey, async (req, res) => { res.setHeader('Connection', 'keep-alive'); res.setHeader('Access-Control-Allow-Origin', '*'); - // 为流式响应添加客户端断开检测,确保并发计数正确减少 - if (req.concurrencyInfo) { - // 添加响应关闭事件监听器 - res.on('close', () => { - logger.api(`🔌 Stream response closed for key: ${req.apiKey.id} (${req.apiKey.name}), triggering concurrency decrement`); - req.concurrencyInfo.decrementConcurrency(); - }); - - // 添加错误事件监听器 - res.on('error', (error) => { - logger.api(`⚠️ Stream response error for key: ${req.apiKey.id} (${req.apiKey.name}): ${error.message}`); - req.concurrencyInfo.decrementConcurrency(); - }); - } + // 流式响应不需要额外处理,中间件已经设置了监听器 let usageDataCaptured = false; @@ -99,7 +86,7 @@ router.post('/v1/messages', authenticateApiKey, async (req, res) => { apiKeyName: req.apiKey.name }); - const response = await claudeRelayService.relayRequest(req.body, req.apiKey); + const response = await claudeRelayService.relayRequest(req.body, req.apiKey, req, res); logger.info('📡 Claude API response received', { statusCode: response.statusCode, @@ -201,7 +188,6 @@ router.get('/v1/key-info', authenticateApiKey, async (req, res) => { id: req.apiKey.id, name: req.apiKey.name, tokenLimit: req.apiKey.tokenLimit, - requestLimit: req.apiKey.requestLimit, usage }, timestamp: new Date().toISOString() @@ -224,7 +210,7 @@ router.get('/v1/usage', authenticateApiKey, async (req, res) => { usage, limits: { tokens: req.apiKey.tokenLimit, - requests: req.apiKey.requestLimit + requests: 0 // 请求限制已移除 }, timestamp: new Date().toISOString() }); diff --git a/src/services/apiKeyService.js b/src/services/apiKeyService.js index c4365199..ee755ea5 100644 --- a/src/services/apiKeyService.js +++ b/src/services/apiKeyService.js @@ -15,7 +15,6 @@ class ApiKeyService { name = 'Unnamed Key', description = '', tokenLimit = config.limits.defaultTokenLimit, - requestLimit = config.limits.defaultRequestLimit, expiresAt = null, claudeAccountId = null, isActive = true, @@ -33,7 +32,6 @@ class ApiKeyService { description, apiKey: hashedKey, tokenLimit: String(tokenLimit ?? 0), - requestLimit: String(requestLimit ?? 0), concurrencyLimit: String(concurrencyLimit ?? 0), isActive: String(isActive), claudeAccountId: claudeAccountId || '', @@ -54,7 +52,6 @@ class ApiKeyService { name: keyData.name, description: keyData.description, tokenLimit: parseInt(keyData.tokenLimit), - requestLimit: parseInt(keyData.requestLimit), concurrencyLimit: parseInt(keyData.concurrencyLimit), isActive: keyData.isActive === 'true', claudeAccountId: keyData.claudeAccountId, @@ -94,15 +91,11 @@ class ApiKeyService { // 检查使用限制 const usage = await redis.getUsageStats(keyData.id); const tokenLimit = parseInt(keyData.tokenLimit); - const requestLimit = parseInt(keyData.requestLimit); if (tokenLimit > 0 && usage.total.tokens >= tokenLimit) { return { valid: false, error: 'Token limit exceeded' }; } - if (requestLimit > 0 && usage.total.requests >= requestLimit) { - return { valid: false, error: 'Request limit exceeded' }; - } // 更新最后使用时间(优化:只在实际API调用时更新,而不是验证时) // 注意:lastUsedAt的更新已移至recordUsage方法中 @@ -116,8 +109,7 @@ class ApiKeyService { name: keyData.name, claudeAccountId: keyData.claudeAccountId, tokenLimit: parseInt(keyData.tokenLimit), - requestLimit: parseInt(keyData.requestLimit), - concurrencyLimit: parseInt(keyData.concurrencyLimit || 0), + concurrencyLimit: parseInt(keyData.concurrencyLimit || 0), usage } }; @@ -136,7 +128,6 @@ class ApiKeyService { for (const key of apiKeys) { key.usage = await redis.getUsageStats(key.id); key.tokenLimit = parseInt(key.tokenLimit); - key.requestLimit = parseInt(key.requestLimit); key.concurrencyLimit = parseInt(key.concurrencyLimit || 0); key.currentConcurrency = await redis.getConcurrency(key.id); key.isActive = key.isActive === 'true'; @@ -159,7 +150,7 @@ class ApiKeyService { } // 允许更新的字段 - const allowedUpdates = ['name', 'description', 'tokenLimit', 'requestLimit', 'concurrencyLimit', 'isActive', 'claudeAccountId', 'expiresAt']; + const allowedUpdates = ['name', 'description', 'tokenLimit', 'concurrencyLimit', 'isActive', 'claudeAccountId', 'expiresAt']; const updatedData = { ...keyData }; for (const [field, value] of Object.entries(updates)) { @@ -240,13 +231,6 @@ class ApiKeyService { return await redis.getUsageStats(keyId); } - // 🚦 检查速率限制 - async checkRateLimit(keyId, limit = null) { - const rateLimit = limit || config.rateLimit.maxRequests; - const window = Math.floor(config.rateLimit.windowMs / 1000); - - return await redis.checkRateLimit(`apikey:${keyId}`, rateLimit, window); - } // 🧹 清理过期的API Keys async cleanupExpiredKeys() { diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index 521c7239..41de8a5b 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -15,7 +15,9 @@ class ClaudeRelayService { } // 🚀 转发请求到Claude API - async relayRequest(requestBody, apiKeyData) { + async relayRequest(requestBody, apiKeyData, clientRequest, clientResponse) { + let upstreamRequest = null; + try { // 生成会话哈希用于sticky会话 const sessionHash = sessionHelper.generateSessionHash(requestBody); @@ -34,8 +36,37 @@ class ClaudeRelayService { // 获取代理配置 const proxyAgent = await this._getProxyAgent(accountId); - // 发送请求到Claude API - const response = await this._makeClaudeRequest(processedBody, accessToken, proxyAgent); + // 设置客户端断开监听器 + const handleClientDisconnect = () => { + logger.info('🔌 Client disconnected, aborting upstream request'); + if (upstreamRequest && !upstreamRequest.destroyed) { + upstreamRequest.destroy(); + } + }; + + // 监听客户端断开事件 + if (clientRequest) { + clientRequest.once('close', handleClientDisconnect); + } + if (clientResponse) { + clientResponse.once('close', handleClientDisconnect); + } + + // 发送请求到Claude API(传入回调以获取请求对象) + const response = await this._makeClaudeRequest( + processedBody, + accessToken, + proxyAgent, + (req) => { upstreamRequest = req; } + ); + + // 移除监听器(请求成功完成) + if (clientRequest) { + clientRequest.removeListener('close', handleClientDisconnect); + } + if (clientResponse) { + clientResponse.removeListener('close', handleClientDisconnect); + } // 记录成功的API调用 const inputTokens = requestBody.messages ? @@ -160,7 +191,7 @@ class ClaudeRelayService { } // 🔗 发送请求到Claude API - async _makeClaudeRequest(body, accessToken, proxyAgent) { + async _makeClaudeRequest(body, accessToken, proxyAgent, onRequest) { return new Promise((resolve, reject) => { const url = new URL(this.claudeApiUrl); @@ -207,6 +238,11 @@ class ClaudeRelayService { } }); }); + + // 如果提供了 onRequest 回调,传递请求对象 + if (onRequest && typeof onRequest === 'function') { + onRequest(req); + } req.on('error', (error) => { logger.error('❌ Claude API request error:', error);