mirror of
https://github.com/Wei-Shaw/claude-relay-service.git
synced 2026-03-30 02:31:33 +00:00
Merge pull request #981 from codsaf/fix/gemini-stream-usage [skip ci]
fix(gemini): resolve stream usage data loss due to split chunks
This commit is contained in:
@@ -642,6 +642,7 @@ async function handleMessages(req, res) {
|
|||||||
candidatesTokenCount: 0,
|
candidatesTokenCount: 0,
|
||||||
totalTokenCount: 0
|
totalTokenCount: 0
|
||||||
}
|
}
|
||||||
|
let streamBuffer = ''
|
||||||
|
|
||||||
geminiResponse.on('data', (chunk) => {
|
geminiResponse.on('data', (chunk) => {
|
||||||
try {
|
try {
|
||||||
@@ -649,7 +650,18 @@ async function handleMessages(req, res) {
|
|||||||
res.write(chunkStr)
|
res.write(chunkStr)
|
||||||
|
|
||||||
// 尝试从 SSE 流中提取 usage 数据
|
// 尝试从 SSE 流中提取 usage 数据
|
||||||
const lines = chunkStr.split('\n')
|
streamBuffer += chunkStr
|
||||||
|
|
||||||
|
// 如果 buffer 过大,进行保护性清理(防止内存泄漏)
|
||||||
|
if (streamBuffer.length > 1024 * 1024) {
|
||||||
|
// 1MB
|
||||||
|
streamBuffer = streamBuffer.slice(-1024 * 64) // 只保留最后 64KB
|
||||||
|
}
|
||||||
|
|
||||||
|
const lines = streamBuffer.split('\n')
|
||||||
|
// 保留最后一行(可能不完整)
|
||||||
|
streamBuffer = lines.pop() || ''
|
||||||
|
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
if (line.startsWith('data:')) {
|
if (line.startsWith('data:')) {
|
||||||
const data = line.substring(5).trim()
|
const data = line.substring(5).trim()
|
||||||
@@ -1945,7 +1957,7 @@ async function handleStreamGenerateContent(req, res) {
|
|||||||
res.setHeader('X-Accel-Buffering', 'no')
|
res.setHeader('X-Accel-Buffering', 'no')
|
||||||
|
|
||||||
// 处理流式响应并捕获usage数据
|
// 处理流式响应并捕获usage数据
|
||||||
let streamBuffer = ''
|
let streamBuffer = '' // 移动到 data 事件处理器外部,保持状态
|
||||||
let totalUsage = {
|
let totalUsage = {
|
||||||
promptTokenCount: 0,
|
promptTokenCount: 0,
|
||||||
candidatesTokenCount: 0,
|
candidatesTokenCount: 0,
|
||||||
@@ -1977,37 +1989,53 @@ async function handleStreamGenerateContent(req, res) {
|
|||||||
res.write(chunk)
|
res.write(chunk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 异步提取 usage 数据
|
// 提取 usage 数据
|
||||||
setImmediate(() => {
|
try {
|
||||||
try {
|
const chunkStr = chunk.toString()
|
||||||
const chunkStr = chunk.toString()
|
streamBuffer += chunkStr
|
||||||
if (!chunkStr.trim() || !chunkStr.includes('usageMetadata')) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
streamBuffer += chunkStr
|
// 如果 buffer 过大,进行保护性清理(防止内存泄漏)
|
||||||
const lines = streamBuffer.split('\n')
|
if (streamBuffer.length > 1024 * 1024) {
|
||||||
streamBuffer = lines.pop() || ''
|
// 1MB
|
||||||
|
streamBuffer = streamBuffer.slice(-1024 * 64) // 只保留最后 64KB
|
||||||
for (const line of lines) {
|
|
||||||
if (!line.trim() || !line.includes('usageMetadata')) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const parsed = parseSSELine(line)
|
|
||||||
if (parsed.type === 'data' && parsed.data.response?.usageMetadata) {
|
|
||||||
totalUsage = parsed.data.response.usageMetadata
|
|
||||||
logger.debug('📊 Captured Gemini usage data:', totalUsage)
|
|
||||||
}
|
|
||||||
} catch (parseError) {
|
|
||||||
logger.warn('⚠️ Failed to parse usage line:', parseError.message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.warn('⚠️ Error extracting usage data:', error.message)
|
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
const lines = streamBuffer.split('\n')
|
||||||
|
// 保留最后一行(可能不完整)
|
||||||
|
streamBuffer = lines.pop() || ''
|
||||||
|
|
||||||
|
for (const line of lines) {
|
||||||
|
// 只处理可能包含数据的行
|
||||||
|
if (!line.trim() || !line.startsWith('data:')) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// <20><>试解析 SSE 行
|
||||||
|
const parsed = parseSSELine(line)
|
||||||
|
|
||||||
|
// 检查各种可能的 usage 位置
|
||||||
|
let extractedUsage = null
|
||||||
|
|
||||||
|
if (parsed.type === 'data') {
|
||||||
|
if (parsed.data.response?.usageMetadata) {
|
||||||
|
extractedUsage = parsed.data.response.usageMetadata
|
||||||
|
} else if (parsed.data.usageMetadata) {
|
||||||
|
extractedUsage = parsed.data.usageMetadata
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (extractedUsage) {
|
||||||
|
totalUsage = extractedUsage
|
||||||
|
logger.debug('📊 Captured Gemini usage data:', totalUsage)
|
||||||
|
}
|
||||||
|
} catch (parseError) {
|
||||||
|
// 解析失败忽略,可能是非 JSON 数据
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.warn('⚠️ Error extracting usage data:', error.message)
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Error processing stream chunk:', error)
|
logger.error('Error processing stream chunk:', error)
|
||||||
}
|
}
|
||||||
@@ -2763,26 +2791,24 @@ async function handleStandardStreamGenerateContent(req, res) {
|
|||||||
res.write(outputChunk)
|
res.write(outputChunk)
|
||||||
}
|
}
|
||||||
|
|
||||||
setImmediate(() => {
|
try {
|
||||||
try {
|
const usageSource =
|
||||||
const usageSource =
|
processedPayload && processedPayload !== '[DONE]' ? processedPayload : dataPayload
|
||||||
processedPayload && processedPayload !== '[DONE]' ? processedPayload : dataPayload
|
|
||||||
|
|
||||||
if (!usageSource || !usageSource.includes('usageMetadata')) {
|
if (!usageSource || !usageSource.includes('usageMetadata')) {
|
||||||
return
|
return
|
||||||
}
|
|
||||||
|
|
||||||
const usageObj = JSON.parse(usageSource)
|
|
||||||
const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage
|
|
||||||
|
|
||||||
if (usage && typeof usage === 'object') {
|
|
||||||
totalUsage = usage
|
|
||||||
logger.debug('📊 Captured Gemini usage data (async):', totalUsage)
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
// 提取用量失败时忽略
|
|
||||||
}
|
}
|
||||||
})
|
|
||||||
|
const usageObj = JSON.parse(usageSource)
|
||||||
|
const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage
|
||||||
|
|
||||||
|
if (usage && typeof usage === 'object') {
|
||||||
|
totalUsage = usage
|
||||||
|
logger.debug('📊 Captured Gemini usage data (async):', totalUsage)
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// 提取用量失败时忽略
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamResponse.on('data', (chunk) => {
|
streamResponse.on('data', (chunk) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user