fix(memory): use bodyStore to avoid closure capturing request body

Problem:
- Stream response handlers (res.on) captured requestOptions in closures
- requestOptions contained originalBodyString (~800KB per request)
- These strings couldn't be GC'd until stream completed
- With concurrent requests, memory accumulated rapidly

Solution:
- Store request body strings in this.bodyStore Map with unique ID
- Pass only bodyStoreId in requestOptions (not the 800KB string)
- Closures capture small ID, not large string
- Clean up bodyStore on request completion (success/error/timeout)
- Extract needed values before closures to avoid capturing body object
This commit is contained in:
root
2026-01-12 08:31:47 +00:00
parent 962e01b080
commit f535b35a1c
3 changed files with 81 additions and 13 deletions

View File

@@ -21,6 +21,9 @@ const { isStreamWritable } = require('../utils/streamHelper')
class ClaudeRelayService {
constructor() {
this.claudeApiUrl = 'https://api.anthropic.com/v1/messages?beta=true'
// 🧹 内存优化:用于存储请求体字符串,避免闭包捕获
this.bodyStore = new Map()
this._bodyStoreIdCounter = 0
this.apiVersion = config.claude.apiVersion
this.betaHeader = config.claude.betaHeader
this.systemPrompt = config.claude.systemPrompt
@@ -539,7 +542,8 @@ class ClaudeRelayService {
const isRealClaudeCodeRequest = this._isActualClaudeCodeRequest(requestBody, clientHeaders)
const processedBody = this._processRequestBody(requestBody, account)
const baseRequestBody = JSON.parse(JSON.stringify(processedBody))
// 🧹 内存优化:存储序列化字符串用于重试,避免重复转换工具名
const originalBodyString = JSON.stringify(processedBody)
// 获取代理配置
const proxyAgent = await this._getProxyAgent(accountId)
@@ -567,8 +571,16 @@ class ClaudeRelayService {
let shouldRetry = false
do {
// 🧹 每次重试从字符串解析新对象,避免使用被修改的 body
let retryRequestBody
try {
retryRequestBody = JSON.parse(originalBodyString)
} catch (parseError) {
logger.error(`❌ Failed to parse originalBodyString for retry: ${parseError.message}`)
throw new Error(`Request body parse failed: ${parseError.message}`)
}
response = await this._makeClaudeRequest(
JSON.parse(JSON.stringify(baseRequestBody)),
retryRequestBody,
accessToken,
proxyAgent,
clientHeaders,
@@ -1716,14 +1728,17 @@ class ClaudeRelayService {
const isRealClaudeCodeRequest = this._isActualClaudeCodeRequest(requestBody, clientHeaders)
const processedBody = this._processRequestBody(requestBody, account)
const baseRequestBody = JSON.parse(JSON.stringify(processedBody))
// 🧹 内存优化:存储到 bodyStore不放入 requestOptions 避免闭包捕获
const originalBodyString = JSON.stringify(processedBody)
const bodyStoreId = ++this._bodyStoreIdCounter
this.bodyStore.set(bodyStoreId, originalBodyString)
// 获取代理配置
const proxyAgent = await this._getProxyAgent(accountId)
// 发送流式请求并捕获usage数据
await this._makeClaudeStreamRequestWithUsageCapture(
JSON.parse(JSON.stringify(baseRequestBody)),
processedBody,
accessToken,
proxyAgent,
clientHeaders,
@@ -1740,7 +1755,7 @@ class ClaudeRelayService {
streamTransformer,
{
...options,
originalRequestBody: baseRequestBody,
bodyStoreId: bodyStoreId,
isRealClaudeCodeRequest
},
isDedicatedOfficialAccount,
@@ -1943,9 +1958,17 @@ class ClaudeRelayService {
try {
// 递归调用自身进行重试
const retryBody = requestOptions.originalRequestBody
? JSON.parse(JSON.stringify(requestOptions.originalRequestBody))
: body
// 🧹 从 bodyStore 获取字符串用于重试
if (!requestOptions.bodyStoreId || !this.bodyStore.has(requestOptions.bodyStoreId)) {
throw new Error('529 retry requires valid bodyStoreId')
}
let retryBody
try {
retryBody = JSON.parse(this.bodyStore.get(requestOptions.bodyStoreId))
} catch (parseError) {
logger.error(`❌ Failed to parse body for 529 retry: ${parseError.message}`)
throw new Error(`529 retry body parse failed: ${parseError.message}`)
}
const retryResult = await this._makeClaudeStreamRequestWithUsageCapture(
retryBody,
accessToken,
@@ -2050,10 +2073,17 @@ class ClaudeRelayService {
if (
this._isClaudeCodeCredentialError(errorData) &&
requestOptions.useRandomizedToolNames !== true &&
requestOptions.originalRequestBody
requestOptions.bodyStoreId && this.bodyStore.has(requestOptions.bodyStoreId)
) {
let retryBody
try {
retryBody = JSON.parse(this.bodyStore.get(requestOptions.bodyStoreId))
} catch (parseError) {
logger.error(`❌ Failed to parse body for 403 retry: ${parseError.message}`)
reject(new Error(`403 retry body parse failed: ${parseError.message}`))
return
}
try {
const retryBody = JSON.parse(JSON.stringify(requestOptions.originalRequestBody))
const retryResult = await this._makeClaudeStreamRequestWithUsageCapture(
retryBody,
accessToken,
@@ -2149,6 +2179,11 @@ class ClaudeRelayService {
let rateLimitDetected = false // 限流检测标志
// 监听数据块解析SSE并寻找usage信息
// 🧹 内存优化:在闭包创建前提取需要的值,避免闭包捕获 body 和 requestOptions
// body 和 requestOptions 只在闭包外使用,闭包内只引用基本类型
const requestedModel = body?.model || 'unknown'
const isRealClaudeCodeRequest = requestOptions.isRealClaudeCodeRequest
res.on('data', (chunk) => {
try {
const chunkStr = chunk.toString()
@@ -2354,7 +2389,7 @@ class ClaudeRelayService {
// 打印原始的usage数据为JSON字符串避免嵌套问题
logger.info(
`📊 === Stream Request Usage Summary === Model: ${body.model}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}`
`📊 === Stream Request Usage Summary === Model: ${requestedModel}, Total Events: ${allUsageData.length}, Usage Data: ${JSON.stringify(allUsageData)}`
)
// 一般一个请求只会使用一个模型即使有多个usage事件也应该合并
@@ -2364,7 +2399,7 @@ class ClaudeRelayService {
output_tokens: totalUsage.output_tokens,
cache_creation_input_tokens: totalUsage.cache_creation_input_tokens,
cache_read_input_tokens: totalUsage.cache_read_input_tokens,
model: allUsageData[allUsageData.length - 1].model || body.model // 使用最后一个模型或请求模型
model: allUsageData[allUsageData.length - 1].model || requestedModel // 使用最后一个模型或请求模型
}
// 如果有详细的cache_creation数据合并它们
@@ -2476,12 +2511,16 @@ class ClaudeRelayService {
if (
clientHeaders &&
Object.keys(clientHeaders).length > 0 &&
this.isRealClaudeCodeRequest(body)
isRealClaudeCodeRequest
) {
await claudeCodeHeadersService.storeAccountHeaders(accountId, clientHeaders)
}
}
// 🧹 清理 bodyStore
if (requestOptions.bodyStoreId) {
this.bodyStore.delete(requestOptions.bodyStoreId)
}
logger.debug('🌊 Claude stream response with usage capture completed')
resolve()
})
@@ -2538,6 +2577,10 @@ class ClaudeRelayService {
)
responseStream.end()
}
// 🧹 清理 bodyStore
if (requestOptions.bodyStoreId) {
this.bodyStore.delete(requestOptions.bodyStoreId)
}
reject(error)
})
@@ -2567,6 +2610,10 @@ class ClaudeRelayService {
)
responseStream.end()
}
// 🧹 清理 bodyStore
if (requestOptions.bodyStoreId) {
this.bodyStore.delete(requestOptions.bodyStoreId)
}
reject(new Error('Request timeout'))
})