feat(concurrencyManagement): implement concurrency status management API and enhance concurrency handling in middleware

This commit is contained in:
DaydreamCoding
2025-12-06 17:23:42 +08:00
committed by QTom
parent d81a16b98d
commit f74f77ef65
4 changed files with 430 additions and 2 deletions

View File

@@ -226,8 +226,18 @@ const authenticateApiKey = async (req, res, next) => {
) )
if (currentConcurrency > concurrencyLimit) { if (currentConcurrency > concurrencyLimit) {
// 如果超过限制,立即减少计数 // 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏)
await redis.decrConcurrency(validation.keyData.id, requestId) try {
const newCount = await redis.decrConcurrency(validation.keyData.id, requestId)
logger.api(
`📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}`
)
} catch (error) {
logger.error(
`Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`,
error
)
}
logger.security( logger.security(
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${ `🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
validation.keyData.name validation.keyData.name
@@ -249,7 +259,38 @@ const authenticateApiKey = async (req, res, next) => {
let leaseRenewInterval = null let leaseRenewInterval = null
if (renewIntervalMs > 0) { if (renewIntervalMs > 0) {
// 🔴 关键修复:添加最大刷新次数限制,防止租约永不过期
// 默认最大生存时间为 10 分钟,可通过环境变量配置
const maxLifetimeMinutes = parseInt(process.env.CONCURRENCY_MAX_LIFETIME_MINUTES) || 10
const maxRefreshCount = Math.ceil((maxLifetimeMinutes * 60 * 1000) / renewIntervalMs)
let refreshCount = 0
leaseRenewInterval = setInterval(() => { leaseRenewInterval = setInterval(() => {
refreshCount++
// 超过最大刷新次数,强制停止并清理
if (refreshCount > maxRefreshCount) {
logger.warn(
`⚠️ Lease refresh exceeded max count (${maxRefreshCount}) for key ${validation.keyData.id} (${validation.keyData.name}), forcing cleanup after ${maxLifetimeMinutes} minutes`
)
// 清理定时器
if (leaseRenewInterval) {
clearInterval(leaseRenewInterval)
leaseRenewInterval = null
}
// 强制减少并发计数(如果还没减少)
if (!concurrencyDecremented) {
concurrencyDecremented = true
redis.decrConcurrency(validation.keyData.id, requestId).catch((error) => {
logger.error(
`Failed to decrement concurrency after max refresh for key ${validation.keyData.id}:`,
error
)
})
}
return
}
redis redis
.refreshConcurrencyLease(validation.keyData.id, requestId, leaseSeconds) .refreshConcurrencyLease(validation.keyData.id, requestId, leaseSeconds)
.catch((error) => { .catch((error) => {

View File

@@ -2034,6 +2034,246 @@ class RedisClient {
return await this.getConcurrency(compositeKey) return await this.getConcurrency(compositeKey)
} }
// 🔧 并发管理方法(用于管理员手动清理)
/**
* 获取所有并发状态
* @returns {Promise<Array>} 并发状态列表
*/
async getAllConcurrencyStatus() {
try {
const client = this.getClientSafe()
const keys = await client.keys('concurrency:*')
const now = Date.now()
const results = []
for (const key of keys) {
// 提取 apiKeyId去掉 concurrency: 前缀)
const apiKeyId = key.replace('concurrency:', '')
// 获取所有成员和分数(过期时间)
const members = await client.zrangebyscore(key, now, '+inf', 'WITHSCORES')
// 解析成员和过期时间
const activeRequests = []
for (let i = 0; i < members.length; i += 2) {
const requestId = members[i]
const expireAt = parseInt(members[i + 1])
const remainingSeconds = Math.max(0, Math.round((expireAt - now) / 1000))
activeRequests.push({
requestId,
expireAt: new Date(expireAt).toISOString(),
remainingSeconds
})
}
// 获取过期的成员数量
const expiredCount = await client.zcount(key, '-inf', now)
results.push({
apiKeyId,
key,
activeCount: activeRequests.length,
expiredCount,
activeRequests
})
}
return results
} catch (error) {
logger.error('❌ Failed to get all concurrency status:', error)
throw error
}
}
/**
* 获取特定 API Key 的并发状态详情
* @param {string} apiKeyId - API Key ID
* @returns {Promise<Object>} 并发状态详情
*/
async getConcurrencyStatus(apiKeyId) {
try {
const client = this.getClientSafe()
const key = `concurrency:${apiKeyId}`
const now = Date.now()
// 检查 key 是否存在
const exists = await client.exists(key)
if (!exists) {
return {
apiKeyId,
key,
activeCount: 0,
expiredCount: 0,
activeRequests: [],
exists: false
}
}
// 获取所有成员和分数
const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES')
const activeRequests = []
const expiredRequests = []
for (let i = 0; i < allMembers.length; i += 2) {
const requestId = allMembers[i]
const expireAt = parseInt(allMembers[i + 1])
const remainingSeconds = Math.round((expireAt - now) / 1000)
const requestInfo = {
requestId,
expireAt: new Date(expireAt).toISOString(),
remainingSeconds
}
if (expireAt > now) {
activeRequests.push(requestInfo)
} else {
expiredRequests.push(requestInfo)
}
}
return {
apiKeyId,
key,
activeCount: activeRequests.length,
expiredCount: expiredRequests.length,
activeRequests,
expiredRequests,
exists: true
}
} catch (error) {
logger.error(`❌ Failed to get concurrency status for ${apiKeyId}:`, error)
throw error
}
}
/**
* 强制清理特定 API Key 的并发计数(忽略租约)
* @param {string} apiKeyId - API Key ID
* @returns {Promise<Object>} 清理结果
*/
async forceClearConcurrency(apiKeyId) {
try {
const client = this.getClientSafe()
const key = `concurrency:${apiKeyId}`
// 获取清理前的状态
const beforeCount = await client.zcard(key)
// 删除整个 key
await client.del(key)
logger.warn(
`🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries`
)
return {
apiKeyId,
key,
clearedCount: beforeCount,
success: true
}
} catch (error) {
logger.error(`❌ Failed to force clear concurrency for ${apiKeyId}:`, error)
throw error
}
}
/**
* 强制清理所有并发计数
* @returns {Promise<Object>} 清理结果
*/
async forceClearAllConcurrency() {
try {
const client = this.getClientSafe()
const keys = await client.keys('concurrency:*')
let totalCleared = 0
const clearedKeys = []
for (const key of keys) {
const count = await client.zcard(key)
await client.del(key)
totalCleared += count
clearedKeys.push({
key,
clearedCount: count
})
}
logger.warn(
`🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries`
)
return {
keysCleared: keys.length,
totalEntriesCleared: totalCleared,
clearedKeys,
success: true
}
} catch (error) {
logger.error('❌ Failed to force clear all concurrency:', error)
throw error
}
}
/**
* 清理过期的并发条目(不影响活跃请求)
* @param {string} apiKeyId - API Key ID可选不传则清理所有
* @returns {Promise<Object>} 清理结果
*/
async cleanupExpiredConcurrency(apiKeyId = null) {
try {
const client = this.getClientSafe()
const now = Date.now()
let keys
if (apiKeyId) {
keys = [`concurrency:${apiKeyId}`]
} else {
keys = await client.keys('concurrency:*')
}
let totalCleaned = 0
const cleanedKeys = []
for (const key of keys) {
// 只清理过期的条目
const cleaned = await client.zremrangebyscore(key, '-inf', now)
if (cleaned > 0) {
totalCleaned += cleaned
cleanedKeys.push({
key,
cleanedCount: cleaned
})
}
// 如果 key 为空,删除它
const remaining = await client.zcard(key)
if (remaining === 0) {
await client.del(key)
}
}
logger.info(
`🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys`
)
return {
keysProcessed: keys.length,
keysCleaned: cleanedKeys.length,
totalEntriesCleaned: totalCleaned,
cleanedKeys,
success: true
}
} catch (error) {
logger.error('❌ Failed to cleanup expired concurrency:', error)
throw error
}
}
// 🔧 Basic Redis operations wrapper methods for convenience // 🔧 Basic Redis operations wrapper methods for convenience
async get(key) { async get(key) {
const client = this.getClientSafe() const client = this.getClientSafe()

View File

@@ -0,0 +1,145 @@
/**
* 并发管理 API 路由
* 提供并发状态查看和手动清理功能
*/
const express = require('express')
const router = express.Router()
const redis = require('../../models/redis')
const logger = require('../../utils/logger')
/**
* GET /admin/concurrency
* 获取所有并发状态
*/
router.get('/concurrency', async (req, res) => {
try {
const status = await redis.getAllConcurrencyStatus()
// 计算汇总统计
const summary = {
totalKeys: status.length,
totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0),
totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0)
}
res.json({
success: true,
summary,
concurrencyStatus: status
})
} catch (error) {
logger.error('❌ Failed to get concurrency status:', error)
res.status(500).json({
success: false,
error: 'Failed to get concurrency status',
message: error.message
})
}
})
/**
* GET /admin/concurrency/:apiKeyId
* 获取特定 API Key 的并发状态详情
*/
router.get('/concurrency/:apiKeyId', async (req, res) => {
try {
const { apiKeyId } = req.params
const status = await redis.getConcurrencyStatus(apiKeyId)
res.json({
success: true,
concurrencyStatus: status
})
} catch (error) {
logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error)
res.status(500).json({
success: false,
error: 'Failed to get concurrency status',
message: error.message
})
}
})
/**
* DELETE /admin/concurrency/:apiKeyId
* 强制清理特定 API Key 的并发计数
*/
router.delete('/concurrency/:apiKeyId', async (req, res) => {
try {
const { apiKeyId } = req.params
const result = await redis.forceClearConcurrency(apiKeyId)
logger.warn(
`🧹 Admin ${req.admin?.username || 'unknown'} force cleared concurrency for key ${apiKeyId}`
)
res.json({
success: true,
message: `Successfully cleared concurrency for API key ${apiKeyId}`,
result
})
} catch (error) {
logger.error(`❌ Failed to clear concurrency for ${req.params.apiKeyId}:`, error)
res.status(500).json({
success: false,
error: 'Failed to clear concurrency',
message: error.message
})
}
})
/**
* DELETE /admin/concurrency
* 强制清理所有并发计数
*/
router.delete('/concurrency', async (req, res) => {
try {
const result = await redis.forceClearAllConcurrency()
logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} force cleared ALL concurrency`)
res.json({
success: true,
message: 'Successfully cleared all concurrency',
result
})
} catch (error) {
logger.error('❌ Failed to clear all concurrency:', error)
res.status(500).json({
success: false,
error: 'Failed to clear all concurrency',
message: error.message
})
}
})
/**
* POST /admin/concurrency/cleanup
* 清理过期的并发条目(不影响活跃请求)
*/
router.post('/concurrency/cleanup', async (req, res) => {
try {
const { apiKeyId } = req.body
const result = await redis.cleanupExpiredConcurrency(apiKeyId || null)
logger.info(`🧹 Admin ${req.admin?.username || 'unknown'} cleaned up expired concurrency`)
res.json({
success: true,
message: apiKeyId
? `Successfully cleaned up expired concurrency for API key ${apiKeyId}`
: 'Successfully cleaned up all expired concurrency',
result
})
} catch (error) {
logger.error('❌ Failed to cleanup expired concurrency:', error)
res.status(500).json({
success: false,
error: 'Failed to cleanup expired concurrency',
message: error.message
})
}
})
module.exports = router

View File

@@ -22,6 +22,7 @@ const droidAccountsRoutes = require('./droidAccounts')
const dashboardRoutes = require('./dashboard') const dashboardRoutes = require('./dashboard')
const usageStatsRoutes = require('./usageStats') const usageStatsRoutes = require('./usageStats')
const systemRoutes = require('./system') const systemRoutes = require('./system')
const concurrencyRoutes = require('./concurrency')
// 挂载所有子路由 // 挂载所有子路由
// 使用完整路径的模块(直接挂载到根路径) // 使用完整路径的模块(直接挂载到根路径)
@@ -35,6 +36,7 @@ router.use('/', droidAccountsRoutes)
router.use('/', dashboardRoutes) router.use('/', dashboardRoutes)
router.use('/', usageStatsRoutes) router.use('/', usageStatsRoutes)
router.use('/', systemRoutes) router.use('/', systemRoutes)
router.use('/', concurrencyRoutes)
// 使用相对路径的模块(需要指定基础路径前缀) // 使用相对路径的模块(需要指定基础路径前缀)
router.use('/account-groups', accountGroupsRoutes) router.use('/account-groups', accountGroupsRoutes)