From c33771ef82b4422cd4b7a21fb4b703b7359cd683 Mon Sep 17 00:00:00 2001 From: shaw Date: Sat, 22 Nov 2025 18:10:54 +0800 Subject: [PATCH] fix: split SSE chunks per event to avoid JSON parse errors --- src/routes/standardGeminiRoutes.js | 142 ++++++++++++++++++----------- 1 file changed, 91 insertions(+), 51 deletions(-) diff --git a/src/routes/standardGeminiRoutes.js b/src/routes/standardGeminiRoutes.js index 701e3328..759e29b3 100644 --- a/src/routes/standardGeminiRoutes.js +++ b/src/routes/standardGeminiRoutes.js @@ -547,64 +547,94 @@ async function handleStandardStreamGenerateContent(req, res) { heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) + // 缓冲区:有些 chunk 内会包含多条 SSE 事件,需要拆分 + let sseBuffer = '' + + // 处理单个 SSE 事件块(不含结尾空行) + const handleEventBlock = (evt) => { + if (!evt.trim()) { + return + } + + // 取出所有 data 行并拼接(兼容多行 data) + const dataLines = evt.split(/\r?\n/).filter((line) => line.startsWith('data:')) + if (dataLines.length === 0) { + // 非 data 事件,直接原样转发 + if (!res.destroyed) { + res.write(`${evt}\n\n`) + } + return + } + + const dataPayload = dataLines.map((line) => line.replace(/^data:\s?/, '')).join('\n') + + let processedPayload = null + let parsed = null + + if (dataPayload === '[DONE]') { + processedPayload = '[DONE]' + } else { + try { + parsed = JSON.parse(dataPayload) + + // 捕获 usage(如果在顶层或 response 内都有可能) + if (parsed.usageMetadata) { + totalUsage = parsed.usageMetadata + } else if (parsed.response?.usageMetadata) { + totalUsage = parsed.response.usageMetadata + } + + // 提取 response 并重新包装 + processedPayload = JSON.stringify(parsed.response || parsed) + } catch (e) { + // 解析失败,直接转发原始 data + } + } + + const outputChunk = processedPayload === null ? `${evt}\n\n` : `data: ${processedPayload}\n\n` + + // 1️⃣ 立即转发处理后的数据 + if (!res.destroyed) { + res.write(outputChunk) + } + + // 2️⃣ 异步提取 usage 数据(兜底,防止上面解析失败未捕获) + setImmediate(() => { + try { + const usageSource = + processedPayload && processedPayload !== '[DONE]' ? processedPayload : dataPayload + + if (!usageSource || !usageSource.includes('usageMetadata')) { + return + } + + // 再尝试一次解析 + const usageObj = JSON.parse(usageSource) + const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage + + if (usage && typeof usage === 'object') { + totalUsage = usage + logger.debug('📊 Captured Gemini usage data (async):', totalUsage) + } + } catch (error) { + // 提取用量失败时忽略 + } + }) + } + streamResponse.on('data', (chunk) => { try { // 更新最后数据时间 lastDataTime = Date.now() - const chunkStr = chunk.toString() + // 追加到缓冲区后按双换行拆分事件 + sseBuffer += chunk.toString() + const events = sseBuffer.split(/\r?\n\r?\n/) + sseBuffer = events.pop() || '' - // 尝试解析 SSE 数据 - // upstream 返回格式: data: {"response": {...}} - // standard API 期望格式: data: {...} - - let processedChunk = chunk - - if (chunkStr.startsWith('data: ')) { - try { - const jsonStr = chunkStr.substring(6).trim() - if (jsonStr !== '[DONE]') { - const data = JSON.parse(jsonStr) - if (data.response) { - // 提取内部的 response 对象并重新包装为 SSE - const newPayload = JSON.stringify(data.response) - processedChunk = Buffer.from(`data: ${newPayload}\n\n`) - } - } - } catch (e) { - // 解析失败,直接转发原始数据 - // logger.warn('Failed to parse SSE chunk:', e) - } + for (const evt of events) { + handleEventBlock(evt) } - - // 1️⃣ 立即转发处理后的数据 - if (!res.destroyed) { - res.write(processedChunk) - } - - // 2️⃣ 异步提取 usage 数据(不阻塞转发) - setImmediate(() => { - try { - const str = processedChunk.toString() - if (!str.trim() || !str.includes('usageMetadata')) { - return - } - - // 简单的解析尝试 - const match = str.match(/"usageMetadata":\s*({[^}]+})/) - if (match && match[1]) { - try { - const usage = JSON.parse(match[1]) - totalUsage = usage - logger.debug('📊 Captured Gemini usage data:', totalUsage) - } catch (e) { - // ignore - } - } - } catch (error) { - logger.warn('⚠️ Error extracting usage data:', error.message) - } - }) } catch (error) { logger.error('Error processing stream chunk:', error) } @@ -613,6 +643,16 @@ async function handleStandardStreamGenerateContent(req, res) { streamResponse.on('end', () => { logger.info('Stream completed successfully') + // 处理可能残留在缓冲区的事件(上游未以空行结尾的情况) + if (sseBuffer.trim()) { + try { + handleEventBlock(sseBuffer) + } catch (flushError) { + // 忽略 flush 期间的异常 + } + sseBuffer = '' + } + // 清理心跳定时器 if (heartbeatTimer) { clearInterval(heartbeatTimer)