mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
fix: 修复apikey的并发控制问题
This commit is contained in:
@@ -7,6 +7,57 @@ const redis = require('../models/redis')
|
||||
// const { RateLimiterRedis } = require('rate-limiter-flexible') // 暂时未使用
|
||||
const ClientValidator = require('../validators/clientValidator')
|
||||
|
||||
const FALLBACK_CONCURRENCY_CONFIG = {
|
||||
leaseSeconds: 300,
|
||||
renewIntervalSeconds: 30,
|
||||
cleanupGraceSeconds: 30
|
||||
}
|
||||
|
||||
const resolveConcurrencyConfig = () => {
|
||||
if (typeof redis._getConcurrencyConfig === 'function') {
|
||||
return redis._getConcurrencyConfig()
|
||||
}
|
||||
|
||||
const raw = {
|
||||
...FALLBACK_CONCURRENCY_CONFIG,
|
||||
...(config.concurrency || {})
|
||||
}
|
||||
|
||||
const toNumber = (value, fallback) => {
|
||||
const parsed = Number(value)
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return fallback
|
||||
}
|
||||
return parsed
|
||||
}
|
||||
|
||||
const leaseSeconds = Math.max(
|
||||
toNumber(raw.leaseSeconds, FALLBACK_CONCURRENCY_CONFIG.leaseSeconds),
|
||||
30
|
||||
)
|
||||
|
||||
let renewIntervalSeconds
|
||||
if (raw.renewIntervalSeconds === 0 || raw.renewIntervalSeconds === '0') {
|
||||
renewIntervalSeconds = 0
|
||||
} else {
|
||||
renewIntervalSeconds = Math.max(
|
||||
toNumber(raw.renewIntervalSeconds, FALLBACK_CONCURRENCY_CONFIG.renewIntervalSeconds),
|
||||
0
|
||||
)
|
||||
}
|
||||
|
||||
const cleanupGraceSeconds = Math.max(
|
||||
toNumber(raw.cleanupGraceSeconds, FALLBACK_CONCURRENCY_CONFIG.cleanupGraceSeconds),
|
||||
0
|
||||
)
|
||||
|
||||
return {
|
||||
leaseSeconds,
|
||||
renewIntervalSeconds,
|
||||
cleanupGraceSeconds
|
||||
}
|
||||
}
|
||||
|
||||
const TOKEN_COUNT_PATHS = new Set([
|
||||
'/v1/messages/count_tokens',
|
||||
'/api/v1/messages/count_tokens',
|
||||
@@ -116,13 +167,10 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
// 检查并发限制
|
||||
const concurrencyLimit = validation.keyData.concurrencyLimit || 0
|
||||
if (!skipKeyRestrictions && concurrencyLimit > 0) {
|
||||
const concurrencyConfig = config.concurrency || {}
|
||||
const leaseSeconds = Math.max(concurrencyConfig.leaseSeconds || 900, 30)
|
||||
const rawRenewInterval =
|
||||
typeof concurrencyConfig.renewIntervalSeconds === 'number'
|
||||
? concurrencyConfig.renewIntervalSeconds
|
||||
: 60
|
||||
let renewIntervalSeconds = rawRenewInterval
|
||||
const { leaseSeconds: configLeaseSeconds, renewIntervalSeconds: configRenewIntervalSeconds } =
|
||||
resolveConcurrencyConfig()
|
||||
const leaseSeconds = Math.max(Number(configLeaseSeconds) || 300, 30)
|
||||
let renewIntervalSeconds = configRenewIntervalSeconds
|
||||
if (renewIntervalSeconds > 0) {
|
||||
const maxSafeRenew = Math.max(leaseSeconds - 5, 15)
|
||||
renewIntervalSeconds = Math.min(Math.max(renewIntervalSeconds, 15), maxSafeRenew)
|
||||
@@ -215,6 +263,29 @@ const authenticateApiKey = async (req, res, next) => {
|
||||
decrementConcurrency()
|
||||
})
|
||||
|
||||
req.once('aborted', () => {
|
||||
logger.warn(
|
||||
`⚠️ Request aborted for key: ${validation.keyData.id} (${validation.keyData.name})`
|
||||
)
|
||||
decrementConcurrency()
|
||||
})
|
||||
|
||||
req.once('error', (error) => {
|
||||
logger.error(
|
||||
`❌ Request error for key ${validation.keyData.id} (${validation.keyData.name}):`,
|
||||
error
|
||||
)
|
||||
decrementConcurrency()
|
||||
})
|
||||
|
||||
res.once('error', (error) => {
|
||||
logger.error(
|
||||
`❌ Response error for key ${validation.keyData.id} (${validation.keyData.name}):`,
|
||||
error
|
||||
)
|
||||
decrementConcurrency()
|
||||
})
|
||||
|
||||
// res.on('finish') 处理正常完成的情况
|
||||
res.once('finish', () => {
|
||||
logger.api(
|
||||
|
||||
@@ -1575,13 +1575,53 @@ class RedisClient {
|
||||
// 获取并发配置
|
||||
_getConcurrencyConfig() {
|
||||
const defaults = {
|
||||
leaseSeconds: 900,
|
||||
leaseSeconds: 300,
|
||||
renewIntervalSeconds: 30,
|
||||
cleanupGraceSeconds: 30
|
||||
}
|
||||
return {
|
||||
|
||||
const configValues = {
|
||||
...defaults,
|
||||
...(config.concurrency || {})
|
||||
}
|
||||
|
||||
const normalizeNumber = (value, fallback, options = {}) => {
|
||||
const parsed = Number(value)
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return fallback
|
||||
}
|
||||
|
||||
if (options.allowZero && parsed === 0) {
|
||||
return 0
|
||||
}
|
||||
|
||||
if (options.min !== undefined && parsed < options.min) {
|
||||
return options.min
|
||||
}
|
||||
|
||||
return parsed
|
||||
}
|
||||
|
||||
return {
|
||||
leaseSeconds: normalizeNumber(configValues.leaseSeconds, defaults.leaseSeconds, {
|
||||
min: 30
|
||||
}),
|
||||
renewIntervalSeconds: normalizeNumber(
|
||||
configValues.renewIntervalSeconds,
|
||||
defaults.renewIntervalSeconds,
|
||||
{
|
||||
allowZero: true,
|
||||
min: 0
|
||||
}
|
||||
),
|
||||
cleanupGraceSeconds: normalizeNumber(
|
||||
configValues.cleanupGraceSeconds,
|
||||
defaults.cleanupGraceSeconds,
|
||||
{
|
||||
min: 0
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 增加并发计数(基于租约的有序集合)
|
||||
@@ -1650,10 +1690,10 @@ class RedisClient {
|
||||
local now = tonumber(ARGV[3])
|
||||
local ttl = tonumber(ARGV[4])
|
||||
|
||||
local exists = redis.call('ZSCORE', key, member)
|
||||
|
||||
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
|
||||
|
||||
local exists = redis.call('ZSCORE', key, member)
|
||||
|
||||
if exists then
|
||||
redis.call('ZADD', key, expireAt, member)
|
||||
if ttl > 0 then
|
||||
|
||||
Reference in New Issue
Block a user