diff --git a/src/middleware/auth.js b/src/middleware/auth.js index c39655e0..9c34fd5e 100644 --- a/src/middleware/auth.js +++ b/src/middleware/auth.js @@ -226,8 +226,18 @@ const authenticateApiKey = async (req, res, next) => { ) if (currentConcurrency > concurrencyLimit) { - // 如果超过限制,立即减少计数 - await redis.decrConcurrency(validation.keyData.id, requestId) + // 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏) + try { + const newCount = await redis.decrConcurrency(validation.keyData.id, requestId) + logger.api( + `📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}` + ) + } catch (error) { + logger.error( + `Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`, + error + ) + } logger.security( `🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${ validation.keyData.name @@ -249,7 +259,38 @@ const authenticateApiKey = async (req, res, next) => { let leaseRenewInterval = null if (renewIntervalMs > 0) { + // 🔴 关键修复:添加最大刷新次数限制,防止租约永不过期 + // 默认最大生存时间为 10 分钟,可通过环境变量配置 + const maxLifetimeMinutes = parseInt(process.env.CONCURRENCY_MAX_LIFETIME_MINUTES) || 10 + const maxRefreshCount = Math.ceil((maxLifetimeMinutes * 60 * 1000) / renewIntervalMs) + let refreshCount = 0 + leaseRenewInterval = setInterval(() => { + refreshCount++ + + // 超过最大刷新次数,强制停止并清理 + if (refreshCount > maxRefreshCount) { + logger.warn( + `⚠️ Lease refresh exceeded max count (${maxRefreshCount}) for key ${validation.keyData.id} (${validation.keyData.name}), forcing cleanup after ${maxLifetimeMinutes} minutes` + ) + // 清理定时器 + if (leaseRenewInterval) { + clearInterval(leaseRenewInterval) + leaseRenewInterval = null + } + // 强制减少并发计数(如果还没减少) + if (!concurrencyDecremented) { + concurrencyDecremented = true + redis.decrConcurrency(validation.keyData.id, requestId).catch((error) => { + logger.error( + `Failed to decrement concurrency after max refresh for key ${validation.keyData.id}:`, + error + ) + }) + } + return + } + redis .refreshConcurrencyLease(validation.keyData.id, requestId, leaseSeconds) .catch((error) => { diff --git a/src/models/redis.js b/src/models/redis.js index 10f10e7a..073ba17c 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -2034,6 +2034,246 @@ class RedisClient { return await this.getConcurrency(compositeKey) } + // 🔧 并发管理方法(用于管理员手动清理) + + /** + * 获取所有并发状态 + * @returns {Promise} 并发状态列表 + */ + async getAllConcurrencyStatus() { + try { + const client = this.getClientSafe() + const keys = await client.keys('concurrency:*') + const now = Date.now() + const results = [] + + for (const key of keys) { + // 提取 apiKeyId(去掉 concurrency: 前缀) + const apiKeyId = key.replace('concurrency:', '') + + // 获取所有成员和分数(过期时间) + const members = await client.zrangebyscore(key, now, '+inf', 'WITHSCORES') + + // 解析成员和过期时间 + const activeRequests = [] + for (let i = 0; i < members.length; i += 2) { + const requestId = members[i] + const expireAt = parseInt(members[i + 1]) + const remainingSeconds = Math.max(0, Math.round((expireAt - now) / 1000)) + activeRequests.push({ + requestId, + expireAt: new Date(expireAt).toISOString(), + remainingSeconds + }) + } + + // 获取过期的成员数量 + const expiredCount = await client.zcount(key, '-inf', now) + + results.push({ + apiKeyId, + key, + activeCount: activeRequests.length, + expiredCount, + activeRequests + }) + } + + return results + } catch (error) { + logger.error('❌ Failed to get all concurrency status:', error) + throw error + } + } + + /** + * 获取特定 API Key 的并发状态详情 + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 并发状态详情 + */ + async getConcurrencyStatus(apiKeyId) { + try { + const client = this.getClientSafe() + const key = `concurrency:${apiKeyId}` + const now = Date.now() + + // 检查 key 是否存在 + const exists = await client.exists(key) + if (!exists) { + return { + apiKeyId, + key, + activeCount: 0, + expiredCount: 0, + activeRequests: [], + exists: false + } + } + + // 获取所有成员和分数 + const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES') + + const activeRequests = [] + const expiredRequests = [] + + for (let i = 0; i < allMembers.length; i += 2) { + const requestId = allMembers[i] + const expireAt = parseInt(allMembers[i + 1]) + const remainingSeconds = Math.round((expireAt - now) / 1000) + + const requestInfo = { + requestId, + expireAt: new Date(expireAt).toISOString(), + remainingSeconds + } + + if (expireAt > now) { + activeRequests.push(requestInfo) + } else { + expiredRequests.push(requestInfo) + } + } + + return { + apiKeyId, + key, + activeCount: activeRequests.length, + expiredCount: expiredRequests.length, + activeRequests, + expiredRequests, + exists: true + } + } catch (error) { + logger.error(`❌ Failed to get concurrency status for ${apiKeyId}:`, error) + throw error + } + } + + /** + * 强制清理特定 API Key 的并发计数(忽略租约) + * @param {string} apiKeyId - API Key ID + * @returns {Promise} 清理结果 + */ + async forceClearConcurrency(apiKeyId) { + try { + const client = this.getClientSafe() + const key = `concurrency:${apiKeyId}` + + // 获取清理前的状态 + const beforeCount = await client.zcard(key) + + // 删除整个 key + await client.del(key) + + logger.warn( + `🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries` + ) + + return { + apiKeyId, + key, + clearedCount: beforeCount, + success: true + } + } catch (error) { + logger.error(`❌ Failed to force clear concurrency for ${apiKeyId}:`, error) + throw error + } + } + + /** + * 强制清理所有并发计数 + * @returns {Promise} 清理结果 + */ + async forceClearAllConcurrency() { + try { + const client = this.getClientSafe() + const keys = await client.keys('concurrency:*') + + let totalCleared = 0 + const clearedKeys = [] + + for (const key of keys) { + const count = await client.zcard(key) + await client.del(key) + totalCleared += count + clearedKeys.push({ + key, + clearedCount: count + }) + } + + logger.warn( + `🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries` + ) + + return { + keysCleared: keys.length, + totalEntriesCleared: totalCleared, + clearedKeys, + success: true + } + } catch (error) { + logger.error('❌ Failed to force clear all concurrency:', error) + throw error + } + } + + /** + * 清理过期的并发条目(不影响活跃请求) + * @param {string} apiKeyId - API Key ID(可选,不传则清理所有) + * @returns {Promise} 清理结果 + */ + async cleanupExpiredConcurrency(apiKeyId = null) { + try { + const client = this.getClientSafe() + const now = Date.now() + let keys + + if (apiKeyId) { + keys = [`concurrency:${apiKeyId}`] + } else { + keys = await client.keys('concurrency:*') + } + + let totalCleaned = 0 + const cleanedKeys = [] + + for (const key of keys) { + // 只清理过期的条目 + const cleaned = await client.zremrangebyscore(key, '-inf', now) + if (cleaned > 0) { + totalCleaned += cleaned + cleanedKeys.push({ + key, + cleanedCount: cleaned + }) + } + + // 如果 key 为空,删除它 + const remaining = await client.zcard(key) + if (remaining === 0) { + await client.del(key) + } + } + + logger.info( + `🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys` + ) + + return { + keysProcessed: keys.length, + keysCleaned: cleanedKeys.length, + totalEntriesCleaned: totalCleaned, + cleanedKeys, + success: true + } + } catch (error) { + logger.error('❌ Failed to cleanup expired concurrency:', error) + throw error + } + } + // 🔧 Basic Redis operations wrapper methods for convenience async get(key) { const client = this.getClientSafe() diff --git a/src/routes/admin/concurrency.js b/src/routes/admin/concurrency.js new file mode 100644 index 00000000..80fee22c --- /dev/null +++ b/src/routes/admin/concurrency.js @@ -0,0 +1,145 @@ +/** + * 并发管理 API 路由 + * 提供并发状态查看和手动清理功能 + */ + +const express = require('express') +const router = express.Router() +const redis = require('../../models/redis') +const logger = require('../../utils/logger') + +/** + * GET /admin/concurrency + * 获取所有并发状态 + */ +router.get('/concurrency', async (req, res) => { + try { + const status = await redis.getAllConcurrencyStatus() + + // 计算汇总统计 + const summary = { + totalKeys: status.length, + totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0), + totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0) + } + + res.json({ + success: true, + summary, + concurrencyStatus: status + }) + } catch (error) { + logger.error('❌ Failed to get concurrency status:', error) + res.status(500).json({ + success: false, + error: 'Failed to get concurrency status', + message: error.message + }) + } +}) + +/** + * GET /admin/concurrency/:apiKeyId + * 获取特定 API Key 的并发状态详情 + */ +router.get('/concurrency/:apiKeyId', async (req, res) => { + try { + const { apiKeyId } = req.params + const status = await redis.getConcurrencyStatus(apiKeyId) + + res.json({ + success: true, + concurrencyStatus: status + }) + } catch (error) { + logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error) + res.status(500).json({ + success: false, + error: 'Failed to get concurrency status', + message: error.message + }) + } +}) + +/** + * DELETE /admin/concurrency/:apiKeyId + * 强制清理特定 API Key 的并发计数 + */ +router.delete('/concurrency/:apiKeyId', async (req, res) => { + try { + const { apiKeyId } = req.params + const result = await redis.forceClearConcurrency(apiKeyId) + + logger.warn( + `🧹 Admin ${req.admin?.username || 'unknown'} force cleared concurrency for key ${apiKeyId}` + ) + + res.json({ + success: true, + message: `Successfully cleared concurrency for API key ${apiKeyId}`, + result + }) + } catch (error) { + logger.error(`❌ Failed to clear concurrency for ${req.params.apiKeyId}:`, error) + res.status(500).json({ + success: false, + error: 'Failed to clear concurrency', + message: error.message + }) + } +}) + +/** + * DELETE /admin/concurrency + * 强制清理所有并发计数 + */ +router.delete('/concurrency', async (req, res) => { + try { + const result = await redis.forceClearAllConcurrency() + + logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} force cleared ALL concurrency`) + + res.json({ + success: true, + message: 'Successfully cleared all concurrency', + result + }) + } catch (error) { + logger.error('❌ Failed to clear all concurrency:', error) + res.status(500).json({ + success: false, + error: 'Failed to clear all concurrency', + message: error.message + }) + } +}) + +/** + * POST /admin/concurrency/cleanup + * 清理过期的并发条目(不影响活跃请求) + */ +router.post('/concurrency/cleanup', async (req, res) => { + try { + const { apiKeyId } = req.body + const result = await redis.cleanupExpiredConcurrency(apiKeyId || null) + + logger.info(`🧹 Admin ${req.admin?.username || 'unknown'} cleaned up expired concurrency`) + + res.json({ + success: true, + message: apiKeyId + ? `Successfully cleaned up expired concurrency for API key ${apiKeyId}` + : 'Successfully cleaned up all expired concurrency', + result + }) + } catch (error) { + logger.error('❌ Failed to cleanup expired concurrency:', error) + res.status(500).json({ + success: false, + error: 'Failed to cleanup expired concurrency', + message: error.message + }) + } +}) + +module.exports = router diff --git a/src/routes/admin/index.js b/src/routes/admin/index.js index 32b77db6..3e133c50 100644 --- a/src/routes/admin/index.js +++ b/src/routes/admin/index.js @@ -22,6 +22,7 @@ const droidAccountsRoutes = require('./droidAccounts') const dashboardRoutes = require('./dashboard') const usageStatsRoutes = require('./usageStats') const systemRoutes = require('./system') +const concurrencyRoutes = require('./concurrency') // 挂载所有子路由 // 使用完整路径的模块(直接挂载到根路径) @@ -35,6 +36,7 @@ router.use('/', droidAccountsRoutes) router.use('/', dashboardRoutes) router.use('/', usageStatsRoutes) router.use('/', systemRoutes) +router.use('/', concurrencyRoutes) // 使用相对路径的模块(需要指定基础路径前缀) router.use('/account-groups', accountGroupsRoutes)