From dc3d311def427fa1c635620e0f7d76cc007fe1c2 Mon Sep 17 00:00:00 2001 From: Feng Yue <2525275@gmail.com> Date: Sat, 30 Aug 2025 20:45:01 +0800 Subject: [PATCH] fix azure openai usage count issue --- src/routes/azureOpenaiRoutes.js | 78 +++++++ src/services/azureOpenaiRelayService.js | 275 +++++++++++++++++++++--- 2 files changed, 324 insertions(+), 29 deletions(-) diff --git a/src/routes/azureOpenaiRoutes.js b/src/routes/azureOpenaiRoutes.js index 5de9117f..897442be 100644 --- a/src/routes/azureOpenaiRoutes.js +++ b/src/routes/azureOpenaiRoutes.js @@ -197,6 +197,13 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => { onEnd: async ({ usageData, actualModel }) => { if (usageData) { const modelToRecord = actualModel || req.body.model || 'unknown' + logger.info(`✅ Usage capture SUCCESS for stream chat request ${requestId}`, { + usageData, + modelToRecord, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0 + }) + await usageReporter.reportOnce( requestId, usageData, @@ -204,6 +211,14 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => { modelToRecord, account.id ) + } else { + logger.error(`❌ Usage capture FAILED for stream chat request ${requestId}`, { + apiKeyId: req.apiKey.id, + model: req.body.model, + account: account.name, + endpoint: 'chat/completions', + isStream: true + }) } }, onError: (error) => { @@ -219,6 +234,13 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => { if (usageData) { const modelToRecord = actualModel || req.body.model || 'unknown' + logger.info(`✅ Usage capture SUCCESS for non-stream chat request ${requestId}`, { + usageData, + modelToRecord, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0 + }) + await usageReporter.reportOnce( requestId, usageData, @@ -226,6 +248,15 @@ router.post('/chat/completions', authenticateApiKey, async (req, res) => { modelToRecord, account.id ) + } else { + logger.error(`❌ Usage capture FAILED for non-stream chat request ${requestId}`, { + apiKeyId: req.apiKey.id, + model: req.body.model, + account: account.name, + endpoint: 'chat/completions', + isStream: false, + responseStatus: response.status + }) } } } catch (error) { @@ -314,6 +345,13 @@ router.post('/responses', authenticateApiKey, async (req, res) => { onEnd: async ({ usageData, actualModel }) => { if (usageData) { const modelToRecord = actualModel || req.body.model || 'unknown' + logger.info(`✅ Usage capture SUCCESS for stream responses request ${requestId}`, { + usageData, + modelToRecord, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0 + }) + await usageReporter.reportOnce( requestId, usageData, @@ -321,6 +359,14 @@ router.post('/responses', authenticateApiKey, async (req, res) => { modelToRecord, account.id ) + } else { + logger.error(`❌ Usage capture FAILED for stream responses request ${requestId}`, { + apiKeyId: req.apiKey.id, + model: req.body.model, + account: account.name, + endpoint: 'responses', + isStream: true + }) } }, onError: (error) => { @@ -336,6 +382,13 @@ router.post('/responses', authenticateApiKey, async (req, res) => { if (usageData) { const modelToRecord = actualModel || req.body.model || 'unknown' + logger.info(`✅ Usage capture SUCCESS for non-stream responses request ${requestId}`, { + usageData, + modelToRecord, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0 + }) + await usageReporter.reportOnce( requestId, usageData, @@ -343,6 +396,15 @@ router.post('/responses', authenticateApiKey, async (req, res) => { modelToRecord, account.id ) + } else { + logger.error(`❌ Usage capture FAILED for non-stream responses request ${requestId}`, { + apiKeyId: req.apiKey.id, + model: req.body.model, + account: account.name, + endpoint: 'responses', + isStream: false, + responseStatus: response.status + }) } } } catch (error) { @@ -418,7 +480,23 @@ router.post('/embeddings', authenticateApiKey, async (req, res) => { if (usageData) { const modelToRecord = actualModel || req.body.model || 'unknown' + logger.info(`✅ Usage capture SUCCESS for embeddings request ${requestId}`, { + usageData, + modelToRecord, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0 + }) + await usageReporter.reportOnce(requestId, usageData, req.apiKey.id, modelToRecord, account.id) + } else { + logger.error(`❌ Usage capture FAILED for embeddings request ${requestId}`, { + apiKeyId: req.apiKey.id, + model: req.body.model, + account: account.name, + endpoint: 'embeddings', + isStream: false, + responseStatus: response.status + }) } } catch (error) { logger.error(`Azure OpenAI embeddings request failed ${requestId}:`, error) diff --git a/src/services/azureOpenaiRelayService.js b/src/services/azureOpenaiRelayService.js index dab115a6..82ed57d4 100644 --- a/src/services/azureOpenaiRelayService.js +++ b/src/services/azureOpenaiRelayService.js @@ -316,6 +316,11 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { let hasEnded = false let eventCount = 0 const maxEvents = 10000 // 最大事件数量限制 + + // 专门用于保存最后几个chunks以提取usage数据 + let finalChunksBuffer = '' + const FINAL_CHUNKS_SIZE = 32 * 1024 // 32KB保留最终chunks + let allParsedEvents = [] // 存储所有解析的事件用于最终usage提取 // 设置响应头 clientResponse.setHeader('Content-Type', 'text/event-stream') @@ -341,8 +346,8 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { clientResponse.flushHeaders() } - // 解析 SSE 事件以捕获 usage 数据 - const parseSSEForUsage = (data) => { + // 强化的SSE事件解析,保存所有事件用于最终处理 + const parseSSEForUsage = (data, isFromFinalBuffer = false) => { const lines = data.split('\n') for (const line of lines) { @@ -353,35 +358,53 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { continue } const eventData = JSON.parse(jsonStr) + + // 保存所有成功解析的事件 + allParsedEvents.push(eventData) // 获取模型信息 if (eventData.model) { actualModel = eventData.model } - // 获取使用统计(Responses API: response.completed -> response.usage) - if (eventData.type === 'response.completed' && eventData.response) { - if (eventData.response.model) { - actualModel = eventData.response.model + // 使用强化的usage提取函数 + const { usageData: extractedUsage, actualModel: extractedModel } = extractUsageDataRobust( + eventData, + `stream-event-${isFromFinalBuffer ? 'final' : 'normal'}` + ) + + if (extractedUsage && !usageData) { + usageData = extractedUsage + if (extractedModel) actualModel = extractedModel + logger.debug(`🎯 Stream usage captured via robust extraction`, { + isFromFinalBuffer, + usageData, + actualModel + }) + } + + // 原有的简单提取作为备用 + if (!usageData) { + // 获取使用统计(Responses API: response.completed -> response.usage) + if (eventData.type === 'response.completed' && eventData.response) { + if (eventData.response.model) { + actualModel = eventData.response.model + } + if (eventData.response.usage) { + usageData = eventData.response.usage + logger.debug('🎯 Stream usage (backup method - response.usage):', usageData) + } } - if (eventData.response.usage) { - usageData = eventData.response.usage - logger.debug('Captured Azure OpenAI nested usage (response.usage):', usageData) + + // 兼容 Chat Completions 风格(顶层 usage) + if (!usageData && eventData.usage) { + usageData = eventData.usage + logger.debug('🎯 Stream usage (backup method - top-level):', usageData) } } - // 兼容 Chat Completions 风格(顶层 usage) - if (!usageData && eventData.usage) { - usageData = eventData.usage - logger.debug('Captured Azure OpenAI usage (top-level):', usageData) - } - - // 检查是否是完成事件 - if (eventData.choices && eventData.choices[0] && eventData.choices[0].finish_reason) { - // 这是最后一个 chunk - } } catch (e) { - // 忽略解析错误 + logger.debug('SSE parsing error (expected for incomplete chunks):', e.message) } } } @@ -431,10 +454,17 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { // 同时解析数据以捕获 usage 信息,带缓冲区大小限制 buffer += chunkStr - // 防止缓冲区过大 + // 保留最后的chunks用于最终usage提取(不被truncate影响) + finalChunksBuffer += chunkStr + if (finalChunksBuffer.length > FINAL_CHUNKS_SIZE) { + finalChunksBuffer = finalChunksBuffer.slice(-FINAL_CHUNKS_SIZE) + } + + // 防止主缓冲区过大 - 但保持最后部分用于usage解析 if (buffer.length > MAX_BUFFER_SIZE) { - logger.warn(`Stream ${streamId} buffer exceeded limit, truncating`) - buffer = buffer.slice(-MAX_BUFFER_SIZE / 2) // 保留后一半 + logger.warn(`Stream ${streamId} buffer exceeded limit, truncating main buffer but preserving final chunks`) + // 保留最后1/4而不是1/2,为usage数据留更多空间 + buffer = buffer.slice(-MAX_BUFFER_SIZE / 4) } // 处理完整的 SSE 事件 @@ -470,9 +500,86 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { hasEnded = true try { - // 处理剩余的 buffer - if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) { - parseSSEForUsage(buffer) + logger.debug(`🔚 Stream ended, performing comprehensive usage extraction for ${streamId}`, { + mainBufferSize: buffer.length, + finalChunksBufferSize: finalChunksBuffer.length, + parsedEventsCount: allParsedEvents.length, + hasUsageData: !!usageData + }) + + // 多层次的最终usage提取策略 + if (!usageData) { + logger.debug('🔍 No usage found during stream, trying final extraction methods...') + + // 方法1: 解析剩余的主buffer + if (buffer.trim() && buffer.length <= MAX_EVENT_SIZE) { + parseSSEForUsage(buffer, false) + } + + // 方法2: 解析保留的final chunks buffer + if (!usageData && finalChunksBuffer.trim()) { + logger.debug('🔍 Trying final chunks buffer for usage extraction...') + parseSSEForUsage(finalChunksBuffer, true) + } + + // 方法3: 从所有解析的事件中重新搜索usage + if (!usageData && allParsedEvents.length > 0) { + logger.debug('🔍 Searching through all parsed events for usage...') + + // 倒序查找,因为usage通常在最后 + for (let i = allParsedEvents.length - 1; i >= 0; i--) { + const { usageData: foundUsage, actualModel: foundModel } = extractUsageDataRobust( + allParsedEvents[i], + `final-event-scan-${i}` + ) + if (foundUsage) { + usageData = foundUsage + if (foundModel) actualModel = foundModel + logger.debug(`🎯 Usage found in event ${i} during final scan!`) + break + } + } + } + + // 方法4: 尝试合并所有事件并搜索 + if (!usageData && allParsedEvents.length > 0) { + logger.debug('🔍 Trying combined events analysis...') + const combinedData = { + events: allParsedEvents, + lastEvent: allParsedEvents[allParsedEvents.length - 1], + eventCount: allParsedEvents.length + } + + const { usageData: combinedUsage } = extractUsageDataRobust(combinedData, 'combined-events') + if (combinedUsage) { + usageData = combinedUsage + logger.debug('🎯 Usage found via combined events analysis!') + } + } + } + + // 最终usage状态报告 + if (usageData) { + logger.debug('✅ Final stream usage extraction SUCCESS', { + streamId, + usageData, + actualModel, + totalEvents: allParsedEvents.length, + finalBufferSize: finalChunksBuffer.length + }) + } else { + logger.warn('❌ Final stream usage extraction FAILED', { + streamId, + totalEvents: allParsedEvents.length, + finalBufferSize: finalChunksBuffer.length, + mainBufferSize: buffer.length, + lastFewEvents: allParsedEvents.slice(-3).map(e => ({ + type: e.type, + hasUsage: !!e.usage, + hasResponse: !!e.response, + keys: Object.keys(e) + })) + }) } if (onEnd) { @@ -528,6 +635,117 @@ function handleStreamResponse(upstreamResponse, clientResponse, options = {}) { }) } +// 强化的用量数据提取函数 +function extractUsageDataRobust(responseData, context = 'unknown') { + logger.debug(`🔍 Attempting usage extraction for ${context}`, { + responseDataKeys: Object.keys(responseData || {}), + responseDataType: typeof responseData, + hasUsage: !!responseData?.usage, + hasResponse: !!responseData?.response + }) + + let usageData = null + let actualModel = null + + try { + // 策略 1: 顶层 usage (标准 Chat Completions) + if (responseData?.usage) { + usageData = responseData.usage + actualModel = responseData.model + logger.debug('✅ Usage extracted via Strategy 1 (top-level)', { usageData, actualModel }) + } + + // 策略 2: response.usage (Responses API) + else if (responseData?.response?.usage) { + usageData = responseData.response.usage + actualModel = responseData.response.model || responseData.model + logger.debug('✅ Usage extracted via Strategy 2 (response.usage)', { usageData, actualModel }) + } + + // 策略 3: 嵌套搜索 - 深度查找 usage 字段 + else { + const findUsageRecursive = (obj, path = '') => { + if (!obj || typeof obj !== 'object') return null + + for (const [key, value] of Object.entries(obj)) { + const currentPath = path ? `${path}.${key}` : key + + if (key === 'usage' && value && typeof value === 'object') { + logger.debug(`✅ Usage found at path: ${currentPath}`, value) + return { usage: value, path: currentPath } + } + + if (typeof value === 'object' && value !== null) { + const nested = findUsageRecursive(value, currentPath) + if (nested) return nested + } + } + return null + } + + const found = findUsageRecursive(responseData) + if (found) { + usageData = found.usage + // Try to find model in the same parent object + const pathParts = found.path.split('.') + pathParts.pop() // remove 'usage' + let modelParent = responseData + for (const part of pathParts) { + modelParent = modelParent?.[part] + } + actualModel = modelParent?.model || responseData?.model + logger.debug('✅ Usage extracted via Strategy 3 (recursive)', { + usageData, + actualModel, + foundPath: found.path + }) + } + } + + // 策略 4: 特殊响应格式处理 + if (!usageData) { + // 检查是否有 choices 数组,usage 可能在最后一个 choice 中 + if (responseData?.choices?.length > 0) { + const lastChoice = responseData.choices[responseData.choices.length - 1] + if (lastChoice?.usage) { + usageData = lastChoice.usage + actualModel = responseData.model || lastChoice.model + logger.debug('✅ Usage extracted via Strategy 4 (choices)', { usageData, actualModel }) + } + } + } + + // 最终验证和记录 + if (usageData) { + logger.debug('🎯 Final usage extraction result', { + context, + usageData, + actualModel, + inputTokens: usageData.prompt_tokens || usageData.input_tokens || 0, + outputTokens: usageData.completion_tokens || usageData.output_tokens || 0, + totalTokens: usageData.total_tokens || 0 + }) + } else { + logger.warn('❌ Failed to extract usage data', { + context, + responseDataStructure: JSON.stringify(responseData, null, 2).substring(0, 1000) + '...', + availableKeys: Object.keys(responseData || {}), + responseSize: JSON.stringify(responseData || {}).length + }) + } + + } catch (extractionError) { + logger.error('🚨 Error during usage extraction', { + context, + error: extractionError.message, + stack: extractionError.stack, + responseDataType: typeof responseData + }) + } + + return { usageData, actualModel } +} + // 处理非流式响应 function handleNonStreamResponse(upstreamResponse, clientResponse) { try { @@ -554,9 +772,8 @@ function handleNonStreamResponse(upstreamResponse, clientResponse) { const responseData = upstreamResponse.data clientResponse.json(responseData) - // 提取 usage 数据 - const usageData = responseData.usage - const actualModel = responseData.model + // 使用强化的用量提取 + const { usageData, actualModel } = extractUsageDataRobust(responseData, 'non-stream') return { usageData, actualModel, responseData } } catch (error) {