修复标准Gemini API流式响应的缓冲区和解析问题

- 新增通用SSE解析器(src/utils/sseParser.js)
- 添加streamBuffer处理TCP数据包分割
- 统一两种API方式的SSE解析逻辑
- 记录解析失败和usage缺失的详细日志
This commit is contained in:
曾庆雷
2025-11-13 00:48:13 +08:00
parent e130405809
commit 7a6c287a7e
3 changed files with 101 additions and 48 deletions

View File

@@ -9,6 +9,7 @@ const sessionHelper = require('../utils/sessionHelper')
const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler')
const apiKeyService = require('../services/apiKeyService')
const { updateRateLimitCounters } = require('../utils/rateLimitHelper')
const { parseSSELine } = require('../utils/sseParser')
// const { OAuth2Client } = require('google-auth-library'); // OAuth2Client is not used in this file
// 生成会话哈希
@@ -917,26 +918,6 @@ async function handleStreamGenerateContent(req, res) {
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
// SSE 解析函数
const parseSSELine = (line) => {
if (!line.startsWith('data: ')) {
return { type: 'other', line, data: null }
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
return { type: 'control', line, data: null, jsonStr }
}
try {
const data = JSON.parse(jsonStr)
return { type: 'data', line, data, jsonStr }
} catch (e) {
return { type: 'invalid', line, data: null, jsonStr, error: e }
}
}
// 处理流式响应并捕获usage数据
let streamBuffer = '' // 统一的流处理缓冲区
let totalUsage = {

View File

@@ -6,6 +6,7 @@ const geminiAccountService = require('../services/geminiAccountService')
const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler')
const apiKeyService = require('../services/apiKeyService')
const sessionHelper = require('../utils/sessionHelper')
const { parseSSELine } = require('../utils/sseParser')
// 导入 geminiRoutes 中导出的处理函数
const { handleLoadCodeAssist, handleOnboardUser, handleCountTokens } = require('./geminiRoutes')
@@ -509,6 +510,7 @@ async function handleStandardStreamGenerateContent(req, res) {
res.setHeader('X-Accel-Buffering', 'no')
// 处理流式响应并捕获usage数据
let streamBuffer = '' // 统一的流处理缓冲区
let totalUsage = {
promptTokenCount: 0,
candidatesTokenCount: 0,
@@ -517,38 +519,52 @@ async function handleStandardStreamGenerateContent(req, res) {
streamResponse.on('data', (chunk) => {
try {
if (!res.destroyed) {
const chunkStr = chunk.toString()
// 处理 SSE 格式的数据
const lines = chunkStr.split('\n')
if (!chunkStr.trim()) {
return
}
// 使用统一缓冲区处理不完整的行
streamBuffer += chunkStr
const lines = streamBuffer.split('\n')
streamBuffer = lines.pop() || '' // 保留最后一个不完整的行
for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonStr = line.substring(6).trim()
if (jsonStr && jsonStr !== '[DONE]') {
try {
const data = JSON.parse(jsonStr)
if (!line.trim()) {
continue // 跳过空行
}
// 解析 SSE 行
const parsed = parseSSELine(line)
// 记录无效的解析(用于调试)
if (parsed.type === 'invalid') {
logger.warn('Failed to parse SSE line:', {
line: parsed.line.substring(0, 100),
error: parsed.error.message
})
continue
}
// 捕获 usage 数据
if (data.response?.usageMetadata) {
totalUsage = data.response.usageMetadata
if (parsed.type === 'data' && parsed.data.response?.usageMetadata) {
totalUsage = parsed.data.response.usageMetadata
logger.debug('📊 Captured Gemini usage data:', totalUsage)
}
// 转换格式并发送
if (!res.destroyed) {
if (parsed.type === 'data') {
// 转换格式:移除 response 包装,直接返回标准 Gemini API 格式
// 注意:不过滤 thought 字段,因为 gemini-cli 会自行处理
if (data.response) {
res.write(`data: ${JSON.stringify(data.response)}\n\n`)
if (parsed.data.response) {
res.write(`data: ${JSON.stringify(parsed.data.response)}\n\n`)
} else {
// 如果没有 response 包装,直接发送
res.write(`data: ${JSON.stringify(data)}\n\n`)
}
} catch (e) {
// 忽略解析错误
}
} else if (jsonStr === '[DONE]') {
// 保持 [DONE] 标记
res.write(`${line}\n\n`)
res.write(`data: ${JSON.stringify(parsed.data)}\n\n`)
}
} else if (parsed.type === 'control') {
// 保持控制消息(如 [DONE])原样
res.write(`${parsed.line}\n\n`)
}
}
}
@@ -578,6 +594,10 @@ async function handleStandardStreamGenerateContent(req, res) {
} catch (error) {
logger.error('Failed to record Gemini usage:', error)
}
} else {
logger.warn(
`⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}`
)
}
res.end()

52
src/utils/sseParser.js Normal file
View File

@@ -0,0 +1,52 @@
/**
* Server-Sent Events (SSE) 解析工具
*
* 用于解析标准 SSE 格式的数据流
* 当前主要用于 Gemini API 的流式响应处理
*
* @module sseParser
*/
/**
* 解析单行 SSE 数据
*
* @param {string} line - SSE 格式的行(如:"data: {json}\n"
* @returns {Object} 解析结果
* @returns {'data'|'control'|'other'|'invalid'} .type - 行类型
* @returns {Object|null} .data - 解析后的 JSON 数据(仅 type='data' 时)
* @returns {string} .line - 原始行内容
* @returns {string} [.jsonStr] - JSON 字符串
* @returns {Error} [.error] - 解析错误(仅 type='invalid' 时)
*
* @example
* // 数据行
* parseSSELine('data: {"key":"value"}')
* // => { type: 'data', data: {key: 'value'}, line: '...', jsonStr: '...' }
*
* @example
* // 控制行
* parseSSELine('data: [DONE]')
* // => { type: 'control', data: null, line: '...', jsonStr: '[DONE]' }
*/
function parseSSELine(line) {
if (!line.startsWith('data: ')) {
return { type: 'other', line, data: null }
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
return { type: 'control', line, data: null, jsonStr }
}
try {
const data = JSON.parse(jsonStr)
return { type: 'data', line, data, jsonStr }
} catch (e) {
return { type: 'invalid', line, data: null, jsonStr, error: e }
}
}
module.exports = {
parseSSELine
}