diff --git a/src/routes/geminiRoutes.js b/src/routes/geminiRoutes.js index 54f84426..73d4b7c7 100644 --- a/src/routes/geminiRoutes.js +++ b/src/routes/geminiRoutes.js @@ -9,6 +9,7 @@ const sessionHelper = require('../utils/sessionHelper') const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') const apiKeyService = require('../services/apiKeyService') const { updateRateLimitCounters } = require('../utils/rateLimitHelper') +const { parseSSELine } = require('../utils/sseParser') // const { OAuth2Client } = require('google-auth-library'); // OAuth2Client is not used in this file // 生成会话哈希 @@ -917,26 +918,6 @@ async function handleStreamGenerateContent(req, res) { res.setHeader('Connection', 'keep-alive') res.setHeader('X-Accel-Buffering', 'no') - // SSE 解析函数 - const parseSSELine = (line) => { - if (!line.startsWith('data: ')) { - return { type: 'other', line, data: null } - } - - const jsonStr = line.substring(6).trim() - - if (!jsonStr || jsonStr === '[DONE]') { - return { type: 'control', line, data: null, jsonStr } - } - - try { - const data = JSON.parse(jsonStr) - return { type: 'data', line, data, jsonStr } - } catch (e) { - return { type: 'invalid', line, data: null, jsonStr, error: e } - } - } - // 处理流式响应并捕获usage数据 let streamBuffer = '' // 统一的流处理缓冲区 let totalUsage = { diff --git a/src/routes/standardGeminiRoutes.js b/src/routes/standardGeminiRoutes.js index 46914b93..4bf718ef 100644 --- a/src/routes/standardGeminiRoutes.js +++ b/src/routes/standardGeminiRoutes.js @@ -6,6 +6,7 @@ const geminiAccountService = require('../services/geminiAccountService') const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') const apiKeyService = require('../services/apiKeyService') const sessionHelper = require('../utils/sessionHelper') +const { parseSSELine } = require('../utils/sseParser') // 导入 geminiRoutes 中导出的处理函数 const { handleLoadCodeAssist, handleOnboardUser, handleCountTokens } = require('./geminiRoutes') @@ -509,6 +510,7 @@ async function handleStandardStreamGenerateContent(req, res) { res.setHeader('X-Accel-Buffering', 'no') // 处理流式响应并捕获usage数据 + let streamBuffer = '' // 统一的流处理缓冲区 let totalUsage = { promptTokenCount: 0, candidatesTokenCount: 0, @@ -517,38 +519,52 @@ async function handleStandardStreamGenerateContent(req, res) { streamResponse.on('data', (chunk) => { try { - if (!res.destroyed) { - const chunkStr = chunk.toString() + const chunkStr = chunk.toString() - // 处理 SSE 格式的数据 - const lines = chunkStr.split('\n') - for (const line of lines) { - if (line.startsWith('data: ')) { - const jsonStr = line.substring(6).trim() - if (jsonStr && jsonStr !== '[DONE]') { - try { - const data = JSON.parse(jsonStr) + if (!chunkStr.trim()) { + return + } - // 捕获 usage 数据 - if (data.response?.usageMetadata) { - totalUsage = data.response.usageMetadata - } + // 使用统一缓冲区处理不完整的行 + streamBuffer += chunkStr + const lines = streamBuffer.split('\n') + streamBuffer = lines.pop() || '' // 保留最后一个不完整的行 - // 转换格式:移除 response 包装,直接返回标准 Gemini API 格式 - // 注意:不过滤 thought 字段,因为 gemini-cli 会自行处理 - if (data.response) { - res.write(`data: ${JSON.stringify(data.response)}\n\n`) - } else { - // 如果没有 response 包装,直接发送 - res.write(`data: ${JSON.stringify(data)}\n\n`) - } - } catch (e) { - // 忽略解析错误 - } - } else if (jsonStr === '[DONE]') { - // 保持 [DONE] 标记 - res.write(`${line}\n\n`) + for (const line of lines) { + if (!line.trim()) { + continue // 跳过空行 + } + + // 解析 SSE 行 + const parsed = parseSSELine(line) + + // 记录无效的解析(用于调试) + if (parsed.type === 'invalid') { + logger.warn('Failed to parse SSE line:', { + line: parsed.line.substring(0, 100), + error: parsed.error.message + }) + continue + } + + // 捕获 usage 数据 + if (parsed.type === 'data' && parsed.data.response?.usageMetadata) { + totalUsage = parsed.data.response.usageMetadata + logger.debug('📊 Captured Gemini usage data:', totalUsage) + } + + // 转换格式并发送 + if (!res.destroyed) { + if (parsed.type === 'data') { + // 转换格式:移除 response 包装,直接返回标准 Gemini API 格式 + if (parsed.data.response) { + res.write(`data: ${JSON.stringify(parsed.data.response)}\n\n`) + } else { + res.write(`data: ${JSON.stringify(parsed.data)}\n\n`) } + } else if (parsed.type === 'control') { + // 保持控制消息(如 [DONE])原样 + res.write(`${parsed.line}\n\n`) } } } @@ -578,6 +594,10 @@ async function handleStandardStreamGenerateContent(req, res) { } catch (error) { logger.error('Failed to record Gemini usage:', error) } + } else { + logger.warn( + `⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}` + ) } res.end() diff --git a/src/utils/sseParser.js b/src/utils/sseParser.js new file mode 100644 index 00000000..ea3d6a9c --- /dev/null +++ b/src/utils/sseParser.js @@ -0,0 +1,52 @@ +/** + * Server-Sent Events (SSE) 解析工具 + * + * 用于解析标准 SSE 格式的数据流 + * 当前主要用于 Gemini API 的流式响应处理 + * + * @module sseParser + */ + +/** + * 解析单行 SSE 数据 + * + * @param {string} line - SSE 格式的行(如:"data: {json}\n") + * @returns {Object} 解析结果 + * @returns {'data'|'control'|'other'|'invalid'} .type - 行类型 + * @returns {Object|null} .data - 解析后的 JSON 数据(仅 type='data' 时) + * @returns {string} .line - 原始行内容 + * @returns {string} [.jsonStr] - JSON 字符串 + * @returns {Error} [.error] - 解析错误(仅 type='invalid' 时) + * + * @example + * // 数据行 + * parseSSELine('data: {"key":"value"}') + * // => { type: 'data', data: {key: 'value'}, line: '...', jsonStr: '...' } + * + * @example + * // 控制行 + * parseSSELine('data: [DONE]') + * // => { type: 'control', data: null, line: '...', jsonStr: '[DONE]' } + */ +function parseSSELine(line) { + if (!line.startsWith('data: ')) { + return { type: 'other', line, data: null } + } + + const jsonStr = line.substring(6).trim() + + if (!jsonStr || jsonStr === '[DONE]') { + return { type: 'control', line, data: null, jsonStr } + } + + try { + const data = JSON.parse(jsonStr) + return { type: 'data', line, data, jsonStr } + } catch (e) { + return { type: 'invalid', line, data: null, jsonStr, error: e } + } +} + +module.exports = { + parseSSELine +}