diff --git a/package-lock.json b/package-lock.json index d9ebcff0..e6898fe4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "dotenv": "^16.3.1", "express": "^4.18.2", "google-auth-library": "^10.1.0", + "heapdump": "^0.3.15", "helmet": "^7.1.0", "https-proxy-agent": "^7.0.2", "inquirer": "^8.2.6", @@ -5398,6 +5399,19 @@ "node": ">= 0.4" } }, + "node_modules/heapdump": { + "version": "0.3.15", + "resolved": "https://registry.npmjs.org/heapdump/-/heapdump-0.3.15.tgz", + "integrity": "sha512-n8aSFscI9r3gfhOcAECAtXFaQ1uy4QSke6bnaL+iymYZ/dWs9cqDqHM+rALfsHUwukUbxsdlECZ0pKmJdQ/4OA==", + "hasInstallScript": true, + "license": "ISC", + "dependencies": { + "nan": "^2.13.2" + }, + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/helmet": { "version": "7.2.0", "resolved": "https://registry.npmmirror.com/helmet/-/helmet-7.2.0.tgz", @@ -7013,6 +7027,12 @@ "integrity": "sha512-nnbWWOkoWyUsTjKrhgD0dcz22mdkSnpYqbEjIm2nhwhuxlSkpywJmBo8h0ZqJdkp73mb90SssHkN4rsRaBAfAA==", "license": "ISC" }, + "node_modules/nan": { + "version": "2.24.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.24.0.tgz", + "integrity": "sha512-Vpf9qnVW1RaDkoNKFUvfxqAbtI8ncb8OJlqZ9wwpXzWPEsvsB1nvdUi6oYrHIkQ1Y/tMDnr1h4nczS0VB9Xykg==", + "license": "MIT" + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmmirror.com/natural-compare/-/natural-compare-1.4.0.tgz", diff --git a/package.json b/package.json index 6ef88e60..043cbb2e 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "dotenv": "^16.3.1", "express": "^4.18.2", "google-auth-library": "^10.1.0", + "heapdump": "^0.3.15", "helmet": "^7.1.0", "https-proxy-agent": "^7.0.2", "inquirer": "^8.2.6", diff --git a/src/routes/api.js b/src/routes/api.js index 6ec81cbd..03a97013 100644 --- a/src/routes/api.js +++ b/src/routes/api.js @@ -416,11 +416,18 @@ async function handleMessagesRequest(req, res) { // 根据账号类型选择对应的转发服务并调用 if (accountType === 'claude-official') { // 官方Claude账号使用原有的转发服务(会自己选择账号) + // 🧹 内存优化:提取需要的值,避免闭包捕获整个 req 对象 + const _apiKeyId = req.apiKey.id + const _rateLimitInfo = req.rateLimitInfo + const _requestBody = req.body // 传递后清除引用 + const _apiKey = req.apiKey + const _headers = req.headers + await claudeRelayService.relayStreamRequestWithUsageCapture( - req.body, - req.apiKey, + _requestBody, + _apiKey, res, - req.headers, + _headers, (usageData) => { // 回调函数:当检测到完整usage数据时记录真实token使用量 logger.info( @@ -470,13 +477,13 @@ async function handleMessagesRequest(req, res) { } apiKeyService - .recordUsageWithDetails(req.apiKey.id, usageObject, model, usageAccountId, 'claude') + .recordUsageWithDetails(_apiKeyId, usageObject, model, usageAccountId, 'claude') .catch((error) => { logger.error('❌ Failed to record stream usage:', error) }) queueRateLimitUpdate( - req.rateLimitInfo, + _rateLimitInfo, { inputTokens, outputTokens, @@ -501,11 +508,18 @@ async function handleMessagesRequest(req, res) { ) } else if (accountType === 'claude-console') { // Claude Console账号使用Console转发服务(需要传递accountId) + // 🧹 内存优化:提取需要的值 + const _apiKeyIdConsole = req.apiKey.id + const _rateLimitInfoConsole = req.rateLimitInfo + const _requestBodyConsole = req.body + const _apiKeyConsole = req.apiKey + const _headersConsole = req.headers + await claudeConsoleRelayService.relayStreamRequestWithUsageCapture( - req.body, - req.apiKey, + _requestBodyConsole, + _apiKeyConsole, res, - req.headers, + _headersConsole, (usageData) => { // 回调函数:当检测到完整usage数据时记录真实token使用量 logger.info( @@ -556,7 +570,7 @@ async function handleMessagesRequest(req, res) { apiKeyService .recordUsageWithDetails( - req.apiKey.id, + _apiKeyIdConsole, usageObject, model, usageAccountId, @@ -567,7 +581,7 @@ async function handleMessagesRequest(req, res) { }) queueRateLimitUpdate( - req.rateLimitInfo, + _rateLimitInfoConsole, { inputTokens, outputTokens, @@ -593,6 +607,11 @@ async function handleMessagesRequest(req, res) { ) } else if (accountType === 'bedrock') { // Bedrock账号使用Bedrock转发服务 + // 🧹 内存优化:提取需要的值 + const _apiKeyIdBedrock = req.apiKey.id + const _rateLimitInfoBedrock = req.rateLimitInfo + const _requestBodyBedrock = req.body + try { const bedrockAccountResult = await bedrockAccountService.getAccount(accountId) if (!bedrockAccountResult.success) { @@ -600,7 +619,7 @@ async function handleMessagesRequest(req, res) { } const result = await bedrockRelayService.handleStreamRequest( - req.body, + _requestBodyBedrock, bedrockAccountResult.data, res ) @@ -611,13 +630,21 @@ async function handleMessagesRequest(req, res) { const outputTokens = result.usage.output_tokens || 0 apiKeyService - .recordUsage(req.apiKey.id, inputTokens, outputTokens, 0, 0, result.model, accountId) + .recordUsage( + _apiKeyIdBedrock, + inputTokens, + outputTokens, + 0, + 0, + result.model, + accountId + ) .catch((error) => { logger.error('❌ Failed to record Bedrock stream usage:', error) }) queueRateLimitUpdate( - req.rateLimitInfo, + _rateLimitInfoBedrock, { inputTokens, outputTokens, @@ -642,11 +669,18 @@ async function handleMessagesRequest(req, res) { } } else if (accountType === 'ccr') { // CCR账号使用CCR转发服务(需要传递accountId) + // 🧹 内存优化:提取需要的值 + const _apiKeyIdCcr = req.apiKey.id + const _rateLimitInfoCcr = req.rateLimitInfo + const _requestBodyCcr = req.body + const _apiKeyCcr = req.apiKey + const _headersCcr = req.headers + await ccrRelayService.relayStreamRequestWithUsageCapture( - req.body, - req.apiKey, + _requestBodyCcr, + _apiKeyCcr, res, - req.headers, + _headersCcr, (usageData) => { // 回调函数:当检测到完整usage数据时记录真实token使用量 logger.info( @@ -696,13 +730,13 @@ async function handleMessagesRequest(req, res) { } apiKeyService - .recordUsageWithDetails(req.apiKey.id, usageObject, model, usageAccountId, 'ccr') + .recordUsageWithDetails(_apiKeyIdCcr, usageObject, model, usageAccountId, 'ccr') .catch((error) => { logger.error('❌ Failed to record CCR stream usage:', error) }) queueRateLimitUpdate( - req.rateLimitInfo, + _rateLimitInfoCcr, { inputTokens, outputTokens, @@ -737,18 +771,26 @@ async function handleMessagesRequest(req, res) { } }, 1000) // 1秒后检查 } else { + // 🧹 内存优化:提取需要的值,避免后续回调捕获整个 req + const _apiKeyIdNonStream = req.apiKey.id + const _apiKeyNameNonStream = req.apiKey.name + const _rateLimitInfoNonStream = req.rateLimitInfo + const _requestBodyNonStream = req.body + const _apiKeyNonStream = req.apiKey + const _headersNonStream = req.headers + // 🔍 检查客户端连接是否仍然有效(可能在并发排队等待期间断开) if (res.destroyed || res.socket?.destroyed || res.writableEnded) { logger.warn( - `⚠️ Client disconnected before non-stream request could start for key: ${req.apiKey?.name || 'unknown'}` + `⚠️ Client disconnected before non-stream request could start for key: ${_apiKeyNameNonStream || 'unknown'}` ) return undefined } // 非流式响应 - 只使用官方真实usage数据 logger.info('📄 Starting non-streaming request', { - apiKeyId: req.apiKey.id, - apiKeyName: req.apiKey.name + apiKeyId: _apiKeyIdNonStream, + apiKeyName: _apiKeyNameNonStream }) // 📊 监听 socket 事件以追踪连接状态变化 @@ -919,11 +961,11 @@ async function handleMessagesRequest(req, res) { ? await claudeAccountService.getAccount(accountId) : await claudeConsoleAccountService.getAccount(accountId) - if (account?.interceptWarmup === 'true' && isWarmupRequest(req.body)) { + if (account?.interceptWarmup === 'true' && isWarmupRequest(_requestBodyNonStream)) { logger.api( `🔥 Warmup request intercepted (non-stream) for account: ${account.name} (${accountId})` ) - return res.json(buildMockWarmupResponse(req.body.model)) + return res.json(buildMockWarmupResponse(_requestBodyNonStream.model)) } } @@ -936,11 +978,11 @@ async function handleMessagesRequest(req, res) { if (accountType === 'claude-official') { // 官方Claude账号使用原有的转发服务 response = await claudeRelayService.relayRequest( - req.body, - req.apiKey, - req, + _requestBodyNonStream, + _apiKeyNonStream, + req, // clientRequest 用于断开检测,保留但服务层已优化 res, - req.headers + _headersNonStream ) } else if (accountType === 'claude-console') { // Claude Console账号使用Console转发服务 @@ -948,11 +990,11 @@ async function handleMessagesRequest(req, res) { `[DEBUG] Calling claudeConsoleRelayService.relayRequest with accountId: ${accountId}` ) response = await claudeConsoleRelayService.relayRequest( - req.body, - req.apiKey, - req, + _requestBodyNonStream, + _apiKeyNonStream, + req, // clientRequest 保留用于断开检测 res, - req.headers, + _headersNonStream, accountId ) } else if (accountType === 'bedrock') { @@ -964,9 +1006,9 @@ async function handleMessagesRequest(req, res) { } const result = await bedrockRelayService.handleNonStreamRequest( - req.body, + _requestBodyNonStream, bedrockAccountResult.data, - req.headers + _headersNonStream ) // 构建标准响应格式 @@ -996,11 +1038,11 @@ async function handleMessagesRequest(req, res) { // CCR账号使用CCR转发服务 logger.debug(`[DEBUG] Calling ccrRelayService.relayRequest with accountId: ${accountId}`) response = await ccrRelayService.relayRequest( - req.body, - req.apiKey, - req, + _requestBodyNonStream, + _apiKeyNonStream, + req, // clientRequest 保留用于断开检测 res, - req.headers, + _headersNonStream, accountId ) } @@ -1049,14 +1091,14 @@ async function handleMessagesRequest(req, res) { const cacheCreateTokens = jsonData.usage.cache_creation_input_tokens || 0 const cacheReadTokens = jsonData.usage.cache_read_input_tokens || 0 // Parse the model to remove vendor prefix if present (e.g., "ccr,gemini-2.5-pro" -> "gemini-2.5-pro") - const rawModel = jsonData.model || req.body.model || 'unknown' + const rawModel = jsonData.model || _requestBodyNonStream.model || 'unknown' const { baseModel: usageBaseModel } = parseVendorPrefixedModel(rawModel) const model = usageBaseModel || rawModel // 记录真实的token使用量(包含模型信息和所有4种token以及账户ID) const { accountId: responseAccountId } = response await apiKeyService.recordUsage( - req.apiKey.id, + _apiKeyIdNonStream, inputTokens, outputTokens, cacheCreateTokens, @@ -1066,7 +1108,7 @@ async function handleMessagesRequest(req, res) { ) await queueRateLimitUpdate( - req.rateLimitInfo, + _rateLimitInfoNonStream, { inputTokens, outputTokens, diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index bb3b823e..66bc6a7f 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -21,6 +21,9 @@ const { isStreamWritable } = require('../utils/streamHelper') class ClaudeRelayService { constructor() { this.claudeApiUrl = 'https://api.anthropic.com/v1/messages?beta=true' + // 🧹 内存优化:用于存储请求体字符串,避免闭包捕获 + this.bodyStore = new Map() + this._bodyStoreIdCounter = 0 this.apiVersion = config.claude.apiVersion this.betaHeader = config.claude.betaHeader this.systemPrompt = config.claude.systemPrompt @@ -379,6 +382,7 @@ class ClaudeRelayService { let queueLockAcquired = false let queueRequestId = null let selectedAccountId = null + let bodyStoreIdNonStream = null // 🧹 在 try 块外声明,以便 finally 清理 try { // 调试日志:查看API Key数据 @@ -539,7 +543,10 @@ class ClaudeRelayService { const isRealClaudeCodeRequest = this._isActualClaudeCodeRequest(requestBody, clientHeaders) const processedBody = this._processRequestBody(requestBody, account) - const baseRequestBody = JSON.parse(JSON.stringify(processedBody)) + // 🧹 内存优化:存储到 bodyStore,避免闭包捕获 + const originalBodyString = JSON.stringify(processedBody) + bodyStoreIdNonStream = ++this._bodyStoreIdCounter + this.bodyStore.set(bodyStoreIdNonStream, originalBodyString) // 获取代理配置 const proxyAgent = await this._getProxyAgent(accountId) @@ -567,8 +574,16 @@ class ClaudeRelayService { let shouldRetry = false do { + // 🧹 每次重试从 bodyStore 解析新对象,避免闭包捕获 + let retryRequestBody + try { + retryRequestBody = JSON.parse(this.bodyStore.get(bodyStoreIdNonStream)) + } catch (parseError) { + logger.error(`❌ Failed to parse body for retry: ${parseError.message}`) + throw new Error(`Request body parse failed: ${parseError.message}`) + } response = await this._makeClaudeRequest( - JSON.parse(JSON.stringify(baseRequestBody)), + retryRequestBody, accessToken, proxyAgent, clientHeaders, @@ -904,6 +919,10 @@ class ClaudeRelayService { ) throw error } finally { + // 🧹 清理 bodyStore + if (bodyStoreIdNonStream !== null) { + this.bodyStore.delete(bodyStoreIdNonStream) + } // 📬 释放用户消息队列锁(兜底,正常情况下已在请求发送后提前释放) if (queueLockAcquired && queueRequestId && selectedAccountId) { try { @@ -1419,7 +1438,8 @@ class ClaudeRelayService { return prepared.abortResponse } - const { bodyString, headers, isRealClaudeCode, toolNameMap } = prepared + let { bodyString } = prepared + const { headers, isRealClaudeCode, toolNameMap } = prepared return new Promise((resolve, reject) => { // 支持自定义路径(如 count_tokens) @@ -1533,6 +1553,8 @@ class ClaudeRelayService { // 写入请求体 req.write(bodyString) + // 🧹 内存优化:立即清空 bodyString 引用,避免闭包捕获 + bodyString = null req.end() }) } @@ -1716,14 +1738,17 @@ class ClaudeRelayService { const isRealClaudeCodeRequest = this._isActualClaudeCodeRequest(requestBody, clientHeaders) const processedBody = this._processRequestBody(requestBody, account) - const baseRequestBody = JSON.parse(JSON.stringify(processedBody)) + // 🧹 内存优化:存储到 bodyStore,不放入 requestOptions 避免闭包捕获 + const originalBodyString = JSON.stringify(processedBody) + const bodyStoreId = ++this._bodyStoreIdCounter + this.bodyStore.set(bodyStoreId, originalBodyString) // 获取代理配置 const proxyAgent = await this._getProxyAgent(accountId) // 发送流式请求并捕获usage数据 await this._makeClaudeStreamRequestWithUsageCapture( - JSON.parse(JSON.stringify(baseRequestBody)), + processedBody, accessToken, proxyAgent, clientHeaders, @@ -1740,7 +1765,7 @@ class ClaudeRelayService { streamTransformer, { ...options, - originalRequestBody: baseRequestBody, + bodyStoreId, isRealClaudeCodeRequest }, isDedicatedOfficialAccount, @@ -1831,7 +1856,8 @@ class ClaudeRelayService { return prepared.abortResponse } - const { bodyString, headers, toolNameMap } = prepared + let { bodyString } = prepared + const { headers, toolNameMap } = prepared const toolNameStreamTransformer = this._createToolNameStripperStreamTransformer( streamTransformer, toolNameMap @@ -1943,9 +1969,20 @@ class ClaudeRelayService { try { // 递归调用自身进行重试 - const retryBody = requestOptions.originalRequestBody - ? JSON.parse(JSON.stringify(requestOptions.originalRequestBody)) - : body + // 🧹 从 bodyStore 获取字符串用于重试 + if ( + !requestOptions.bodyStoreId || + !this.bodyStore.has(requestOptions.bodyStoreId) + ) { + throw new Error('529 retry requires valid bodyStoreId') + } + let retryBody + try { + retryBody = JSON.parse(this.bodyStore.get(requestOptions.bodyStoreId)) + } catch (parseError) { + logger.error(`❌ Failed to parse body for 529 retry: ${parseError.message}`) + throw new Error(`529 retry body parse failed: ${parseError.message}`) + } const retryResult = await this._makeClaudeStreamRequestWithUsageCapture( retryBody, accessToken, @@ -2050,10 +2087,18 @@ class ClaudeRelayService { if ( this._isClaudeCodeCredentialError(errorData) && requestOptions.useRandomizedToolNames !== true && - requestOptions.originalRequestBody + requestOptions.bodyStoreId && + this.bodyStore.has(requestOptions.bodyStoreId) ) { + let retryBody + try { + retryBody = JSON.parse(this.bodyStore.get(requestOptions.bodyStoreId)) + } catch (parseError) { + logger.error(`❌ Failed to parse body for 403 retry: ${parseError.message}`) + reject(new Error(`403 retry body parse failed: ${parseError.message}`)) + return + } try { - const retryBody = JSON.parse(JSON.stringify(requestOptions.originalRequestBody)) const retryResult = await this._makeClaudeStreamRequestWithUsageCapture( retryBody, accessToken, @@ -2149,6 +2194,11 @@ class ClaudeRelayService { let rateLimitDetected = false // 限流检测标志 // 监听数据块,解析SSE并寻找usage信息 + // 🧹 内存优化:在闭包创建前提取需要的值,避免闭包捕获 body 和 requestOptions + // body 和 requestOptions 只在闭包外使用,闭包内只引用基本类型 + const requestedModel = body?.model || 'unknown' + const { isRealClaudeCodeRequest } = requestOptions + res.on('data', (chunk) => { try { const chunkStr = chunk.toString() @@ -2354,7 +2404,7 @@ class ClaudeRelayService { // 打印原始的usage数据为JSON字符串,避免嵌套问题 logger.info( - `📊 === Stream Request Usage Summary === Model: ${body.model}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}` + `📊 === Stream Request Usage Summary === Model: ${requestedModel}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}` ) // 一般一个请求只会使用一个模型,即使有多个usage事件也应该合并 @@ -2364,7 +2414,7 @@ class ClaudeRelayService { output_tokens: totalUsage.output_tokens, cache_creation_input_tokens: totalUsage.cache_creation_input_tokens, cache_read_input_tokens: totalUsage.cache_read_input_tokens, - model: allUsageData[allUsageData.length - 1].model || body.model // 使用最后一个模型或请求模型 + model: allUsageData[allUsageData.length - 1].model || requestedModel // 使用最后一个模型或请求模型 } // 如果有详细的cache_creation数据,合并它们 @@ -2473,15 +2523,15 @@ class ClaudeRelayService { } // 只有真实的 Claude Code 请求才更新 headers(流式请求) - if ( - clientHeaders && - Object.keys(clientHeaders).length > 0 && - this.isRealClaudeCodeRequest(body) - ) { + if (clientHeaders && Object.keys(clientHeaders).length > 0 && isRealClaudeCodeRequest) { await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders) } } + // 🧹 清理 bodyStore + if (requestOptions.bodyStoreId) { + this.bodyStore.delete(requestOptions.bodyStoreId) + } logger.debug('🌊 Claude stream response with usage capture completed') resolve() }) @@ -2538,6 +2588,10 @@ class ClaudeRelayService { ) responseStream.end() } + // 🧹 清理 bodyStore + if (requestOptions.bodyStoreId) { + this.bodyStore.delete(requestOptions.bodyStoreId) + } reject(error) }) @@ -2567,6 +2621,10 @@ class ClaudeRelayService { ) responseStream.end() } + // 🧹 清理 bodyStore + if (requestOptions.bodyStoreId) { + this.bodyStore.delete(requestOptions.bodyStoreId) + } reject(new Error('Request timeout')) }) @@ -2580,6 +2638,8 @@ class ClaudeRelayService { // 写入请求体 req.write(bodyString) + // 🧹 内存优化:立即清空 bodyString 引用,避免闭包捕获 + bodyString = null req.end() }) }