diff --git a/src/app.js b/src/app.js index 41edc483..77e60216 100644 --- a/src/app.js +++ b/src/app.js @@ -581,10 +581,11 @@ class Application { const now = Date.now() let totalCleaned = 0 + let legacyCleaned = 0 // 使用 Lua 脚本批量清理所有过期项 for (const key of keys) { - // 跳过非 Sorted Set 类型的键(这些键有各自的清理逻辑) + // 跳过已知非 Sorted Set 类型的键(这些键有各自的清理逻辑) // - concurrency:queue:stats:* 是 Hash 类型 // - concurrency:queue:wait_times:* 是 List 类型 // - concurrency:queue:* (不含stats/wait_times) 是 String 类型 @@ -599,11 +600,21 @@ class Application { } try { - const cleaned = await redis.client.eval( + // 使用原子 Lua 脚本:先检查类型,再执行清理 + // 返回值:0 = 正常清理无删除,1 = 清理后删除空键,-1 = 遗留键已删除 + const result = await redis.client.eval( ` local key = KEYS[1] local now = tonumber(ARGV[1]) + -- 先检查键类型,只对 Sorted Set 执行清理 + local keyType = redis.call('TYPE', key) + if keyType.ok ~= 'zset' then + -- 非 ZSET 类型的遗留键,直接删除 + redis.call('DEL', key) + return -1 + end + -- 清理过期项 redis.call('ZREMRANGEBYSCORE', key, '-inf', now) @@ -622,8 +633,10 @@ class Application { key, now ) - if (cleaned === 1) { + if (result === 1) { totalCleaned++ + } else if (result === -1) { + legacyCleaned++ } } catch (error) { logger.error(`❌ Failed to clean concurrency key ${key}:`, error) @@ -633,6 +646,9 @@ class Application { if (totalCleaned > 0) { logger.info(`🔢 Concurrency cleanup: cleaned ${totalCleaned} expired keys`) } + if (legacyCleaned > 0) { + logger.warn(`🧹 Concurrency cleanup: removed ${legacyCleaned} legacy keys (wrong type)`) + } } catch (error) { logger.error('❌ Concurrency cleanup task failed:', error) } diff --git a/src/models/redis.js b/src/models/redis.js index 6cffa6a9..8b41cabc 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -2140,6 +2140,27 @@ class RedisClient { const results = [] for (const key of keys) { + // 跳过已知非 Sorted Set 类型的键 + // - concurrency:queue:stats:* 是 Hash 类型 + // - concurrency:queue:wait_times:* 是 List 类型 + // - concurrency:queue:* (不含stats/wait_times) 是 String 类型 + if ( + key.startsWith('concurrency:queue:stats:') || + key.startsWith('concurrency:queue:wait_times:') || + (key.startsWith('concurrency:queue:') && + !key.includes(':stats:') && + !key.includes(':wait_times:')) + ) { + continue + } + + // 检查键类型,只处理 Sorted Set + const keyType = await client.type(key) + if (keyType !== 'zset') { + logger.debug(`🔢 getAllConcurrencyStatus skipped non-zset key: ${key} (type: ${keyType})`) + continue + } + // 提取 apiKeyId(去掉 concurrency: 前缀) const apiKeyId = key.replace('concurrency:', '') @@ -2202,6 +2223,23 @@ class RedisClient { } } + // 检查键类型,只处理 Sorted Set + const keyType = await client.type(key) + if (keyType !== 'zset') { + logger.warn( + `⚠️ getConcurrencyStatus: key ${key} has unexpected type: ${keyType}, expected zset` + ) + return { + apiKeyId, + key, + activeCount: 0, + expiredCount: 0, + activeRequests: [], + exists: true, + invalidType: keyType + } + } + // 获取所有成员和分数 const allMembers = await client.zrange(key, 0, -1, 'WITHSCORES') @@ -2251,20 +2289,36 @@ class RedisClient { const client = this.getClientSafe() const key = `concurrency:${apiKeyId}` - // 获取清理前的状态 - const beforeCount = await client.zcard(key) + // 检查键类型 + const keyType = await client.type(key) - // 删除整个 key + let beforeCount = 0 + let isLegacy = false + + if (keyType === 'zset') { + // 正常的 zset 键,获取条目数 + beforeCount = await client.zcard(key) + } else if (keyType !== 'none') { + // 非 zset 且非空的遗留键 + isLegacy = true + logger.warn( + `⚠️ forceClearConcurrency: key ${key} has unexpected type: ${keyType}, will be deleted` + ) + } + + // 删除键(无论什么类型) await client.del(key) logger.warn( - `🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries` + `🧹 Force cleared concurrency for key ${apiKeyId}, removed ${beforeCount} entries${isLegacy ? ' (legacy key)' : ''}` ) return { apiKeyId, key, clearedCount: beforeCount, + type: keyType, + legacy: isLegacy, success: true } } catch (error) { @@ -2283,25 +2337,47 @@ class RedisClient { const keys = await client.keys('concurrency:*') let totalCleared = 0 + let legacyCleared = 0 const clearedKeys = [] for (const key of keys) { - const count = await client.zcard(key) - await client.del(key) - totalCleared += count - clearedKeys.push({ - key, - clearedCount: count - }) + // 跳过 queue 相关的键(它们有各自的清理逻辑) + if (key.startsWith('concurrency:queue:')) { + continue + } + + // 检查键类型 + const keyType = await client.type(key) + if (keyType === 'zset') { + const count = await client.zcard(key) + await client.del(key) + totalCleared += count + clearedKeys.push({ + key, + clearedCount: count, + type: 'zset' + }) + } else { + // 非 zset 类型的遗留键,直接删除 + await client.del(key) + legacyCleared++ + clearedKeys.push({ + key, + clearedCount: 0, + type: keyType, + legacy: true + }) + } } logger.warn( - `🧹 Force cleared all concurrency: ${keys.length} keys, ${totalCleared} total entries` + `🧹 Force cleared all concurrency: ${clearedKeys.length} keys, ${totalCleared} entries, ${legacyCleared} legacy keys` ) return { - keysCleared: keys.length, + keysCleared: clearedKeys.length, totalEntriesCleared: totalCleared, + legacyKeysCleared: legacyCleared, clearedKeys, success: true } @@ -2329,9 +2405,30 @@ class RedisClient { } let totalCleaned = 0 + let legacyCleaned = 0 const cleanedKeys = [] for (const key of keys) { + // 跳过 queue 相关的键(它们有各自的清理逻辑) + if (key.startsWith('concurrency:queue:')) { + continue + } + + // 检查键类型 + const keyType = await client.type(key) + if (keyType !== 'zset') { + // 非 zset 类型的遗留键,直接删除 + await client.del(key) + legacyCleaned++ + cleanedKeys.push({ + key, + cleanedCount: 0, + type: keyType, + legacy: true + }) + continue + } + // 只清理过期的条目 const cleaned = await client.zremrangebyscore(key, '-inf', now) if (cleaned > 0) { @@ -2350,13 +2447,14 @@ class RedisClient { } logger.info( - `🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys` + `🧹 Cleaned up expired concurrency: ${totalCleaned} entries from ${cleanedKeys.length} keys, ${legacyCleaned} legacy keys removed` ) return { keysProcessed: keys.length, keysCleaned: cleanedKeys.length, totalEntriesCleaned: totalCleaned, + legacyKeysRemoved: legacyCleaned, cleanedKeys, success: true }