diff --git a/src/middleware/auth.js b/src/middleware/auth.js index b89586a4..b18b5322 100644 --- a/src/middleware/auth.js +++ b/src/middleware/auth.js @@ -7,6 +7,57 @@ const redis = require('../models/redis') // const { RateLimiterRedis } = require('rate-limiter-flexible') // 暂时未使用 const ClientValidator = require('../validators/clientValidator') +const FALLBACK_CONCURRENCY_CONFIG = { + leaseSeconds: 300, + renewIntervalSeconds: 30, + cleanupGraceSeconds: 30 +} + +const resolveConcurrencyConfig = () => { + if (typeof redis._getConcurrencyConfig === 'function') { + return redis._getConcurrencyConfig() + } + + const raw = { + ...FALLBACK_CONCURRENCY_CONFIG, + ...(config.concurrency || {}) + } + + const toNumber = (value, fallback) => { + const parsed = Number(value) + if (!Number.isFinite(parsed)) { + return fallback + } + return parsed + } + + const leaseSeconds = Math.max( + toNumber(raw.leaseSeconds, FALLBACK_CONCURRENCY_CONFIG.leaseSeconds), + 30 + ) + + let renewIntervalSeconds + if (raw.renewIntervalSeconds === 0 || raw.renewIntervalSeconds === '0') { + renewIntervalSeconds = 0 + } else { + renewIntervalSeconds = Math.max( + toNumber(raw.renewIntervalSeconds, FALLBACK_CONCURRENCY_CONFIG.renewIntervalSeconds), + 0 + ) + } + + const cleanupGraceSeconds = Math.max( + toNumber(raw.cleanupGraceSeconds, FALLBACK_CONCURRENCY_CONFIG.cleanupGraceSeconds), + 0 + ) + + return { + leaseSeconds, + renewIntervalSeconds, + cleanupGraceSeconds + } +} + const TOKEN_COUNT_PATHS = new Set([ '/v1/messages/count_tokens', '/api/v1/messages/count_tokens', @@ -116,13 +167,10 @@ const authenticateApiKey = async (req, res, next) => { // 检查并发限制 const concurrencyLimit = validation.keyData.concurrencyLimit || 0 if (!skipKeyRestrictions && concurrencyLimit > 0) { - const concurrencyConfig = config.concurrency || {} - const leaseSeconds = Math.max(concurrencyConfig.leaseSeconds || 900, 30) - const rawRenewInterval = - typeof concurrencyConfig.renewIntervalSeconds === 'number' - ? concurrencyConfig.renewIntervalSeconds - : 60 - let renewIntervalSeconds = rawRenewInterval + const { leaseSeconds: configLeaseSeconds, renewIntervalSeconds: configRenewIntervalSeconds } = + resolveConcurrencyConfig() + const leaseSeconds = Math.max(Number(configLeaseSeconds) || 300, 30) + let renewIntervalSeconds = configRenewIntervalSeconds if (renewIntervalSeconds > 0) { const maxSafeRenew = Math.max(leaseSeconds - 5, 15) renewIntervalSeconds = Math.min(Math.max(renewIntervalSeconds, 15), maxSafeRenew) @@ -215,6 +263,29 @@ const authenticateApiKey = async (req, res, next) => { decrementConcurrency() }) + req.once('aborted', () => { + logger.warn( + `⚠️ Request aborted for key: ${validation.keyData.id} (${validation.keyData.name})` + ) + decrementConcurrency() + }) + + req.once('error', (error) => { + logger.error( + `❌ Request error for key ${validation.keyData.id} (${validation.keyData.name}):`, + error + ) + decrementConcurrency() + }) + + res.once('error', (error) => { + logger.error( + `❌ Response error for key ${validation.keyData.id} (${validation.keyData.name}):`, + error + ) + decrementConcurrency() + }) + // res.on('finish') 处理正常完成的情况 res.once('finish', () => { logger.api( diff --git a/src/models/redis.js b/src/models/redis.js index e4c27243..ea917c28 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -1575,13 +1575,53 @@ class RedisClient { // 获取并发配置 _getConcurrencyConfig() { const defaults = { - leaseSeconds: 900, + leaseSeconds: 300, + renewIntervalSeconds: 30, cleanupGraceSeconds: 30 } - return { + + const configValues = { ...defaults, ...(config.concurrency || {}) } + + const normalizeNumber = (value, fallback, options = {}) => { + const parsed = Number(value) + if (!Number.isFinite(parsed)) { + return fallback + } + + if (options.allowZero && parsed === 0) { + return 0 + } + + if (options.min !== undefined && parsed < options.min) { + return options.min + } + + return parsed + } + + return { + leaseSeconds: normalizeNumber(configValues.leaseSeconds, defaults.leaseSeconds, { + min: 30 + }), + renewIntervalSeconds: normalizeNumber( + configValues.renewIntervalSeconds, + defaults.renewIntervalSeconds, + { + allowZero: true, + min: 0 + } + ), + cleanupGraceSeconds: normalizeNumber( + configValues.cleanupGraceSeconds, + defaults.cleanupGraceSeconds, + { + min: 0 + } + ) + } } // 增加并发计数(基于租约的有序集合) @@ -1650,10 +1690,10 @@ class RedisClient { local now = tonumber(ARGV[3]) local ttl = tonumber(ARGV[4]) - local exists = redis.call('ZSCORE', key, member) - redis.call('ZREMRANGEBYSCORE', key, '-inf', now) + local exists = redis.call('ZSCORE', key, member) + if exists then redis.call('ZADD', key, expireAt, member) if ttl > 0 then