fix: 修复Gemini v1beta流式响应中断问题

- 优化SSE流式响应处理逻辑,修复客户端接收第一条消息后断开连接的问题
- 统一流处理缓冲区,正确处理不完整的SSE行
- v1beta版本返回response字段内容,v1internal保持原始转发
- 移除调试日志输出,提升生产环境稳定性

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
zjpyb
2025-08-28 02:38:01 +08:00
committed by shaw
parent 7ce55c006e
commit 7c3257764c

View File

@@ -708,8 +708,28 @@ async function handleStreamGenerateContent(req, res) {
res.setHeader('Connection', 'keep-alive')
res.setHeader('X-Accel-Buffering', 'no')
// SSE 解析函数
const parseSSELine = (line) => {
if (!line.startsWith('data: ')) {
return { type: 'other', line, data: null }
}
const jsonStr = line.substring(6).trim()
if (!jsonStr || jsonStr === '[DONE]') {
return { type: 'control', line, data: null, jsonStr }
}
try {
const data = JSON.parse(jsonStr)
return { type: 'data', line, data, jsonStr }
} catch (e) {
return { type: 'invalid', line, data: null, jsonStr, error: e }
}
}
// 处理流式响应并捕获usage数据
let buffer = ''
let streamBuffer = '' // 统一的流处理缓冲区
let totalUsage = {
promptTokenCount: 0,
candidatesTokenCount: 0,
@@ -721,32 +741,60 @@ async function handleStreamGenerateContent(req, res) {
try {
const chunkStr = chunk.toString()
// 直接转发数据到客户端
if (!res.destroyed) {
res.write(chunkStr)
if (!chunkStr.trim()) {
return
}
// 同时解析数据以捕获usage信息
buffer += chunkStr
const lines = buffer.split('\n')
buffer = lines.pop() || ''
// 使用统一缓冲区处理不完整的行
streamBuffer += chunkStr
const lines = streamBuffer.split('\n')
streamBuffer = lines.pop() || '' // 保留最后一个不完整的行
const processedLines = []
for (const line of lines) {
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
if (jsonStr && jsonStr !== '[DONE]') {
const data = JSON.parse(jsonStr)
if (!line.trim()) {
continue // 跳过空行,不添加到处理队列
}
// 从响应中提取usage数据
if (data.response?.usageMetadata) {
totalUsage = data.response.usageMetadata
logger.debug('📊 Captured Gemini usage data:', totalUsage)
}
// 解析 SSE 行
const parsed = parseSSELine(line)
// 提取 usage 数据(适用于所有版本)
if (parsed.type === 'data' && parsed.data.response?.usageMetadata) {
totalUsage = parsed.data.response.usageMetadata
logger.debug('📊 Captured Gemini usage data:', totalUsage)
}
// 根据版本处理输出
if (version === 'v1beta') {
if (parsed.type === 'data') {
if (parsed.data.response) {
// 有 response 字段,只返回 response 的内容
processedLines.push(`data: ${JSON.stringify(parsed.data.response)}`)
} else {
// 没有 response 字段,返回整个数据对象
processedLines.push(`data: ${JSON.stringify(parsed.data)}`)
}
} catch (e) {
// 忽略解析错误
} else if (parsed.type === 'control') {
// 控制消息(如 [DONE])保持原样
processedLines.push(line)
}
// 跳过其他类型的行('other', 'invalid'
}
}
// 发送数据到客户端
if (version === 'v1beta') {
for (const line of processedLines) {
if (!res.destroyed) {
res.write(`${line}\n\n`)
}
}
} else {
// v1internal 直接转发原始数据
if (!res.destroyed) {
res.write(chunkStr)
}
}
} catch (error) {