fix: split SSE chunks per event to avoid JSON parse errors

This commit is contained in:
shaw
2025-11-22 18:10:54 +08:00
parent 22e10c57ea
commit c33771ef82

View File

@@ -547,64 +547,94 @@ async function handleStandardStreamGenerateContent(req, res) {
heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL)
// Buffer: a single upstream chunk may carry several SSE events, so raw
// bytes accumulate here and are split on blank lines before processing.
let sseBuffer = ''

// Handle one complete SSE event block (without its trailing blank line):
// unwrap `data: {"response": {...}}` into `data: {...}`, forward it, and
// opportunistically capture usage metadata along the way.
const handleEventBlock = (evt) => {
  if (evt.trim() === '') {
    return
  }

  // Gather every "data:" field line (SSE permits a payload split across
  // multiple data lines, which must be rejoined with newlines).
  const dataFieldLines = evt
    .split(/\r?\n/)
    .filter((line) => line.startsWith('data:'))

  // Events carrying no data field pass through unchanged.
  if (dataFieldLines.length === 0) {
    if (!res.destroyed) {
      res.write(`${evt}\n\n`)
    }
    return
  }

  const joinedData = dataFieldLines
    .map((line) => line.replace(/^data:\s?/, ''))
    .join('\n')

  let rewritten = null
  if (joinedData === '[DONE]') {
    rewritten = '[DONE]'
  } else {
    try {
      const decoded = JSON.parse(joinedData)
      // Usage metadata may sit at the top level or inside `response`.
      if (decoded.usageMetadata) {
        totalUsage = decoded.usageMetadata
      } else if (decoded.response?.usageMetadata) {
        totalUsage = decoded.response.usageMetadata
      }
      // Unwrap the inner `response` object (fall back to the whole object).
      rewritten = JSON.stringify(decoded.response || decoded)
    } catch (e) {
      // Parse failure: leave `rewritten` null so the raw event is forwarded.
    }
  }

  // Forward immediately: the raw event when parsing failed, the rewrapped
  // payload otherwise.
  const wireData = rewritten === null ? `${evt}\n\n` : `data: ${rewritten}\n\n`
  if (!res.destroyed) {
    res.write(wireData)
  }

  // Async safety net (does not block forwarding): re-scan for usage data in
  // case the synchronous pass above failed to capture it.
  setImmediate(() => {
    try {
      const usageSource =
        rewritten && rewritten !== '[DONE]' ? rewritten : joinedData
      if (!usageSource || !usageSource.includes('usageMetadata')) {
        return
      }
      // One more decode attempt against whichever payload survived.
      const usageObj = JSON.parse(usageSource)
      const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage
      if (usage && typeof usage === 'object') {
        totalUsage = usage
        logger.debug('📊 Captured Gemini usage data (async):', totalUsage)
      }
    } catch (error) {
      // Usage capture is best effort — ignore extraction failures.
    }
  })
}
streamResponse.on('data', (chunk) => { streamResponse.on('data', (chunk) => {
try { try {
// 更新最后数据时间 // 更新最后数据时间
lastDataTime = Date.now() lastDataTime = Date.now()
const chunkStr = chunk.toString() // 追加到缓冲区后按双换行拆分事件
sseBuffer += chunk.toString()
const events = sseBuffer.split(/\r?\n\r?\n/)
sseBuffer = events.pop() || ''
// 尝试解析 SSE 数据 for (const evt of events) {
// upstream 返回格式: data: {"response": {...}} handleEventBlock(evt)
// standard API 期望格式: data: {...}
let processedChunk = chunk
if (chunkStr.startsWith('data: ')) {
try {
const jsonStr = chunkStr.substring(6).trim()
if (jsonStr !== '[DONE]') {
const data = JSON.parse(jsonStr)
if (data.response) {
// 提取内部的 response 对象并重新包装为 SSE
const newPayload = JSON.stringify(data.response)
processedChunk = Buffer.from(`data: ${newPayload}\n\n`)
} }
}
} catch (e) {
// 解析失败,直接转发原始数据
// logger.warn('Failed to parse SSE chunk:', e)
}
}
// 1⃣ 立即转发处理后的数据
if (!res.destroyed) {
res.write(processedChunk)
}
// 2⃣ 异步提取 usage 数据(不阻塞转发)
setImmediate(() => {
try {
const str = processedChunk.toString()
if (!str.trim() || !str.includes('usageMetadata')) {
return
}
// 简单的解析尝试
const match = str.match(/"usageMetadata":\s*({[^}]+})/)
if (match && match[1]) {
try {
const usage = JSON.parse(match[1])
totalUsage = usage
logger.debug('📊 Captured Gemini usage data:', totalUsage)
} catch (e) {
// ignore
}
}
} catch (error) {
logger.warn('⚠️ Error extracting usage data:', error.message)
}
})
} catch (error) { } catch (error) {
logger.error('Error processing stream chunk:', error) logger.error('Error processing stream chunk:', error)
} }
@@ -613,6 +643,16 @@ async function handleStandardStreamGenerateContent(req, res) {
streamResponse.on('end', () => { streamResponse.on('end', () => {
logger.info('Stream completed successfully') logger.info('Stream completed successfully')
// 处理可能残留在缓冲区的事件(上游未以空行结尾的情况)
if (sseBuffer.trim()) {
try {
handleEventBlock(sseBuffer)
} catch (flushError) {
// 忽略 flush 期间的异常
}
sseBuffer = ''
}
// 清理心跳定时器 // 清理心跳定时器
if (heartbeatTimer) { if (heartbeatTimer) {
clearInterval(heartbeatTimer) clearInterval(heartbeatTimer)