mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-01-22 16:43:35 +00:00
fix: 修复并发清理任务 WRONGTYPE 错误
问题: - 并发清理定时任务在遇到非 zset 类型的遗留键时报 WRONGTYPE 错误 - 错误键如 concurrency:wait:*, concurrency:user:*, concurrency:account:* 等 修复: - app.js: 使用原子 Lua 脚本先检查键类型再执行清理,消除竞态条件 - redis.js: 为 6 个并发管理函数添加类型检查 - getAllConcurrencyStatus(): 跳过 queue 键 + 类型检查 - getConcurrencyStatus(): 类型检查,非 zset 返回 invalidType - forceClearConcurrency(): 类型检查,任意类型都删除 - forceClearAllConcurrency(): 跳过 queue 键 + 类型检查 - cleanupExpiredConcurrency(): 跳过 queue 键 + 类型检查 - 遗留键会被自动识别并删除,同时记录日志
This commit is contained in:
22
src/app.js
22
src/app.js
@@ -581,10 +581,11 @@ class Application {
|
||||
|
||||
const now = Date.now()
|
||||
let totalCleaned = 0
|
||||
let legacyCleaned = 0
|
||||
|
||||
// 使用 Lua 脚本批量清理所有过期项
|
||||
for (const key of keys) {
|
||||
// 跳过非 Sorted Set 类型的键(这些键有各自的清理逻辑)
|
||||
// 跳过已知非 Sorted Set 类型的键(这些键有各自的清理逻辑)
|
||||
// - concurrency:queue:stats:* 是 Hash 类型
|
||||
// - concurrency:queue:wait_times:* 是 List 类型
|
||||
// - concurrency:queue:* (不含stats/wait_times) 是 String 类型
|
||||
@@ -599,11 +600,21 @@ class Application {
|
||||
}
|
||||
|
||||
try {
|
||||
const cleaned = await redis.client.eval(
|
||||
// 使用原子 Lua 脚本:先检查类型,再执行清理
|
||||
// 返回值:0 = 正常清理无删除,1 = 清理后删除空键,-1 = 遗留键已删除
|
||||
const result = await redis.client.eval(
|
||||
`
|
||||
local key = KEYS[1]
|
||||
local now = tonumber(ARGV[1])
|
||||
|
||||
-- 先检查键类型,只对 Sorted Set 执行清理
|
||||
local keyType = redis.call('TYPE', key)
|
||||
if keyType.ok ~= 'zset' then
|
||||
-- 非 ZSET 类型的遗留键,直接删除
|
||||
redis.call('DEL', key)
|
||||
return -1
|
||||
end
|
||||
|
||||
-- 清理过期项
|
||||
redis.call('ZREMRANGEBYSCORE', key, '-inf', now)
|
||||
|
||||
@@ -622,8 +633,10 @@ class Application {
|
||||
key,
|
||||
now
|
||||
)
|
||||
if (cleaned === 1) {
|
||||
if (result === 1) {
|
||||
totalCleaned++
|
||||
} else if (result === -1) {
|
||||
legacyCleaned++
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`❌ Failed to clean concurrency key ${key}:`, error)
|
||||
@@ -633,6 +646,9 @@ class Application {
|
||||
if (totalCleaned > 0) {
|
||||
logger.info(`🔢 Concurrency cleanup: cleaned ${totalCleaned} expired keys`)
|
||||
}
|
||||
if (legacyCleaned > 0) {
|
||||
logger.warn(`🧹 Concurrency cleanup: removed ${legacyCleaned} legacy keys (wrong type)`)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('❌ Concurrency cleanup task failed:', error)
|
||||
}
|
||||
|
||||
@@ -2140,6 +2140,27 @@ class RedisClient {
|
||||
const results = []
|
||||
|
||||
for (const key of keys) {
|
||||
// 跳过已知非 Sorted Set 类型的键
|
||||
// - concurrency:queue:stats:* 是 Hash 类型
|
||||
// - concurrency:queue:wait_times:* 是 List 类型
|
||||
// - concurrency:queue:* (不含stats/wait_times) 是 String 类型
|
||||
if (
|
||||
key.startsWith('concurrency:queue:stats:') ||
|
||||
key.startsWith('concurrency:queue:wait_times:') ||
|
||||
(key.startsWith('concurrency:queue:') &&
|
||||
!key.includes(':stats:') &&
|
||||
!key.includes(':wait_times:'))
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查键类型,只处理 Sorted Set
|
||||
const keyType = await client.type(key)
|
||||
if (keyType !== 'zset') {
|
||||
logger.debug(`🔢 getAllConcurrencyStatus skipped non-zset key: ${key} (type: ${keyType})`)
|
||||
continue
|
||||
}
|
||||
|
||||
// 提取 apiKeyId(去掉 concurrency: 前缀)
|
||||
const apiKeyId = key.replace('concurrency:', '')
|
||||
|
||||
@@ -2202,6 +2223,23 @@ class RedisClient {
|
||||
}
|
||||
}
|
||||
|
||||
// 检查键类型,只处理 Sorted Set
|
||||
const keyType = await client.type(key)
|
||||
if (keyType !== 'zset') {
|
||||
logger.warn(
|
||||
`⚠️ getConcurrencyStatus: key ${key} has unexpected type: ${keyType}, expected zset`
|
||||
)
|
||||
return {
|
||||
apiKeyId,
|
||||
key,
|
||||
activeCount: 0,
|
||||
expiredCount: 0,
|
||||
activeRequests: [],
|
||||
exists: true,
|
||||
invalidType: keyType
|
||||
}
|
||||
}
|
||||
|
||||
// 获取所有成员和分数
|
||||
const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES')
|
||||
|
||||
@@ -2251,20 +2289,36 @@ class RedisClient {
|
||||
const client = this.getClientSafe()
|
||||
const key = `concurrency:${apiKeyId}`
|
||||
|
||||
// 获取清理前的状态
|
||||
const beforeCount = await client.zcard(key)
|
||||
// 检查键类型
|
||||
const keyType = await client.type(key)
|
||||
|
||||
// 删除整个 key
|
||||
let beforeCount = 0
|
||||
let isLegacy = false
|
||||
|
||||
if (keyType === 'zset') {
|
||||
// 正常的 zset 键,获取条目数
|
||||
beforeCount = await client.zcard(key)
|
||||
} else if (keyType !== 'none') {
|
||||
// 非 zset 且非空的遗留键
|
||||
isLegacy = true
|
||||
logger.warn(
|
||||
`⚠️ forceClearConcurrency: key ${key} has unexpected type: ${keyType}, will be deleted`
|
||||
)
|
||||
}
|
||||
|
||||
// 删除键(无论什么类型)
|
||||
await client.del(key)
|
||||
|
||||
logger.warn(
|
||||
`🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries`
|
||||
`🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries${isLegacy ? ' (legacy key)' : ''}`
|
||||
)
|
||||
|
||||
return {
|
||||
apiKeyId,
|
||||
key,
|
||||
clearedCount: beforeCount,
|
||||
type: keyType,
|
||||
legacy: isLegacy,
|
||||
success: true
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -2283,25 +2337,47 @@ class RedisClient {
|
||||
const keys = await client.keys('concurrency:*')
|
||||
|
||||
let totalCleared = 0
|
||||
let legacyCleared = 0
|
||||
const clearedKeys = []
|
||||
|
||||
for (const key of keys) {
|
||||
// 跳过 queue 相关的键(它们有各自的清理逻辑)
|
||||
if (key.startsWith('concurrency:queue:')) {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查键类型
|
||||
const keyType = await client.type(key)
|
||||
if (keyType === 'zset') {
|
||||
const count = await client.zcard(key)
|
||||
await client.del(key)
|
||||
totalCleared += count
|
||||
clearedKeys.push({
|
||||
key,
|
||||
clearedCount: count
|
||||
clearedCount: count,
|
||||
type: 'zset'
|
||||
})
|
||||
} else {
|
||||
// 非 zset 类型的遗留键,直接删除
|
||||
await client.del(key)
|
||||
legacyCleared++
|
||||
clearedKeys.push({
|
||||
key,
|
||||
clearedCount: 0,
|
||||
type: keyType,
|
||||
legacy: true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
logger.warn(
|
||||
`🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries`
|
||||
`🧹 Force cleared all concurrency: ${clearedKeys.length} keys, ${totalCleared} entries, ${legacyCleared} legacy keys`
|
||||
)
|
||||
|
||||
return {
|
||||
keysCleared: keys.length,
|
||||
keysCleared: clearedKeys.length,
|
||||
totalEntriesCleared: totalCleared,
|
||||
legacyKeysCleared: legacyCleared,
|
||||
clearedKeys,
|
||||
success: true
|
||||
}
|
||||
@@ -2329,9 +2405,30 @@ class RedisClient {
|
||||
}
|
||||
|
||||
let totalCleaned = 0
|
||||
let legacyCleaned = 0
|
||||
const cleanedKeys = []
|
||||
|
||||
for (const key of keys) {
|
||||
// 跳过 queue 相关的键(它们有各自的清理逻辑)
|
||||
if (key.startsWith('concurrency:queue:')) {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查键类型
|
||||
const keyType = await client.type(key)
|
||||
if (keyType !== 'zset') {
|
||||
// 非 zset 类型的遗留键,直接删除
|
||||
await client.del(key)
|
||||
legacyCleaned++
|
||||
cleanedKeys.push({
|
||||
key,
|
||||
cleanedCount: 0,
|
||||
type: keyType,
|
||||
legacy: true
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// 只清理过期的条目
|
||||
const cleaned = await client.zremrangebyscore(key, '-inf', now)
|
||||
if (cleaned > 0) {
|
||||
@@ -2350,13 +2447,14 @@ class RedisClient {
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys`
|
||||
`🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys, ${legacyCleaned} legacy keys removed`
|
||||
)
|
||||
|
||||
return {
|
||||
keysProcessed: keys.length,
|
||||
keysCleaned: cleanedKeys.length,
|
||||
totalEntriesCleaned: totalCleaned,
|
||||
legacyKeysRemoved: legacyCleaned,
|
||||
cleanedKeys,
|
||||
success: true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user