fix: 修复Gemini v1beta流式响应中断问题

- 优化SSE流式响应处理逻辑,修复客户端接收第一条消息后断开连接的问题
- 统一流处理缓冲区,正确处理不完整的SSE行
- v1beta版本返回response字段内容,v1internal保持原始转发
- 移除调试日志输出,提升生产环境稳定性

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
zjpyb
2025-08-28 02:38:01 +08:00
parent a7009e6864
commit fb57cfd293

View File

@@ -708,8 +708,28 @@ async function handleStreamGenerateContent(req, res) {
res.setHeader('Connection', 'keep-alive') res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no') res.setHeader('X-Accel-Buffering', 'no')
// SSE 解析函数
const parseSSELine = (line) => {
if (!line.startsWith('data: ')) {
return { type: 'other', line, data: null }
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
return { type: 'control', line, data: null, jsonStr }
}
try {
const data = JSON.parse(jsonStr)
return { type: 'data', line, data, jsonStr }
} catch (e) {
return { type: 'invalid', line, data: null, jsonStr, error: e }
}
}
// 处理流式响应并捕获usage数据 // 处理流式响应并捕获usage数据
let buffer = '' let streamBuffer = '' // 统一的流处理缓冲区
let totalUsage = { let totalUsage = {
promptTokenCount: 0, promptTokenCount: 0,
candidatesTokenCount: 0, candidatesTokenCount: 0,
@@ -721,32 +741,60 @@ async function handleStreamGenerateContent(req, res) {
try { try {
const chunkStr = chunk.toString() const chunkStr = chunk.toString()
// 直接转发数据到客户端 if (!chunkStr.trim()) {
if (!res.destroyed) { return
res.write(chunkStr)
} }
// 同时解析数据以捕获usage信息 // 使用统一缓冲区处理不完整的行
buffer += chunkStr streamBuffer += chunkStr
const lines = buffer.split('\n') const lines = streamBuffer.split('\n')
buffer = lines.pop() || '' streamBuffer = lines.pop() || '' // 保留最后一个不完整的行
const processedLines = []
for (const line of lines) { for (const line of lines) {
if (line.startsWith('data: ') && line.length > 6) { if (!line.trim()) {
try { continue // 跳过空行,不添加到处理队列
const jsonStr = line.slice(6) }
if (jsonStr && jsonStr !== '[DONE]') {
const data = JSON.parse(jsonStr)
// 从响应中提取usage数据 // 解析 SSE 行
if (data.response?.usageMetadata) { const parsed = parseSSELine(line)
totalUsage = data.response.usageMetadata
logger.debug('📊 Captured Gemini usage data:', totalUsage) // 提取 usage 数据(适用于所有版本)
} if (parsed.type === 'data' && parsed.data.response?.usageMetadata) {
totalUsage = parsed.data.response.usageMetadata
logger.debug('📊 Captured Gemini usage data:', totalUsage)
}
// 根据版本处理输出
if (version === 'v1beta') {
if (parsed.type === 'data') {
if (parsed.data.response) {
// 有 response 字段,只返回 response 的内容
processedLines.push(`data: ${JSON.stringify(parsed.data.response)}`)
} else {
// 没有 response 字段,返回整个数据对象
processedLines.push(`data: ${JSON.stringify(parsed.data)}`)
} }
} catch (e) { } else if (parsed.type === 'control') {
// 忽略解析错误 // 控制消息(如 [DONE])保持原样
processedLines.push(line)
} }
// 跳过其他类型的行('other', 'invalid'
}
}
// 发送数据到客户端
if (version === 'v1beta') {
for (const line of processedLines) {
if (!res.destroyed) {
res.write(`${line}\n\n`)
}
}
} else {
// v1internal 直接转发原始数据
if (!res.destroyed) {
res.write(chunkStr)
} }
} }
} catch (error) { } catch (error) {