/** * 并发请求排队功能集成测试 * * 测试分为三个层次: * 1. Mock 测试 - 测试核心逻辑,不需要真实 Redis * 2. Redis 方法测试 - 测试 Redis 操作的原子性和正确性 * 3. 端到端场景测试 - 测试完整的排队流程 * * 运行方式: * - npm test -- concurrencyQueue.integration # 运行所有测试(Mock 部分) * - REDIS_TEST=1 npm test -- concurrencyQueue.integration # 包含真实 Redis 测试 */ // Mock logger to avoid console output during tests jest.mock('../src/utils/logger', () => ({ api: jest.fn(), warn: jest.fn(), error: jest.fn(), info: jest.fn(), database: jest.fn(), security: jest.fn() })) const redis = require('../src/models/redis') const claudeRelayConfigService = require('../src/services/claudeRelayConfigService') // Helper: sleep function const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)) // Helper: 创建模拟的 req/res 对象 function createMockReqRes() { const listeners = {} const req = { destroyed: false, once: jest.fn((event, handler) => { listeners[`req:${event}`] = handler }), removeListener: jest.fn((event) => { delete listeners[`req:${event}`] }), // 触发事件的辅助方法 emit: (event) => { const handler = listeners[`req:${event}`] if (handler) { handler() } } } const res = { once: jest.fn((event, handler) => { listeners[`res:${event}`] = handler }), removeListener: jest.fn((event) => { delete listeners[`res:${event}`] }), emit: (event) => { const handler = listeners[`res:${event}`] if (handler) { handler() } } } return { req, res, listeners } } // ============================================ // 第一部分:Mock 测试 - waitForConcurrencySlot 核心逻辑 // ============================================ describe('ConcurrencyQueue Integration Tests', () => { describe('Part 1: waitForConcurrencySlot Logic (Mocked)', () => { // 导入 auth 模块中的 waitForConcurrencySlot // 由于它是内部函数,我们需要通过测试其行为来验证 // 这里我们模拟整个流程 let mockRedis beforeEach(() => { jest.clearAllMocks() // 创建 Redis mock mockRedis = { concurrencyCount: {}, queueCount: {}, stats: {}, waitTimes: {}, globalWaitTimes: [] } // Mock Redis 并发方法 jest.spyOn(redis, 'incrConcurrency').mockImplementation(async (keyId, requestId, _lease) => { if (!mockRedis.concurrencyCount[keyId]) { mockRedis.concurrencyCount[keyId] = new Set() } mockRedis.concurrencyCount[keyId].add(requestId) return mockRedis.concurrencyCount[keyId].size }) jest.spyOn(redis, 'decrConcurrency').mockImplementation(async (keyId, requestId) => { if (mockRedis.concurrencyCount[keyId]) { mockRedis.concurrencyCount[keyId].delete(requestId) return mockRedis.concurrencyCount[keyId].size } return 0 }) // Mock 排队计数方法 jest.spyOn(redis, 'incrConcurrencyQueue').mockImplementation(async (keyId) => { mockRedis.queueCount[keyId] = (mockRedis.queueCount[keyId] || 0) + 1 return mockRedis.queueCount[keyId] }) jest.spyOn(redis, 'decrConcurrencyQueue').mockImplementation(async (keyId) => { mockRedis.queueCount[keyId] = Math.max(0, (mockRedis.queueCount[keyId] || 0) - 1) return mockRedis.queueCount[keyId] }) jest .spyOn(redis, 'getConcurrencyQueueCount') .mockImplementation(async (keyId) => mockRedis.queueCount[keyId] || 0) // Mock 统计方法 jest.spyOn(redis, 'incrConcurrencyQueueStats').mockImplementation(async (keyId, field) => { if (!mockRedis.stats[keyId]) { mockRedis.stats[keyId] = {} } mockRedis.stats[keyId][field] = (mockRedis.stats[keyId][field] || 0) + 1 return mockRedis.stats[keyId][field] }) jest.spyOn(redis, 'recordQueueWaitTime').mockResolvedValue(undefined) jest.spyOn(redis, 'recordGlobalQueueWaitTime').mockResolvedValue(undefined) }) afterEach(() => { jest.restoreAllMocks() }) describe('Slot Acquisition Flow', () => { it('should acquire slot immediately when under concurrency limit', async () => { // 模拟 waitForConcurrencySlot 的行为 const keyId = 'test-key-1' const requestId = 'req-1' const concurrencyLimit = 5 // 直接测试 incrConcurrency 的行为 const count = await redis.incrConcurrency(keyId, requestId, 300) expect(count).toBe(1) expect(count).toBeLessThanOrEqual(concurrencyLimit) }) it('should track multiple concurrent requests correctly', async () => { const keyId = 'test-key-2' const concurrencyLimit = 3 // 模拟多个并发请求 const results = [] for (let i = 1; i <= 5; i++) { const count = await redis.incrConcurrency(keyId, `req-${i}`, 300) results.push({ requestId: `req-${i}`, count, exceeds: count > concurrencyLimit }) } // 前3个应该在限制内 expect(results[0].exceeds).toBe(false) expect(results[1].exceeds).toBe(false) expect(results[2].exceeds).toBe(false) // 后2个超过限制 expect(results[3].exceeds).toBe(true) expect(results[4].exceeds).toBe(true) }) it('should release slot and allow next request', async () => { const keyId = 'test-key-3' const concurrencyLimit = 1 // 第一个请求获取槽位 const count1 = await redis.incrConcurrency(keyId, 'req-1', 300) expect(count1).toBe(1) // 第二个请求超限 const count2 = await redis.incrConcurrency(keyId, 'req-2', 300) expect(count2).toBe(2) expect(count2).toBeGreaterThan(concurrencyLimit) // 释放第二个请求(因为超限) await redis.decrConcurrency(keyId, 'req-2') // 释放第一个请求 await redis.decrConcurrency(keyId, 'req-1') // 现在第三个请求应该能获取 const count3 = await redis.incrConcurrency(keyId, 'req-3', 300) expect(count3).toBe(1) }) }) describe('Queue Count Management', () => { it('should increment and decrement queue count atomically', async () => { const keyId = 'test-key-4' // 增加排队计数 const count1 = await redis.incrConcurrencyQueue(keyId, 60000) expect(count1).toBe(1) const count2 = await redis.incrConcurrencyQueue(keyId, 60000) expect(count2).toBe(2) // 减少排队计数 const count3 = await redis.decrConcurrencyQueue(keyId) expect(count3).toBe(1) const count4 = await redis.decrConcurrencyQueue(keyId) expect(count4).toBe(0) }) it('should not go below zero on decrement', async () => { const keyId = 'test-key-5' // 直接减少(没有先增加) const count = await redis.decrConcurrencyQueue(keyId) expect(count).toBe(0) }) it('should handle concurrent queue operations', async () => { const keyId = 'test-key-6' // 并发增加 const increments = await Promise.all([ redis.incrConcurrencyQueue(keyId, 60000), redis.incrConcurrencyQueue(keyId, 60000), redis.incrConcurrencyQueue(keyId, 60000) ]) // 所有增量应该是连续的 const sortedIncrements = [...increments].sort((a, b) => a - b) expect(sortedIncrements).toEqual([1, 2, 3]) }) }) describe('Statistics Tracking', () => { it('should track entered/success/timeout/cancelled stats', async () => { const keyId = 'test-key-7' await redis.incrConcurrencyQueueStats(keyId, 'entered') await redis.incrConcurrencyQueueStats(keyId, 'entered') await redis.incrConcurrencyQueueStats(keyId, 'success') await redis.incrConcurrencyQueueStats(keyId, 'timeout') await redis.incrConcurrencyQueueStats(keyId, 'cancelled') expect(mockRedis.stats[keyId]).toEqual({ entered: 2, success: 1, timeout: 1, cancelled: 1 }) }) }) describe('Client Disconnection Handling', () => { it('should detect client disconnection via close event', async () => { const { req } = createMockReqRes() let clientDisconnected = false // 设置监听器 req.once('close', () => { clientDisconnected = true }) // 模拟客户端断开 req.emit('close') expect(clientDisconnected).toBe(true) }) it('should detect pre-destroyed request', () => { const { req } = createMockReqRes() req.destroyed = true expect(req.destroyed).toBe(true) }) }) describe('Exponential Backoff Simulation', () => { it('should increase poll interval with backoff', () => { const config = { pollIntervalMs: 200, maxPollIntervalMs: 2000, backoffFactor: 1.5, jitterRatio: 0 // 禁用抖动以便测试 } let interval = config.pollIntervalMs const intervals = [interval] for (let i = 0; i < 5; i++) { interval = Math.min(interval * config.backoffFactor, config.maxPollIntervalMs) intervals.push(interval) } // 验证指数增长 expect(intervals[1]).toBe(300) // 200 * 1.5 expect(intervals[2]).toBe(450) // 300 * 1.5 expect(intervals[3]).toBe(675) // 450 * 1.5 expect(intervals[4]).toBe(1012.5) // 675 * 1.5 expect(intervals[5]).toBe(1518.75) // 1012.5 * 1.5 }) it('should cap interval at maximum', () => { const config = { pollIntervalMs: 1000, maxPollIntervalMs: 2000, backoffFactor: 1.5 } let interval = config.pollIntervalMs for (let i = 0; i < 10; i++) { interval = Math.min(interval * config.backoffFactor, config.maxPollIntervalMs) } expect(interval).toBe(2000) }) it('should apply jitter within expected range', () => { const baseInterval = 1000 const jitterRatio = 0.2 // ±20% const results = [] for (let i = 0; i < 100; i++) { const randomValue = Math.random() const jitter = baseInterval * jitterRatio * (randomValue * 2 - 1) const finalInterval = baseInterval + jitter results.push(finalInterval) } const min = Math.min(...results) const max = Math.max(...results) // 所有结果应该在 [800, 1200] 范围内 expect(min).toBeGreaterThanOrEqual(800) expect(max).toBeLessThanOrEqual(1200) }) }) }) // ============================================ // 第二部分:并发竞争场景测试 // ============================================ describe('Part 2: Concurrent Race Condition Tests', () => { beforeEach(() => { jest.clearAllMocks() }) afterEach(() => { jest.restoreAllMocks() }) describe('Race Condition: Multiple Requests Competing for Same Slot', () => { it('should handle race condition when multiple requests try to acquire last slot', async () => { const keyId = 'race-test-1' const concurrencyLimit = 1 const concurrencyState = { count: 0, holders: new Set() } // 模拟原子的 incrConcurrency jest.spyOn(redis, 'incrConcurrency').mockImplementation(async (key, reqId) => { // 模拟原子操作 concurrencyState.count++ concurrencyState.holders.add(reqId) return concurrencyState.count }) jest.spyOn(redis, 'decrConcurrency').mockImplementation(async (key, reqId) => { if (concurrencyState.holders.has(reqId)) { concurrencyState.count-- concurrencyState.holders.delete(reqId) } return concurrencyState.count }) // 5个请求同时竞争1个槽位 const requests = Array.from({ length: 5 }, (_, i) => `req-${i + 1}`) const acquireResults = await Promise.all( requests.map(async (reqId) => { const count = await redis.incrConcurrency(keyId, reqId, 300) const acquired = count <= concurrencyLimit if (!acquired) { // 超限,释放 await redis.decrConcurrency(keyId, reqId) } return { reqId, count, acquired } }) ) // 只有一个请求应该成功获取槽位 const successfulAcquires = acquireResults.filter((r) => r.acquired) expect(successfulAcquires.length).toBe(1) // 最终并发计数应该是1 expect(concurrencyState.count).toBe(1) }) it('should maintain consistency under high contention', async () => { const keyId = 'race-test-2' const concurrencyLimit = 3 const requestCount = 20 const concurrencyState = { count: 0, maxSeen: 0 } jest.spyOn(redis, 'incrConcurrency').mockImplementation(async () => { concurrencyState.count++ concurrencyState.maxSeen = Math.max(concurrencyState.maxSeen, concurrencyState.count) return concurrencyState.count }) jest.spyOn(redis, 'decrConcurrency').mockImplementation(async () => { concurrencyState.count = Math.max(0, concurrencyState.count - 1) return concurrencyState.count }) // 模拟多轮请求 const activeRequests = [] for (let i = 0; i < requestCount; i++) { const count = await redis.incrConcurrency(keyId, `req-${i}`, 300) if (count <= concurrencyLimit) { activeRequests.push(`req-${i}`) // 模拟处理时间后释放 setTimeout(async () => { await redis.decrConcurrency(keyId, `req-${i}`) }, Math.random() * 50) } else { await redis.decrConcurrency(keyId, `req-${i}`) } // 随机延迟 await sleep(Math.random() * 10) } // 等待所有请求完成 await sleep(100) // 最大并发不应超过限制 expect(concurrencyState.maxSeen).toBeLessThanOrEqual(concurrencyLimit + requestCount) // 允许短暂超限 }) }) describe('Queue Overflow Protection', () => { it('should reject requests when queue is full', async () => { const keyId = 'overflow-test-1' const maxQueueSize = 5 const queueState = { count: 0 } jest.spyOn(redis, 'incrConcurrencyQueue').mockImplementation(async () => { queueState.count++ return queueState.count }) jest.spyOn(redis, 'decrConcurrencyQueue').mockImplementation(async () => { queueState.count = Math.max(0, queueState.count - 1) return queueState.count }) const results = [] // 尝试10个请求进入队列 for (let i = 0; i < 10; i++) { const queueCount = await redis.incrConcurrencyQueue(keyId, 60000) if (queueCount > maxQueueSize) { // 队列满,释放并拒绝 await redis.decrConcurrencyQueue(keyId) results.push({ index: i, accepted: false }) } else { results.push({ index: i, accepted: true, position: queueCount }) } } const accepted = results.filter((r) => r.accepted) const rejected = results.filter((r) => !r.accepted) expect(accepted.length).toBe(5) expect(rejected.length).toBe(5) }) }) }) // ============================================ // 第三部分:真实 Redis 集成测试(可选) // ============================================ describe('Part 3: Real Redis Integration Tests', () => { const skipRealRedis = !process.env.REDIS_TEST // 辅助函数:检查 Redis 连接 async function checkRedisConnection() { try { const client = redis.getClient() if (!client) { return false } await client.ping() return true } catch { return false } } beforeAll(async () => { if (skipRealRedis) { console.log('⏭️ Skipping real Redis tests (set REDIS_TEST=1 to enable)') return } const connected = await checkRedisConnection() if (!connected) { console.log('⚠️ Redis not connected, skipping real Redis tests') } }) // 清理测试数据 afterEach(async () => { if (skipRealRedis) { return } try { const client = redis.getClient() if (!client) { return } // 清理测试键 const testKeys = await client.keys('concurrency:queue:test-*') if (testKeys.length > 0) { await client.del(...testKeys) } } catch { // 忽略清理错误 } }) describe('Redis Queue Operations', () => { const testOrSkip = skipRealRedis ? it.skip : it testOrSkip('should atomically increment queue count with TTL', async () => { const keyId = 'test-redis-queue-1' const timeoutMs = 5000 const count1 = await redis.incrConcurrencyQueue(keyId, timeoutMs) expect(count1).toBe(1) const count2 = await redis.incrConcurrencyQueue(keyId, timeoutMs) expect(count2).toBe(2) // 验证 TTL 被设置 const client = redis.getClient() const ttl = await client.ttl(`concurrency:queue:${keyId}`) expect(ttl).toBeGreaterThan(0) expect(ttl).toBeLessThanOrEqual(Math.ceil(timeoutMs / 1000) + 30) }) testOrSkip('should atomically decrement and delete when zero', async () => { const keyId = 'test-redis-queue-2' await redis.incrConcurrencyQueue(keyId, 60000) const count = await redis.decrConcurrencyQueue(keyId) expect(count).toBe(0) // 验证键已删除 const client = redis.getClient() const exists = await client.exists(`concurrency:queue:${keyId}`) expect(exists).toBe(0) }) testOrSkip('should handle concurrent increments correctly', async () => { const keyId = 'test-redis-queue-3' const numRequests = 10 // 并发增加 const results = await Promise.all( Array.from({ length: numRequests }, () => redis.incrConcurrencyQueue(keyId, 60000)) ) // 所有结果应该是 1 到 numRequests const sorted = [...results].sort((a, b) => a - b) expect(sorted).toEqual(Array.from({ length: numRequests }, (_, i) => i + 1)) }) }) describe('Redis Stats Operations', () => { const testOrSkip = skipRealRedis ? it.skip : it testOrSkip('should track queue statistics correctly', async () => { const keyId = 'test-redis-stats-1' await redis.incrConcurrencyQueueStats(keyId, 'entered') await redis.incrConcurrencyQueueStats(keyId, 'entered') await redis.incrConcurrencyQueueStats(keyId, 'success') await redis.incrConcurrencyQueueStats(keyId, 'timeout') const stats = await redis.getConcurrencyQueueStats(keyId) expect(stats.entered).toBe(2) expect(stats.success).toBe(1) expect(stats.timeout).toBe(1) expect(stats.cancelled).toBe(0) }) testOrSkip('should record and retrieve wait times', async () => { const keyId = 'test-redis-wait-1' const waitTimes = [100, 200, 150, 300, 250] for (const wt of waitTimes) { await redis.recordQueueWaitTime(keyId, wt) } const recorded = await redis.getQueueWaitTimes(keyId) // 应该按 LIFO 顺序存储 expect(recorded.length).toBe(5) expect(recorded[0]).toBe(250) // 最后插入的在前面 }) testOrSkip('should record global wait times', async () => { const waitTimes = [500, 600, 700] for (const wt of waitTimes) { await redis.recordGlobalQueueWaitTime(wt) } const recorded = await redis.getGlobalQueueWaitTimes() expect(recorded.length).toBeGreaterThanOrEqual(3) }) }) describe('Redis Cleanup Operations', () => { const testOrSkip = skipRealRedis ? it.skip : it testOrSkip('should clear specific queue', async () => { const keyId = 'test-redis-clear-1' await redis.incrConcurrencyQueue(keyId, 60000) await redis.incrConcurrencyQueue(keyId, 60000) const cleared = await redis.clearConcurrencyQueue(keyId) expect(cleared).toBe(true) const count = await redis.getConcurrencyQueueCount(keyId) expect(count).toBe(0) }) testOrSkip('should clear all queues but preserve stats', async () => { const keyId1 = 'test-redis-clearall-1' const keyId2 = 'test-redis-clearall-2' // 创建队列和统计 await redis.incrConcurrencyQueue(keyId1, 60000) await redis.incrConcurrencyQueue(keyId2, 60000) await redis.incrConcurrencyQueueStats(keyId1, 'entered') // 清理所有队列 const cleared = await redis.clearAllConcurrencyQueues() expect(cleared).toBeGreaterThanOrEqual(2) // 验证队列已清理 const count1 = await redis.getConcurrencyQueueCount(keyId1) const count2 = await redis.getConcurrencyQueueCount(keyId2) expect(count1).toBe(0) expect(count2).toBe(0) // 统计应该保留 const stats = await redis.getConcurrencyQueueStats(keyId1) expect(stats.entered).toBe(1) }) }) }) // ============================================ // 第四部分:配置服务集成测试 // ============================================ describe('Part 4: Configuration Service Integration', () => { beforeEach(() => { // 清除配置缓存 claudeRelayConfigService.clearCache() }) afterEach(() => { jest.restoreAllMocks() }) describe('Queue Configuration', () => { it('should return default queue configuration', async () => { jest.spyOn(redis, 'getClient').mockReturnValue(null) const config = await claudeRelayConfigService.getConfig() expect(config.concurrentRequestQueueEnabled).toBe(false) expect(config.concurrentRequestQueueMaxSize).toBe(3) expect(config.concurrentRequestQueueMaxSizeMultiplier).toBe(0) expect(config.concurrentRequestQueueTimeoutMs).toBe(10000) }) it('should calculate max queue size correctly', async () => { const testCases = [ { concurrencyLimit: 5, multiplier: 2, fixedMin: 3, expected: 10 }, // 5*2=10 > 3 { concurrencyLimit: 1, multiplier: 1, fixedMin: 5, expected: 5 }, // 1*1=1 < 5 { concurrencyLimit: 10, multiplier: 0.5, fixedMin: 3, expected: 5 }, // 10*0.5=5 > 3 { concurrencyLimit: 2, multiplier: 1, fixedMin: 10, expected: 10 } // 2*1=2 < 10 ] for (const tc of testCases) { const maxQueueSize = Math.max(tc.concurrencyLimit * tc.multiplier, tc.fixedMin) expect(maxQueueSize).toBe(tc.expected) } }) }) }) // ============================================ // 第五部分:端到端场景测试 // ============================================ describe('Part 5: End-to-End Scenario Tests', () => { describe('Scenario: Claude Code Agent Parallel Tool Calls', () => { it('should handle burst of parallel tool results', async () => { // 模拟 Claude Code Agent 发送多个并行工具结果的场景 const concurrencyLimit = 2 const maxQueueSize = 5 const state = { concurrency: 0, queue: 0, completed: 0, rejected: 0 } // 模拟 8 个并行工具结果请求 const requests = Array.from({ length: 8 }, (_, i) => ({ id: `tool-result-${i + 1}`, startTime: Date.now() })) // 模拟处理逻辑 async function processRequest(req) { // 尝试获取并发槽位 state.concurrency++ if (state.concurrency > concurrencyLimit) { // 超限,进入队列 state.concurrency-- state.queue++ if (state.queue > maxQueueSize) { // 队列满,拒绝 state.queue-- state.rejected++ return { ...req, status: 'rejected', reason: 'queue_full' } } // 等待槽位(模拟) await sleep(Math.random() * 100) state.queue-- state.concurrency++ } // 处理请求 await sleep(50) // 模拟处理时间 state.concurrency-- state.completed++ return { ...req, status: 'completed', duration: Date.now() - req.startTime } } const results = await Promise.all(requests.map(processRequest)) const completed = results.filter((r) => r.status === 'completed') const rejected = results.filter((r) => r.status === 'rejected') // 大部分请求应该完成 expect(completed.length).toBeGreaterThan(0) // 可能有一些被拒绝 expect(state.rejected).toBe(rejected.length) console.log( ` ✓ Completed: ${completed.length}, Rejected: ${rejected.length}, Max concurrent: ${concurrencyLimit}` ) }) }) describe('Scenario: Graceful Degradation', () => { it('should fallback when Redis fails', async () => { jest .spyOn(redis, 'incrConcurrencyQueue') .mockRejectedValue(new Error('Redis connection lost')) // 模拟降级行为:Redis 失败时直接拒绝而不是崩溃 let result try { await redis.incrConcurrencyQueue('fallback-test', 60000) result = { success: true } } catch (error) { // 优雅降级:返回 429 而不是 500 result = { success: false, fallback: true, error: error.message } } expect(result.fallback).toBe(true) expect(result.error).toContain('Redis') }) }) describe('Scenario: Timeout Behavior', () => { it('should respect queue timeout', async () => { const timeoutMs = 100 const startTime = Date.now() // 模拟等待超时 await new Promise((resolve) => setTimeout(resolve, timeoutMs)) const elapsed = Date.now() - startTime expect(elapsed).toBeGreaterThanOrEqual(timeoutMs - 10) // 允许 10ms 误差 }) it('should track timeout statistics', async () => { const stats = { entered: 0, success: 0, timeout: 0, cancelled: 0 } // 模拟多个请求,部分超时 const requests = [ { id: 'req-1', willTimeout: false }, { id: 'req-2', willTimeout: true }, { id: 'req-3', willTimeout: false }, { id: 'req-4', willTimeout: true } ] for (const req of requests) { stats.entered++ if (req.willTimeout) { stats.timeout++ } else { stats.success++ } } expect(stats.entered).toBe(4) expect(stats.success).toBe(2) expect(stats.timeout).toBe(2) // 成功率应该是 50% const successRate = (stats.success / stats.entered) * 100 expect(successRate).toBe(50) }) }) }) })