实现SSE心跳机制和非阻塞响应结束

This commit is contained in:
曾庆雷
2025-11-19 11:59:38 +08:00
parent 94925e57bd
commit 9eccc7da49
2 changed files with 106 additions and 40 deletions

View File

@@ -924,7 +924,7 @@ async function handleStreamGenerateContent(req, res) {
res.setHeader('X-Accel-Buffering', 'no') res.setHeader('X-Accel-Buffering', 'no')
// 处理流式响应并捕获usage数据 // 处理流式响应并捕获usage数据
// 方案 A++:透明转发 + 异步 usage 提取 // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制
let streamBuffer = '' // 缓冲区用于处理不完整的行 let streamBuffer = '' // 缓冲区用于处理不完整的行
let totalUsage = { let totalUsage = {
promptTokenCount: 0, promptTokenCount: 0,
@@ -933,8 +933,26 @@ async function handleStreamGenerateContent(req, res) {
} }
let usageReported = false // 修复:改为 let 以便后续修改 let usageReported = false // 修复:改为 let 以便后续修改
// SSE 心跳机制:防止 Clash 等代理 120 秒超时
let heartbeatTimer = null
let lastDataTime = Date.now()
const HEARTBEAT_INTERVAL = 15000 // 15 秒
const sendHeartbeat = () => {
const timeSinceLastData = Date.now() - lastDataTime
if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) {
res.write('\n') // 发送空行保持连接活跃
logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`)
}
}
heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL)
streamResponse.on('data', (chunk) => { streamResponse.on('data', (chunk) => {
try { try {
// 更新最后数据时间
lastDataTime = Date.now()
// 1⃣ 立即转发原始数据(零延迟,最高优先级) // 1⃣ 立即转发原始数据(零延迟,最高优先级)
// 对所有版本v1beta 和 v1internal都采用透明转发 // 对所有版本v1beta 和 v1internal都采用透明转发
if (!res.destroyed) { if (!res.destroyed) {
@@ -973,13 +991,13 @@ async function handleStreamGenerateContent(req, res) {
logger.debug('📊 Captured Gemini usage data:', totalUsage) logger.debug('📊 Captured Gemini usage data:', totalUsage)
} }
} catch (parseError) { } catch (parseError) {
// 静默失败不影响转发 // 解析失败不影响转发
logger.debug('Failed to parse usage line:', parseError.message) logger.warn('⚠️ Failed to parse usage line:', parseError.message)
} }
} }
} catch (error) { } catch (error) {
// 静默失败不影响转发 // 提取失败不影响转发
logger.debug('Error extracting usage data:', error.message) logger.warn('⚠️ Error extracting usage data:', error.message)
} }
}) })
} catch (error) { } catch (error) {
@@ -988,13 +1006,22 @@ async function handleStreamGenerateContent(req, res) {
} }
}) })
streamResponse.on('end', async () => { streamResponse.on('end', () => {
logger.info('Stream completed successfully') logger.info('Stream completed successfully')
// 记录使用统计 // 清理心跳定时器
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
// 立即结束响应,不阻塞
res.end()
// 异步记录使用统计(不阻塞响应)
if (!usageReported && totalUsage.totalTokenCount > 0) { if (!usageReported && totalUsage.totalTokenCount > 0) {
try { Promise.all([
await apiKeyService.recordUsage( apiKeyService.recordUsage(
req.apiKey.id, req.apiKey.id,
totalUsage.promptTokenCount || 0, totalUsage.promptTokenCount || 0,
totalUsage.candidatesTokenCount || 0, totalUsage.candidatesTokenCount || 0,
@@ -1002,12 +1029,8 @@ async function handleStreamGenerateContent(req, res) {
0, // cacheReadTokens 0, // cacheReadTokens
model, model,
account.id account.id
) ),
logger.info( applyRateLimitTracking(
`📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}`
)
await applyRateLimitTracking(
req, req,
{ {
inputTokens: totalUsage.promptTokenCount || 0, inputTokens: totalUsage.promptTokenCount || 0,
@@ -1018,19 +1041,28 @@ async function handleStreamGenerateContent(req, res) {
model, model,
'gemini-stream' 'gemini-stream'
) )
])
// 修复:标记 usage 已上报,避免重复上报 .then(() => {
logger.info(
`📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}`
)
usageReported = true usageReported = true
} catch (error) { })
.catch((error) => {
logger.error('Failed to record Gemini usage:', error) logger.error('Failed to record Gemini usage:', error)
})
} }
}
res.end()
}) })
streamResponse.on('error', (error) => { streamResponse.on('error', (error) => {
logger.error('Stream error:', error) logger.error('Stream error:', error)
// 清理心跳定时器
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
if (!res.headersSent) { if (!res.headersSent) {
// 如果还没发送响应头,可以返回正常的错误响应 // 如果还没发送响应头,可以返回正常的错误响应
res.status(500).json({ res.status(500).json({

View File

@@ -510,7 +510,7 @@ async function handleStandardStreamGenerateContent(req, res) {
res.setHeader('X-Accel-Buffering', 'no') res.setHeader('X-Accel-Buffering', 'no')
// 处理流式响应并捕获usage数据 // 处理流式响应并捕获usage数据
// 方案 A++:透明转发 + 异步 usage 提取 // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制
let streamBuffer = '' // 缓冲区用于处理不完整的行 let streamBuffer = '' // 缓冲区用于处理不完整的行
let totalUsage = { let totalUsage = {
promptTokenCount: 0, promptTokenCount: 0,
@@ -518,8 +518,26 @@ async function handleStandardStreamGenerateContent(req, res) {
totalTokenCount: 0 totalTokenCount: 0
} }
// SSE 心跳机制:防止 Clash 等代理 120 秒超时
let heartbeatTimer = null
let lastDataTime = Date.now()
const HEARTBEAT_INTERVAL = 15000 // 15 秒
const sendHeartbeat = () => {
const timeSinceLastData = Date.now() - lastDataTime
if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) {
res.write('\n') // 发送空行保持连接活跃
logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`)
}
}
heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL)
streamResponse.on('data', (chunk) => { streamResponse.on('data', (chunk) => {
try { try {
// 更新最后数据时间
lastDataTime = Date.now()
// 1⃣ 立即转发原始数据(零延迟,最高优先级) // 1⃣ 立即转发原始数据(零延迟,最高优先级)
if (!res.destroyed) { if (!res.destroyed) {
res.write(chunk) // 直接转发 Buffer无需转换和序列化 res.write(chunk) // 直接转发 Buffer无需转换和序列化
@@ -557,13 +575,13 @@ async function handleStandardStreamGenerateContent(req, res) {
logger.debug('📊 Captured Gemini usage data:', totalUsage) logger.debug('📊 Captured Gemini usage data:', totalUsage)
} }
} catch (parseError) { } catch (parseError) {
// 静默失败不影响转发 // 解析失败不影响转发
logger.debug('Failed to parse usage line:', parseError.message) logger.warn('⚠️ Failed to parse usage line:', parseError.message)
} }
} }
} catch (error) { } catch (error) {
// 静默失败不影响转发 // 提取失败不影响转发
logger.debug('Error extracting usage data:', error.message) logger.warn('⚠️ Error extracting usage data:', error.message)
} }
}) })
} catch (error) { } catch (error) {
@@ -572,13 +590,22 @@ async function handleStandardStreamGenerateContent(req, res) {
} }
}) })
streamResponse.on('end', async () => { streamResponse.on('end', () => {
logger.info('Stream completed successfully') logger.info('Stream completed successfully')
// 记录使用统计 // 清理心跳定时器
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
// 立即结束响应,不阻塞
res.end()
// 异步记录使用统计(不阻塞响应)
if (totalUsage.totalTokenCount > 0) { if (totalUsage.totalTokenCount > 0) {
try { apiKeyService
await apiKeyService.recordUsage( .recordUsage(
req.apiKey.id, req.apiKey.id,
totalUsage.promptTokenCount || 0, totalUsage.promptTokenCount || 0,
totalUsage.candidatesTokenCount || 0, totalUsage.candidatesTokenCount || 0,
@@ -587,23 +614,30 @@ async function handleStandardStreamGenerateContent(req, res) {
model, model,
account.id account.id
) )
.then(() => {
logger.info( logger.info(
`📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}`
) )
} catch (error) { })
.catch((error) => {
logger.error('Failed to record Gemini usage:', error) logger.error('Failed to record Gemini usage:', error)
} })
} else { } else {
logger.warn( logger.warn(
`⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}` `⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}`
) )
} }
res.end()
}) })
streamResponse.on('error', (error) => { streamResponse.on('error', (error) => {
logger.error('Stream error:', error) logger.error('Stream error:', error)
// 清理心跳定时器
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
if (!res.headersSent) { if (!res.headersSent) {
// 如果还没发送响应头,可以返回正常的错误响应 // 如果还没发送响应头,可以返回正常的错误响应
res.status(500).json({ res.status(500).json({