mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
Merge pull request #773 from DaydreamCoding/feature/concurrency [skip ci]
feat(concurrencyManagement): implement concurrency status management …
This commit is contained in:
@@ -226,8 +226,18 @@ const authenticateApiKey = async (req, res, next) => {
|
|||||||
)
|
)
|
||||||
|
|
||||||
if (currentConcurrency > concurrencyLimit) {
|
if (currentConcurrency > concurrencyLimit) {
|
||||||
// 如果超过限制,立即减少计数
|
// 如果超过限制,立即减少计数(添加 try-catch 防止异常导致并发泄漏)
|
||||||
await redis.decrConcurrency(validation.keyData.id, requestId)
|
try {
|
||||||
|
const newCount = await redis.decrConcurrency(validation.keyData.id, requestId)
|
||||||
|
logger.api(
|
||||||
|
`📉 Decremented concurrency (429 rejected) for key: ${validation.keyData.id} (${validation.keyData.name}), new count: ${newCount}`
|
||||||
|
)
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to decrement concurrency after limit exceeded for key ${validation.keyData.id}:`,
|
||||||
|
error
|
||||||
|
)
|
||||||
|
}
|
||||||
logger.security(
|
logger.security(
|
||||||
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
`🚦 Concurrency limit exceeded for key: ${validation.keyData.id} (${
|
||||||
validation.keyData.name
|
validation.keyData.name
|
||||||
@@ -249,7 +259,38 @@ const authenticateApiKey = async (req, res, next) => {
|
|||||||
let leaseRenewInterval = null
|
let leaseRenewInterval = null
|
||||||
|
|
||||||
if (renewIntervalMs > 0) {
|
if (renewIntervalMs > 0) {
|
||||||
|
// 🔴 关键修复:添加最大刷新次数限制,防止租约永不过期
|
||||||
|
// 默认最大生存时间为 10 分钟,可通过环境变量配置
|
||||||
|
const maxLifetimeMinutes = parseInt(process.env.CONCURRENCY_MAX_LIFETIME_MINUTES) || 10
|
||||||
|
const maxRefreshCount = Math.ceil((maxLifetimeMinutes * 60 * 1000) / renewIntervalMs)
|
||||||
|
let refreshCount = 0
|
||||||
|
|
||||||
leaseRenewInterval = setInterval(() => {
|
leaseRenewInterval = setInterval(() => {
|
||||||
|
refreshCount++
|
||||||
|
|
||||||
|
// 超过最大刷新次数,强制停止并清理
|
||||||
|
if (refreshCount > maxRefreshCount) {
|
||||||
|
logger.warn(
|
||||||
|
`⚠️ Lease refresh exceeded max count (${maxRefreshCount}) for key ${validation.keyData.id} (${validation.keyData.name}), forcing cleanup after ${maxLifetimeMinutes} minutes`
|
||||||
|
)
|
||||||
|
// 清理定时器
|
||||||
|
if (leaseRenewInterval) {
|
||||||
|
clearInterval(leaseRenewInterval)
|
||||||
|
leaseRenewInterval = null
|
||||||
|
}
|
||||||
|
// 强制减少并发计数(如果还没减少)
|
||||||
|
if (!concurrencyDecremented) {
|
||||||
|
concurrencyDecremented = true
|
||||||
|
redis.decrConcurrency(validation.keyData.id, requestId).catch((error) => {
|
||||||
|
logger.error(
|
||||||
|
`Failed to decrement concurrency after max refresh for key ${validation.keyData.id}:`,
|
||||||
|
error
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
redis
|
redis
|
||||||
.refreshConcurrencyLease(validation.keyData.id, requestId, leaseSeconds)
|
.refreshConcurrencyLease(validation.keyData.id, requestId, leaseSeconds)
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
|
|||||||
@@ -2096,6 +2096,246 @@ class RedisClient {
|
|||||||
return await this.getConcurrency(compositeKey)
|
return await this.getConcurrency(compositeKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 🔧 并发管理方法(用于管理员手动清理)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有并发状态
|
||||||
|
* @returns {Promise<Array>} 并发状态列表
|
||||||
|
*/
|
||||||
|
async getAllConcurrencyStatus() {
|
||||||
|
try {
|
||||||
|
const client = this.getClientSafe()
|
||||||
|
const keys = await client.keys('concurrency:*')
|
||||||
|
const now = Date.now()
|
||||||
|
const results = []
|
||||||
|
|
||||||
|
for (const key of keys) {
|
||||||
|
// 提取 apiKeyId(去掉 concurrency: 前缀)
|
||||||
|
const apiKeyId = key.replace('concurrency:', '')
|
||||||
|
|
||||||
|
// 获取所有成员和分数(过期时间)
|
||||||
|
const members = await client.zrangebyscore(key, now, '+inf', 'WITHSCORES')
|
||||||
|
|
||||||
|
// 解析成员和过期时间
|
||||||
|
const activeRequests = []
|
||||||
|
for (let i = 0; i < members.length; i += 2) {
|
||||||
|
const requestId = members[i]
|
||||||
|
const expireAt = parseInt(members[i + 1])
|
||||||
|
const remainingSeconds = Math.max(0, Math.round((expireAt - now) / 1000))
|
||||||
|
activeRequests.push({
|
||||||
|
requestId,
|
||||||
|
expireAt: new Date(expireAt).toISOString(),
|
||||||
|
remainingSeconds
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取过期的成员数量
|
||||||
|
const expiredCount = await client.zcount(key, '-inf', now)
|
||||||
|
|
||||||
|
results.push({
|
||||||
|
apiKeyId,
|
||||||
|
key,
|
||||||
|
activeCount: activeRequests.length,
|
||||||
|
expiredCount,
|
||||||
|
activeRequests
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return results
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to get all concurrency status:', error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取特定 API Key 的并发状态详情
|
||||||
|
* @param {string} apiKeyId - API Key ID
|
||||||
|
* @returns {Promise<Object>} 并发状态详情
|
||||||
|
*/
|
||||||
|
async getConcurrencyStatus(apiKeyId) {
|
||||||
|
try {
|
||||||
|
const client = this.getClientSafe()
|
||||||
|
const key = `concurrency:${apiKeyId}`
|
||||||
|
const now = Date.now()
|
||||||
|
|
||||||
|
// 检查 key 是否存在
|
||||||
|
const exists = await client.exists(key)
|
||||||
|
if (!exists) {
|
||||||
|
return {
|
||||||
|
apiKeyId,
|
||||||
|
key,
|
||||||
|
activeCount: 0,
|
||||||
|
expiredCount: 0,
|
||||||
|
activeRequests: [],
|
||||||
|
exists: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取所有成员和分数
|
||||||
|
const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES')
|
||||||
|
|
||||||
|
const activeRequests = []
|
||||||
|
const expiredRequests = []
|
||||||
|
|
||||||
|
for (let i = 0; i < allMembers.length; i += 2) {
|
||||||
|
const requestId = allMembers[i]
|
||||||
|
const expireAt = parseInt(allMembers[i + 1])
|
||||||
|
const remainingSeconds = Math.round((expireAt - now) / 1000)
|
||||||
|
|
||||||
|
const requestInfo = {
|
||||||
|
requestId,
|
||||||
|
expireAt: new Date(expireAt).toISOString(),
|
||||||
|
remainingSeconds
|
||||||
|
}
|
||||||
|
|
||||||
|
if (expireAt > now) {
|
||||||
|
activeRequests.push(requestInfo)
|
||||||
|
} else {
|
||||||
|
expiredRequests.push(requestInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
apiKeyId,
|
||||||
|
key,
|
||||||
|
activeCount: activeRequests.length,
|
||||||
|
expiredCount: expiredRequests.length,
|
||||||
|
activeRequests,
|
||||||
|
expiredRequests,
|
||||||
|
exists: true
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`❌ Failed to get concurrency status for ${apiKeyId}:`, error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 强制清理特定 API Key 的并发计数(忽略租约)
|
||||||
|
* @param {string} apiKeyId - API Key ID
|
||||||
|
* @returns {Promise<Object>} 清理结果
|
||||||
|
*/
|
||||||
|
async forceClearConcurrency(apiKeyId) {
|
||||||
|
try {
|
||||||
|
const client = this.getClientSafe()
|
||||||
|
const key = `concurrency:${apiKeyId}`
|
||||||
|
|
||||||
|
// 获取清理前的状态
|
||||||
|
const beforeCount = await client.zcard(key)
|
||||||
|
|
||||||
|
// 删除整个 key
|
||||||
|
await client.del(key)
|
||||||
|
|
||||||
|
logger.warn(
|
||||||
|
`🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries`
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
apiKeyId,
|
||||||
|
key,
|
||||||
|
clearedCount: beforeCount,
|
||||||
|
success: true
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`❌ Failed to force clear concurrency for ${apiKeyId}:`, error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 强制清理所有并发计数
|
||||||
|
* @returns {Promise<Object>} 清理结果
|
||||||
|
*/
|
||||||
|
async forceClearAllConcurrency() {
|
||||||
|
try {
|
||||||
|
const client = this.getClientSafe()
|
||||||
|
const keys = await client.keys('concurrency:*')
|
||||||
|
|
||||||
|
let totalCleared = 0
|
||||||
|
const clearedKeys = []
|
||||||
|
|
||||||
|
for (const key of keys) {
|
||||||
|
const count = await client.zcard(key)
|
||||||
|
await client.del(key)
|
||||||
|
totalCleared += count
|
||||||
|
clearedKeys.push({
|
||||||
|
key,
|
||||||
|
clearedCount: count
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warn(
|
||||||
|
`🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries`
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
keysCleared: keys.length,
|
||||||
|
totalEntriesCleared: totalCleared,
|
||||||
|
clearedKeys,
|
||||||
|
success: true
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to force clear all concurrency:', error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清理过期的并发条目(不影响活跃请求)
|
||||||
|
* @param {string} apiKeyId - API Key ID(可选,不传则清理所有)
|
||||||
|
* @returns {Promise<Object>} 清理结果
|
||||||
|
*/
|
||||||
|
async cleanupExpiredConcurrency(apiKeyId = null) {
|
||||||
|
try {
|
||||||
|
const client = this.getClientSafe()
|
||||||
|
const now = Date.now()
|
||||||
|
let keys
|
||||||
|
|
||||||
|
if (apiKeyId) {
|
||||||
|
keys = [`concurrency:${apiKeyId}`]
|
||||||
|
} else {
|
||||||
|
keys = await client.keys('concurrency:*')
|
||||||
|
}
|
||||||
|
|
||||||
|
let totalCleaned = 0
|
||||||
|
const cleanedKeys = []
|
||||||
|
|
||||||
|
for (const key of keys) {
|
||||||
|
// 只清理过期的条目
|
||||||
|
const cleaned = await client.zremrangebyscore(key, '-inf', now)
|
||||||
|
if (cleaned > 0) {
|
||||||
|
totalCleaned += cleaned
|
||||||
|
cleanedKeys.push({
|
||||||
|
key,
|
||||||
|
cleanedCount: cleaned
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果 key 为空,删除它
|
||||||
|
const remaining = await client.zcard(key)
|
||||||
|
if (remaining === 0) {
|
||||||
|
await client.del(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
`🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys`
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
keysProcessed: keys.length,
|
||||||
|
keysCleaned: cleanedKeys.length,
|
||||||
|
totalEntriesCleaned: totalCleaned,
|
||||||
|
cleanedKeys,
|
||||||
|
success: true
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to cleanup expired concurrency:', error)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 🔧 Basic Redis operations wrapper methods for convenience
|
// 🔧 Basic Redis operations wrapper methods for convenience
|
||||||
async get(key) {
|
async get(key) {
|
||||||
const client = this.getClientSafe()
|
const client = this.getClientSafe()
|
||||||
|
|||||||
145
src/routes/admin/concurrency.js
Normal file
145
src/routes/admin/concurrency.js
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
/**
|
||||||
|
* 并发管理 API 路由
|
||||||
|
* 提供并发状态查看和手动清理功能
|
||||||
|
*/
|
||||||
|
|
||||||
|
const express = require('express')
|
||||||
|
const router = express.Router()
|
||||||
|
const redis = require('../../models/redis')
|
||||||
|
const logger = require('../../utils/logger')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GET /admin/concurrency
|
||||||
|
* 获取所有并发状态
|
||||||
|
*/
|
||||||
|
router.get('/concurrency', async (req, res) => {
|
||||||
|
try {
|
||||||
|
const status = await redis.getAllConcurrencyStatus()
|
||||||
|
|
||||||
|
// 计算汇总统计
|
||||||
|
const summary = {
|
||||||
|
totalKeys: status.length,
|
||||||
|
totalActiveRequests: status.reduce((sum, s) => sum + s.activeCount, 0),
|
||||||
|
totalExpiredRequests: status.reduce((sum, s) => sum + s.expiredCount, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
summary,
|
||||||
|
concurrencyStatus: status
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to get concurrency status:', error)
|
||||||
|
res.status(500).json({
|
||||||
|
success: false,
|
||||||
|
error: 'Failed to get concurrency status',
|
||||||
|
message: error.message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GET /admin/concurrency/:apiKeyId
|
||||||
|
* 获取特定 API Key 的并发状态详情
|
||||||
|
*/
|
||||||
|
router.get('/concurrency/:apiKeyId', async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { apiKeyId } = req.params
|
||||||
|
const status = await redis.getConcurrencyStatus(apiKeyId)
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
concurrencyStatus: status
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`❌ Failed to get concurrency status for ${req.params.apiKeyId}:`, error)
|
||||||
|
res.status(500).json({
|
||||||
|
success: false,
|
||||||
|
error: 'Failed to get concurrency status',
|
||||||
|
message: error.message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DELETE /admin/concurrency/:apiKeyId
|
||||||
|
* 强制清理特定 API Key 的并发计数
|
||||||
|
*/
|
||||||
|
router.delete('/concurrency/:apiKeyId', async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { apiKeyId } = req.params
|
||||||
|
const result = await redis.forceClearConcurrency(apiKeyId)
|
||||||
|
|
||||||
|
logger.warn(
|
||||||
|
`🧹 Admin ${req.admin?.username || 'unknown'} force cleared concurrency for key ${apiKeyId}`
|
||||||
|
)
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
message: `Successfully cleared concurrency for API key ${apiKeyId}`,
|
||||||
|
result
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`❌ Failed to clear concurrency for ${req.params.apiKeyId}:`, error)
|
||||||
|
res.status(500).json({
|
||||||
|
success: false,
|
||||||
|
error: 'Failed to clear concurrency',
|
||||||
|
message: error.message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DELETE /admin/concurrency
|
||||||
|
* 强制清理所有并发计数
|
||||||
|
*/
|
||||||
|
router.delete('/concurrency', async (req, res) => {
|
||||||
|
try {
|
||||||
|
const result = await redis.forceClearAllConcurrency()
|
||||||
|
|
||||||
|
logger.warn(`🧹 Admin ${req.admin?.username || 'unknown'} force cleared ALL concurrency`)
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
message: 'Successfully cleared all concurrency',
|
||||||
|
result
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to clear all concurrency:', error)
|
||||||
|
res.status(500).json({
|
||||||
|
success: false,
|
||||||
|
error: 'Failed to clear all concurrency',
|
||||||
|
message: error.message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POST /admin/concurrency/cleanup
|
||||||
|
* 清理过期的并发条目(不影响活跃请求)
|
||||||
|
*/
|
||||||
|
router.post('/concurrency/cleanup', async (req, res) => {
|
||||||
|
try {
|
||||||
|
const { apiKeyId } = req.body
|
||||||
|
const result = await redis.cleanupExpiredConcurrency(apiKeyId || null)
|
||||||
|
|
||||||
|
logger.info(`🧹 Admin ${req.admin?.username || 'unknown'} cleaned up expired concurrency`)
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
message: apiKeyId
|
||||||
|
? `Successfully cleaned up expired concurrency for API key ${apiKeyId}`
|
||||||
|
: 'Successfully cleaned up all expired concurrency',
|
||||||
|
result
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('❌ Failed to cleanup expired concurrency:', error)
|
||||||
|
res.status(500).json({
|
||||||
|
success: false,
|
||||||
|
error: 'Failed to cleanup expired concurrency',
|
||||||
|
message: error.message
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
module.exports = router
|
||||||
@@ -22,6 +22,7 @@ const droidAccountsRoutes = require('./droidAccounts')
|
|||||||
const dashboardRoutes = require('./dashboard')
|
const dashboardRoutes = require('./dashboard')
|
||||||
const usageStatsRoutes = require('./usageStats')
|
const usageStatsRoutes = require('./usageStats')
|
||||||
const systemRoutes = require('./system')
|
const systemRoutes = require('./system')
|
||||||
|
const concurrencyRoutes = require('./concurrency')
|
||||||
|
|
||||||
// 挂载所有子路由
|
// 挂载所有子路由
|
||||||
// 使用完整路径的模块(直接挂载到根路径)
|
// 使用完整路径的模块(直接挂载到根路径)
|
||||||
@@ -35,6 +36,7 @@ router.use('/', droidAccountsRoutes)
|
|||||||
router.use('/', dashboardRoutes)
|
router.use('/', dashboardRoutes)
|
||||||
router.use('/', usageStatsRoutes)
|
router.use('/', usageStatsRoutes)
|
||||||
router.use('/', systemRoutes)
|
router.use('/', systemRoutes)
|
||||||
|
router.use('/', concurrencyRoutes)
|
||||||
|
|
||||||
// 使用相对路径的模块(需要指定基础路径前缀)
|
// 使用相对路径的模块(需要指定基础路径前缀)
|
||||||
router.use('/account-groups', accountGroupsRoutes)
|
router.use('/account-groups', accountGroupsRoutes)
|
||||||
|
|||||||
Reference in New Issue
Block a user