diff --git a/src/routes/geminiRoutes.js b/src/routes/geminiRoutes.js index 5b87d52a..c9f9f244 100644 --- a/src/routes/geminiRoutes.js +++ b/src/routes/geminiRoutes.js @@ -924,7 +924,7 @@ async function handleStreamGenerateContent(req, res) { res.setHeader('X-Accel-Buffering', 'no') // 处理流式响应并捕获usage数据 - // 方案 A++:透明转发 + 异步 usage 提取 + // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制 let streamBuffer = '' // 缓冲区用于处理不完整的行 let totalUsage = { promptTokenCount: 0, @@ -933,8 +933,26 @@ async function handleStreamGenerateContent(req, res) { } let usageReported = false // 修复:改为 let 以便后续修改 + // SSE 心跳机制:防止 Clash 等代理 120 秒超时 + let heartbeatTimer = null + let lastDataTime = Date.now() + const HEARTBEAT_INTERVAL = 15000 // 15 秒 + + const sendHeartbeat = () => { + const timeSinceLastData = Date.now() - lastDataTime + if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { + res.write('\n') // 发送空行保持连接活跃 + logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) + } + } + + heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) + streamResponse.on('data', (chunk) => { try { + // 更新最后数据时间 + lastDataTime = Date.now() + // 1️⃣ 立即转发原始数据(零延迟,最高优先级) // 对所有版本(v1beta 和 v1internal)都采用透明转发 if (!res.destroyed) { @@ -973,13 +991,13 @@ async function handleStreamGenerateContent(req, res) { logger.debug('📊 Captured Gemini usage data:', totalUsage) } } catch (parseError) { - // 静默失败,不影响转发 - logger.debug('Failed to parse usage line:', parseError.message) + // 解析失败但不影响转发 + logger.warn('⚠️ Failed to parse usage line:', parseError.message) } } } catch (error) { - // 静默失败,不影响转发 - logger.debug('Error extracting usage data:', error.message) + // 提取失败但不影响转发 + logger.warn('⚠️ Error extracting usage data:', error.message) } }) } catch (error) { @@ -988,13 +1006,22 @@ async function handleStreamGenerateContent(req, res) { } }) - streamResponse.on('end', async () => { + streamResponse.on('end', () => { logger.info('Stream completed successfully') - // 记录使用统计 + // 清理心跳定时器 + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + // 立即结束响应,不阻塞 + res.end() + + // 异步记录使用统计(不阻塞响应) if (!usageReported && totalUsage.totalTokenCount > 0) { - try { - await apiKeyService.recordUsage( + Promise.all([ + apiKeyService.recordUsage( req.apiKey.id, totalUsage.promptTokenCount || 0, totalUsage.candidatesTokenCount || 0, @@ -1002,12 +1029,8 @@ async function handleStreamGenerateContent(req, res) { 0, // cacheReadTokens model, account.id - ) - logger.info( - `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` - ) - - await applyRateLimitTracking( + ), + applyRateLimitTracking( req, { inputTokens: totalUsage.promptTokenCount || 0, @@ -1018,19 +1041,28 @@ async function handleStreamGenerateContent(req, res) { model, 'gemini-stream' ) - - // 修复:标记 usage 已上报,避免重复上报 - usageReported = true - } catch (error) { - logger.error('Failed to record Gemini usage:', error) - } + ]) + .then(() => { + logger.info( + `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` + ) + usageReported = true + }) + .catch((error) => { + logger.error('Failed to record Gemini usage:', error) + }) } - - res.end() }) streamResponse.on('error', (error) => { logger.error('Stream error:', error) + + // 清理心跳定时器 + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + if (!res.headersSent) { // 如果还没发送响应头,可以返回正常的错误响应 res.status(500).json({ diff --git a/src/routes/standardGeminiRoutes.js b/src/routes/standardGeminiRoutes.js index c8358e25..84bd93e9 100644 --- a/src/routes/standardGeminiRoutes.js +++ b/src/routes/standardGeminiRoutes.js @@ -510,7 +510,7 @@ async function handleStandardStreamGenerateContent(req, res) { res.setHeader('X-Accel-Buffering', 'no') // 处理流式响应并捕获usage数据 - // 方案 A++:透明转发 + 异步 usage 提取 + // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制 let streamBuffer = '' // 缓冲区用于处理不完整的行 let totalUsage = { promptTokenCount: 0, @@ -518,8 +518,26 @@ async function handleStandardStreamGenerateContent(req, res) { totalTokenCount: 0 } + // SSE 心跳机制:防止 Clash 等代理 120 秒超时 + let heartbeatTimer = null + let lastDataTime = Date.now() + const HEARTBEAT_INTERVAL = 15000 // 15 秒 + + const sendHeartbeat = () => { + const timeSinceLastData = Date.now() - lastDataTime + if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { + res.write('\n') // 发送空行保持连接活跃 + logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) + } + } + + heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) + streamResponse.on('data', (chunk) => { try { + // 更新最后数据时间 + lastDataTime = Date.now() + // 1️⃣ 立即转发原始数据(零延迟,最高优先级) if (!res.destroyed) { res.write(chunk) // 直接转发 Buffer,无需转换和序列化 @@ -557,13 +575,13 @@ async function handleStandardStreamGenerateContent(req, res) { logger.debug('📊 Captured Gemini usage data:', totalUsage) } } catch (parseError) { - // 静默失败,不影响转发 - logger.debug('Failed to parse usage line:', parseError.message) + // 解析失败但不影响转发 + logger.warn('⚠️ Failed to parse usage line:', parseError.message) } } } catch (error) { - // 静默失败,不影响转发 - logger.debug('Error extracting usage data:', error.message) + // 提取失败但不影响转发 + logger.warn('⚠️ Error extracting usage data:', error.message) } }) } catch (error) { @@ -572,13 +590,22 @@ async function handleStandardStreamGenerateContent(req, res) { } }) - streamResponse.on('end', async () => { + streamResponse.on('end', () => { logger.info('Stream completed successfully') - // 记录使用统计 + // 清理心跳定时器 + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + // 立即结束响应,不阻塞 + res.end() + + // 异步记录使用统计(不阻塞响应) if (totalUsage.totalTokenCount > 0) { - try { - await apiKeyService.recordUsage( + apiKeyService + .recordUsage( req.apiKey.id, totalUsage.promptTokenCount || 0, totalUsage.candidatesTokenCount || 0, @@ -587,23 +614,30 @@ async function handleStandardStreamGenerateContent(req, res) { model, account.id ) - logger.info( - `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` - ) - } catch (error) { - logger.error('Failed to record Gemini usage:', error) - } + .then(() => { + logger.info( + `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` + ) + }) + .catch((error) => { + logger.error('Failed to record Gemini usage:', error) + }) } else { logger.warn( `⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}` ) } - - res.end() }) streamResponse.on('error', (error) => { logger.error('Stream error:', error) + + // 清理心跳定时器 + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + if (!res.headersSent) { // 如果还没发送响应头,可以返回正常的错误响应 res.status(500).json({