fix azure openai usage count issue

This commit is contained in:
Feng Yue
2025-08-30 20:45:01 +08:00
parent 70c8cb5aff
commit dc3d311def
2 changed files with 324 additions and 29 deletions

View File

@@ -197,6 +197,13 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => {
onEnd: async ({ usageData, actualModel }) => { onEnd: async ({ usageData, actualModel }) => {
if (usageData) { if (usageData) {
const modelToRecord = actualModel || req.body.model || 'unknown' const modelToRecord = actualModel || req.body.model || 'unknown'
logger.info(`✅ Usage capture SUCCESS for stream chat request ${requestId}`, {
usageData,
modelToRecord,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0
})
await usageReporter.reportOnce( await usageReporter.reportOnce(
requestId, requestId,
usageData, usageData,
@@ -204,6 +211,14 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => {
modelToRecord, modelToRecord,
account.id account.id
) )
} else {
logger.error(`❌ Usage capture FAILED for stream chat request ${requestId}`, {
apiKeyId: req.apiKey.id,
model: req.body.model,
account: account.name,
endpoint: 'chat/completions',
isStream: true
})
} }
}, },
onError: (error) => { onError: (error) => {
@@ -219,6 +234,13 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => {
if (usageData) { if (usageData) {
const modelToRecord = actualModel || req.body.model || 'unknown' const modelToRecord = actualModel || req.body.model || 'unknown'
logger.info(`✅ Usage capture SUCCESS for non-stream chat request ${requestId}`, {
usageData,
modelToRecord,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0
})
await usageReporter.reportOnce( await usageReporter.reportOnce(
requestId, requestId,
usageData, usageData,
@@ -226,6 +248,15 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => {
modelToRecord, modelToRecord,
account.id account.id
) )
} else {
logger.error(`❌ Usage capture FAILED for non-stream chat request ${requestId}`, {
apiKeyId: req.apiKey.id,
model: req.body.model,
account: account.name,
endpoint: 'chat/completions',
isStream: false,
responseStatus: response.status
})
} }
} }
} catch (error) { } catch (error) {
@@ -314,6 +345,13 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
onEnd: async ({ usageData, actualModel }) => { onEnd: async ({ usageData, actualModel }) => {
if (usageData) { if (usageData) {
const modelToRecord = actualModel || req.body.model || 'unknown' const modelToRecord = actualModel || req.body.model || 'unknown'
logger.info(`✅ Usage capture SUCCESS for stream responses request ${requestId}`, {
usageData,
modelToRecord,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0
})
await usageReporter.reportOnce( await usageReporter.reportOnce(
requestId, requestId,
usageData, usageData,
@@ -321,6 +359,14 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
modelToRecord, modelToRecord,
account.id account.id
) )
} else {
logger.error(`❌ Usage capture FAILED for stream responses request ${requestId}`, {
apiKeyId: req.apiKey.id,
model: req.body.model,
account: account.name,
endpoint: 'responses',
isStream: true
})
} }
}, },
onError: (error) => { onError: (error) => {
@@ -336,6 +382,13 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
if (usageData) { if (usageData) {
const modelToRecord = actualModel || req.body.model || 'unknown' const modelToRecord = actualModel || req.body.model || 'unknown'
logger.info(`✅ Usage capture SUCCESS for non-stream responses request ${requestId}`, {
usageData,
modelToRecord,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0
})
await usageReporter.reportOnce( await usageReporter.reportOnce(
requestId, requestId,
usageData, usageData,
@@ -343,6 +396,15 @@ router.post('/responses', authenticateApiKey, async (req, res) => {
modelToRecord, modelToRecord,
account.id account.id
) )
} else {
logger.error(`❌ Usage capture FAILED for non-stream responses request ${requestId}`, {
apiKeyId: req.apiKey.id,
model: req.body.model,
account: account.name,
endpoint: 'responses',
isStream: false,
responseStatus: response.status
})
} }
} }
} catch (error) { } catch (error) {
@@ -418,7 +480,23 @@ router.post('/embeddings', authenticateApiKey, async (req, res) => {
if (usageData) { if (usageData) {
const modelToRecord = actualModel || req.body.model || 'unknown' const modelToRecord = actualModel || req.body.model || 'unknown'
logger.info(`✅ Usage capture SUCCESS for embeddings request ${requestId}`, {
usageData,
modelToRecord,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0
})
await usageReporter.reportOnce(requestId, usageData, req.apiKey.id, modelToRecord, account.id) await usageReporter.reportOnce(requestId, usageData, req.apiKey.id, modelToRecord, account.id)
} else {
logger.error(`❌ Usage capture FAILED for embeddings request ${requestId}`, {
apiKeyId: req.apiKey.id,
model: req.body.model,
account: account.name,
endpoint: 'embeddings',
isStream: false,
responseStatus: response.status
})
} }
} catch (error) { } catch (error) {
logger.error(`Azure OpenAI embeddings request failed ${requestId}:`, error) logger.error(`Azure OpenAI embeddings request failed ${requestId}:`, error)

View File

@@ -316,6 +316,11 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
let hasEnded = false let hasEnded = false
let eventCount = 0 let eventCount = 0
const maxEvents = 10000 // 最大事件数量限制 const maxEvents = 10000 // 最大事件数量限制
// 专门用于保存最后几个chunks以提取usage数据
let finalChunksBuffer = ''
const FINAL_CHUNKS_SIZE = 32 * 1024 // 32KB保留最终chunks
let allParsedEvents = [] // 存储所有解析的事件用于最终usage提取
// 设置响应头 // 设置响应头
clientResponse.setHeader('Content-Type', 'text/event-stream') clientResponse.setHeader('Content-Type', 'text/event-stream')
@@ -341,8 +346,8 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
clientResponse.flushHeaders() clientResponse.flushHeaders()
} }
// 解析 SSE 事件以捕获 usage 数据 // 强化的SSE事件解析,保存所有事件用于最终处理
const parseSSEForUsage = (data) => { const parseSSEForUsage = (data, isFromFinalBuffer = false) => {
const lines = data.split('\n') const lines = data.split('\n')
for (const line of lines) { for (const line of lines) {
@@ -353,35 +358,53 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
continue continue
} }
const eventData = JSON.parse(jsonStr) const eventData = JSON.parse(jsonStr)
// 保存所有成功解析的事件
allParsedEvents.push(eventData)
// 获取模型信息 // 获取模型信息
if (eventData.model) { if (eventData.model) {
actualModel = eventData.model actualModel = eventData.model
} }
// 获取使用统计Responses API: response.completed -> response.usage // 使用强化的usage提取函数
if (eventData.type === 'response.completed' && eventData.response) { const { usageData: extractedUsage, actualModel: extractedModel } = extractUsageDataRobust(
if (eventData.response.model) { eventData,
actualModel = eventData.response.model `stream-event-${isFromFinalBuffer ? 'final' : 'normal'}`
)
if (extractedUsage && !usageData) {
usageData = extractedUsage
if (extractedModel) actualModel = extractedModel
logger.debug(`🎯 Stream usage captured via robust extraction`, {
isFromFinalBuffer,
usageData,
actualModel
})
}
// 原有的简单提取作为备用
if (!usageData) {
// 获取使用统计Responses API: response.completed -> response.usage
if (eventData.type === 'response.completed' && eventData.response) {
if (eventData.response.model) {
actualModel = eventData.response.model
}
if (eventData.response.usage) {
usageData = eventData.response.usage
logger.debug('🎯 Stream usage (backup method - response.usage):', usageData)
}
} }
if (eventData.response.usage) {
usageData = eventData.response.usage // 兼容 Chat Completions 风格(顶层 usage
logger.debug('Captured Azure OpenAI nested usage (response.usage):', usageData) if (!usageData && eventData.usage) {
usageData = eventData.usage
logger.debug('🎯 Stream usage (backup method - top-level):', usageData)
} }
} }
// 兼容 Chat Completions 风格(顶层 usage
if (!usageData && eventData.usage) {
usageData = eventData.usage
logger.debug('Captured Azure OpenAI usage (top-level):', usageData)
}
// 检查是否是完成事件
if (eventData.choices && eventData.choices[0] && eventData.choices[0].finish_reason) {
// 这是最后一个 chunk
}
} catch (e) { } catch (e) {
// 忽略解析错误 logger.debug('SSE parsing error (expected for incomplete chunks):', e.message)
} }
} }
} }
@@ -431,10 +454,17 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
// 同时解析数据以捕获 usage 信息,带缓冲区大小限制 // 同时解析数据以捕获 usage 信息,带缓冲区大小限制
buffer += chunkStr buffer += chunkStr
// 防止缓冲区过大 // 保留最后的chunks用于最终usage提取不被truncate影响
finalChunksBuffer += chunkStr
if (finalChunksBuffer.length > FINAL_CHUNKS_SIZE) {
finalChunksBuffer = finalChunksBuffer.slice(-FINAL_CHUNKS_SIZE)
}
// 防止主缓冲区过大 - 但保持最后部分用于usage解析
if (buffer.length > MAX_BUFFER_SIZE) { if (buffer.length > MAX_BUFFER_SIZE) {
logger.warn(`Stream ${streamId} buffer exceeded limit, truncating`) logger.warn(`Stream ${streamId} buffer exceeded limit, truncating main buffer but preserving final chunks`)
buffer = buffer.slice(-MAX_BUFFER_SIZE / 2) // 保留后一半 // 保留最后1/4而不是1/2为usage数据留更多空间
buffer = buffer.slice(-MAX_BUFFER_SIZE / 4)
} }
// 处理完整的 SSE 事件 // 处理完整的 SSE 事件
@@ -470,9 +500,86 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
hasEnded = true hasEnded = true
try { try {
// 处理剩余的 buffer logger.debug(`🔚 Stream ended, performing comprehensive usage extraction for ${streamId}`, {
if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) { mainBufferSize: buffer.length,
parseSSEForUsage(buffer) finalChunksBufferSize: finalChunksBuffer.length,
parsedEventsCount: allParsedEvents.length,
hasUsageData: !!usageData
})
// 多层次的最终usage提取策略
if (!usageData) {
logger.debug('🔍 No usage found during stream, trying final extraction methods...')
// 方法1: 解析剩余的主buffer
if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) {
parseSSEForUsage(buffer, false)
}
// 方法2: 解析保留的final chunks buffer
if (!usageData && finalChunksBuffer.trim()) {
logger.debug('🔍 Trying final chunks buffer for usage extraction...')
parseSSEForUsage(finalChunksBuffer, true)
}
// 方法3: 从所有解析的事件中重新搜索usage
if (!usageData && allParsedEvents.length > 0) {
logger.debug('🔍 Searching through all parsed events for usage...')
// 倒序查找因为usage通常在最后
for (let i = allParsedEvents.length - 1; i >= 0; i--) {
const { usageData: foundUsage, actualModel: foundModel } = extractUsageDataRobust(
allParsedEvents[i],
`final-event-scan-${i}`
)
if (foundUsage) {
usageData = foundUsage
if (foundModel) actualModel = foundModel
logger.debug(`🎯 Usage found in event ${i} during final scan!`)
break
}
}
}
// 方法4: 尝试合并所有事件并搜索
if (!usageData && allParsedEvents.length > 0) {
logger.debug('🔍 Trying combined events analysis...')
const combinedData = {
events: allParsedEvents,
lastEvent: allParsedEvents[allParsedEvents.length - 1],
eventCount: allParsedEvents.length
}
const { usageData: combinedUsage } = extractUsageDataRobust(combinedData, 'combined-events')
if (combinedUsage) {
usageData = combinedUsage
logger.debug('🎯 Usage found via combined events analysis!')
}
}
}
// 最终usage状态报告
if (usageData) {
logger.debug('✅ Final stream usage extraction SUCCESS', {
streamId,
usageData,
actualModel,
totalEvents: allParsedEvents.length,
finalBufferSize: finalChunksBuffer.length
})
} else {
logger.warn('❌ Final stream usage extraction FAILED', {
streamId,
totalEvents: allParsedEvents.length,
finalBufferSize: finalChunksBuffer.length,
mainBufferSize: buffer.length,
lastFewEvents: allParsedEvents.slice(-3).map(e => ({
type: e.type,
hasUsage: !!e.usage,
hasResponse: !!e.response,
keys: Object.keys(e)
}))
})
} }
if (onEnd) { if (onEnd) {
@@ -528,6 +635,117 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) {
}) })
} }
// 强化的用量数据提取函数
function extractUsageDataRobust(responseData, context = 'unknown') {
logger.debug(`🔍 Attempting usage extraction for ${context}`, {
responseDataKeys: Object.keys(responseData || {}),
responseDataType: typeof responseData,
hasUsage: !!responseData?.usage,
hasResponse: !!responseData?.response
})
let usageData = null
let actualModel = null
try {
// 策略 1: 顶层 usage (标准 Chat Completions)
if (responseData?.usage) {
usageData = responseData.usage
actualModel = responseData.model
logger.debug('✅ Usage extracted via Strategy 1 (top-level)', { usageData, actualModel })
}
// 策略 2: response.usage (Responses API)
else if (responseData?.response?.usage) {
usageData = responseData.response.usage
actualModel = responseData.response.model || responseData.model
logger.debug('✅ Usage extracted via Strategy 2 (response.usage)', { usageData, actualModel })
}
// 策略 3: 嵌套搜索 - 深度查找 usage 字段
else {
const findUsageRecursive = (obj, path = '') => {
if (!obj || typeof obj !== 'object') return null
for (const [key, value] of Object.entries(obj)) {
const currentPath = path ? `${path}.${key}` : key
if (key === 'usage' && value && typeof value === 'object') {
logger.debug(`✅ Usage found at path: ${currentPath}`, value)
return { usage: value, path: currentPath }
}
if (typeof value === 'object' && value !== null) {
const nested = findUsageRecursive(value, currentPath)
if (nested) return nested
}
}
return null
}
const found = findUsageRecursive(responseData)
if (found) {
usageData = found.usage
// Try to find model in the same parent object
const pathParts = found.path.split('.')
pathParts.pop() // remove 'usage'
let modelParent = responseData
for (const part of pathParts) {
modelParent = modelParent?.[part]
}
actualModel = modelParent?.model || responseData?.model
logger.debug('✅ Usage extracted via Strategy 3 (recursive)', {
usageData,
actualModel,
foundPath: found.path
})
}
}
// 策略 4: 特殊响应格式处理
if (!usageData) {
// 检查是否有 choices 数组usage 可能在最后一个 choice 中
if (responseData?.choices?.length > 0) {
const lastChoice = responseData.choices[responseData.choices.length - 1]
if (lastChoice?.usage) {
usageData = lastChoice.usage
actualModel = responseData.model || lastChoice.model
logger.debug('✅ Usage extracted via Strategy 4 (choices)', { usageData, actualModel })
}
}
}
// 最终验证和记录
if (usageData) {
logger.debug('🎯 Final usage extraction result', {
context,
usageData,
actualModel,
inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0,
outputTokens: usageData.completion_tokens || usageData.output_tokens || 0,
totalTokens: usageData.total_tokens || 0
})
} else {
logger.warn('❌ Failed to extract usage data', {
context,
responseDataStructure: JSON.stringify(responseData, null, 2).substring(0, 1000) + '...',
availableKeys: Object.keys(responseData || {}),
responseSize: JSON.stringify(responseData || {}).length
})
}
} catch (extractionError) {
logger.error('🚨 Error during usage extraction', {
context,
error: extractionError.message,
stack: extractionError.stack,
responseDataType: typeof responseData
})
}
return { usageData, actualModel }
}
// 处理非流式响应 // 处理非流式响应
function handleNonStreamResponse(upstreamResponse, clientResponse) { function handleNonStreamResponse(upstreamResponse, clientResponse) {
try { try {
@@ -554,9 +772,8 @@ function handleNonStreamResponse(upstreamResponse, clientResponse) {
const responseData = upstreamResponse.data const responseData = upstreamResponse.data
clientResponse.json(responseData) clientResponse.json(responseData)
// 提取 usage 数据 // 使用强化的用量提取
const usageData = responseData.usage const { usageData, actualModel } = extractUsageDataRobust(responseData, 'non-stream')
const actualModel = responseData.model
return { usageData, actualModel, responseData } return { usageData, actualModel, responseData }
} catch (error) { } catch (error) {