diff --git a/.env.example b/.env.example index 3bb4db48..75f4683a 100644 --- a/.env.example +++ b/.env.example @@ -53,25 +53,38 @@ CLAUDE_BETA_HEADER=claude-code-20250219,oauth-2025-04-20,interleaved-thinking-20 # - /antigravity/api -> Antigravity OAuth # - /gemini-cli/api -> Gemini CLI OAuth -# (可选)Claude Code 调试 Dump:会在项目根目录写入 jsonl 文件,便于排查 tools/schema/回包问题 -# - anthropic-requests-dump.jsonl -# - anthropic-responses-dump.jsonl -# - anthropic-tools-dump.jsonl +# ============================================================================ +# 🐛 调试 Dump 配置(可选) +# ============================================================================ +# 以下开启后会在项目根目录写入 .jsonl 调试文件,便于排查问题。 +# ⚠️ 生产环境建议关闭,避免磁盘占用。 +# +# 📄 输出文件列表: +# - anthropic-requests-dump.jsonl (客户端请求) +# - anthropic-responses-dump.jsonl (返回给客户端的响应) +# - anthropic-tools-dump.jsonl (工具定义快照) +# - antigravity-upstream-requests-dump.jsonl (发往上游的请求) +# - antigravity-upstream-responses-dump.jsonl (上游 SSE 响应) +# +# 📌 开关配置: # ANTHROPIC_DEBUG_REQUEST_DUMP=true -# ANTHROPIC_DEBUG_REQUEST_DUMP_MAX_BYTES=2097152 # ANTHROPIC_DEBUG_RESPONSE_DUMP=true -# ANTHROPIC_DEBUG_RESPONSE_DUMP_MAX_BYTES=2097152 # ANTHROPIC_DEBUG_TOOLS_DUMP=true -# -# (可选)工具失败继续:当 tool_result 标记 is_error=true 时,提示模型不要中断任务(仅 /antigravity/api 分流生效) -# ANTHROPIC_TOOL_ERROR_CONTINUE=true -# -# (可选)Antigravity 上游请求 Dump:会在项目根目录写入 jsonl 文件,便于核对最终发往上游的 payload(含 tools/schema 清洗后的结果) -# - antigravity-upstream-requests-dump.jsonl # ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP=true -# ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP_MAX_BYTES=2097152 -# 启用 Antigravity 上游响应日志(SSE 事件 + 流摘要) # ANTIGRAVITY_DEBUG_UPSTREAM_RESPONSE_DUMP=true +# +# 📏 单条记录大小上限(字节),默认 2MB: +# ANTHROPIC_DEBUG_REQUEST_DUMP_MAX_BYTES=2097152 +# ANTHROPIC_DEBUG_RESPONSE_DUMP_MAX_BYTES=2097152 +# ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP_MAX_BYTES=2097152 +# +# 📦 整个 Dump 文件大小上限(字节),超过后自动轮转为 .bak 文件,默认 10MB: +# DUMP_MAX_FILE_SIZE_BYTES=10485760 +# +# 🔧 工具失败继续:当 tool_result 标记 is_error=true 时,提示模型不要中断任务 +# (仅 /antigravity/api 分流生效) +# ANTHROPIC_TOOL_ERROR_CONTINUE=true + # 🚫 529错误处理配置 # 启用529错误处理,0表示禁用,>0表示过载状态持续时间(分钟) diff --git a/.gitignore b/.gitignore index ad751429..e4c9e9c1 100644 --- a/.gitignore +++ b/.gitignore @@ -247,6 +247,3 @@ web/apiStats/ # Admin SPA build files web/admin-spa/dist/ - -.cunzhi-memory/ -*.jsonl \ No newline at end of file diff --git a/src/services/anthropicGeminiBridgeService.js b/src/services/anthropicGeminiBridgeService.js index 5984cf58..faa4f592 100644 --- a/src/services/anthropicGeminiBridgeService.js +++ b/src/services/anthropicGeminiBridgeService.js @@ -28,6 +28,7 @@ * - 缺失 tool_result 自动补全:避免 tool_use concurrency 错误 */ +const util = require('util') const crypto = require('crypto') const fs = require('fs') const path = require('path') @@ -36,6 +37,7 @@ const { getProjectRoot } = require('../utils/projectPaths') const geminiAccountService = require('./geminiAccountService') const unifiedGeminiScheduler = require('./unifiedGeminiScheduler') const sessionHelper = require('../utils/sessionHelper') +const signatureCache = require('../utils/signatureCache') const apiKeyService = require('./apiKeyService') const { updateRateLimitCounters } = require('../utils/rateLimitHelper') const { parseSSELine } = require('../utils/sseParser') @@ -54,6 +56,9 @@ const { // 常量定义 // ============================================================================ +// 默认签名 +const THOUGHT_SIGNATURE_FALLBACK = 'skip_thought_signature_validator' + // 支持的后端类型 const SUPPORTED_VENDORS = new Set(['gemini-cli', 'antigravity']) // 需要跳过的系统提醒前缀(Claude 内部消息,不应转发给上游) @@ -363,6 +368,84 @@ function sanitizeThoughtSignatureForAntigravity(signature) { return compacted } +/** + * 检测是否是 Antigravity 的 INVALID_ARGUMENT (400) 错误 + * 用于在日志中特殊标记这类错误,方便调试 + * + * @param {Object} sanitized - sanitizeUpstreamError 处理后的错误对象 + * @returns {boolean} 是否是参数无效错误 + */ +function isInvalidAntigravityArgumentError(sanitized) { + if (!sanitized || typeof sanitized !== 'object') { + return false + } + const upstreamType = String(sanitized.upstreamType || '').toUpperCase() + if (upstreamType === 'INVALID_ARGUMENT') { + return true + } + const message = String(sanitized.upstreamMessage || sanitized.message || '') + return /invalid argument/i.test(message) +} + +/** + * 汇总 Antigravity 请求信息用于调试 + * 当发生 400 错误时,输出请求的关键统计信息,帮助定位问题 + * + * @param {Object} requestData - 发送给 Antigravity 的请求数据 + * @returns {Object} 请求摘要信息 + */ +function summarizeAntigravityRequestForDebug(requestData) { + const request = requestData?.request || {} + const contents = Array.isArray(request.contents) ? request.contents : [] + const partStats = { text: 0, thought: 0, functionCall: 0, functionResponse: 0, other: 0 } + let functionResponseIds = 0 + let fallbackSignatureCount = 0 + + for (const message of contents) { + const parts = Array.isArray(message?.parts) ? message.parts : [] + for (const part of parts) { + if (!part || typeof part !== 'object') { + continue + } + if (part.thoughtSignature === THOUGHT_SIGNATURE_FALLBACK) { + fallbackSignatureCount += 1 + } + if (part.thought) { + partStats.thought += 1 + continue + } + if (part.functionCall) { + partStats.functionCall += 1 + continue + } + if (part.functionResponse) { + partStats.functionResponse += 1 + if (part.functionResponse.id) { + functionResponseIds += 1 + } + continue + } + if (typeof part.text === 'string') { + partStats.text += 1 + continue + } + partStats.other += 1 + } + } + + return { + model: requestData?.model, + toolCount: Array.isArray(request.tools) ? request.tools.length : 0, + toolConfigMode: request.toolConfig?.functionCallingConfig?.mode, + thinkingConfig: request.generationConfig?.thinkingConfig, + maxOutputTokens: request.generationConfig?.maxOutputTokens, + contentsCount: contents.length, + partStats, + functionResponseIds, + fallbackSignatureCount + } +} + /** * 清洗工具结果的 content blocks * - 移除 base64 图片(避免体积过大) @@ -933,7 +1016,7 @@ function convertAnthropicToolChoiceToGeminiToolConfig(toolChoice) { function convertAnthropicMessagesToGeminiContents( messages, toolUseIdToName, - { vendor = null, stripThinking = false } = {} + { vendor = null, stripThinking = false, sessionId = null } = {} ) { const contents = [] for (const message of messages || []) { @@ -972,7 +1055,15 @@ function convertAnthropicMessagesToGeminiContents( const thinkingText = extractAnthropicText(part.thinking || part.text || '') if (vendor === 'antigravity') { const hasThinkingText = thinkingText && !shouldSkipText(thinkingText) - const signature = sanitizeThoughtSignatureForAntigravity(part.signature) + // 先尝试使用请求中的签名,如果没有则尝试从缓存恢复 + let signature = sanitizeThoughtSignatureForAntigravity(part.signature) + if (!signature && sessionId && hasThinkingText) { + const cachedSig = signatureCache.getCachedSignature(sessionId, thinkingText) + if (cachedSig) { + signature = cachedSig + logger.debug('[SignatureCache] Restored signature from cache for thinking block') + } + } const hasSignature = Boolean(signature) // Claude Code 有时会发送空的 thinking block(无 thinking / 无 signature)。 @@ -1023,8 +1114,19 @@ function convertAnthropicMessagesToGeminiContents( // Antigravity 对历史工具调用的 functionCall 会校验 thoughtSignature; // Claude Code 侧的签名存放在 thinking block(part.signature),这里需要回填到 functionCall part 上。 - if (vendor === 'antigravity' && lastAntigravityThoughtSignature) { - parts.push({ thoughtSignature: lastAntigravityThoughtSignature, functionCall }) + // [大东的绝杀补丁] 再次尝试! + if (vendor === 'antigravity') { + // 如果没有真签名,就用“免检金牌” + const effectiveSignature = + lastAntigravityThoughtSignature || THOUGHT_SIGNATURE_FALLBACK + + // 必须把这个塞进去 + // Antigravity 要求:每个包含 thoughtSignature 的 part 都必须有 thought: true + parts.push({ + thought: true, + thoughtSignature: effectiveSignature, + functionCall + }) } else { parts.push({ functionCall }) } @@ -1185,7 +1287,11 @@ function canEnableAntigravityThinking(messages) { * @param {Object} options - 选项,包含 vendor * @returns {Object} { model, request } Gemini 请求对象 */ -function buildGeminiRequestFromAnthropic(body, baseModel, { vendor = null } = {}) { +function buildGeminiRequestFromAnthropic( + body, + baseModel, + { vendor = null, sessionId = null } = {} +) { const normalizedMessages = normalizeAnthropicMessages(body.messages || [], { vendor }) const toolUseIdToName = buildToolUseIdToNameMap(normalizedMessages || []) @@ -1204,7 +1310,8 @@ function buildGeminiRequestFromAnthropic(body, baseModel, { vendor = null } = {} { vendor, // 当 Antigravity 无法启用 thinking 时,剥离所有 thinking blocks - stripThinking: vendor === 'antigravity' && !canEnableThinking + stripThinking: vendor === 'antigravity' && !canEnableThinking, + sessionId } ) const systemParts = buildSystemParts(body.system) @@ -1782,7 +1889,8 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) .json(buildAnthropicError(error.message || 'No available Gemini accounts')) } - const { accountId, accountType } = accountSelection + let { accountId } = accountSelection + const { accountType } = accountSelection if (accountType !== 'gemini') { return res .status(400) @@ -1831,7 +1939,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) }) } - let requestData = buildGeminiRequestFromAnthropic(req.body, effectiveModel, { vendor }) + let requestData = buildGeminiRequestFromAnthropic(req.body, effectiveModel, { + vendor, + sessionId: sessionHash + }) // Antigravity 上游对 function calling 的启用/校验更严格:参考实现普遍使用 VALIDATED。 // 这里仅在 tools 存在且未显式禁用(tool_choice=none)时应用,避免破坏原始语义。 @@ -1897,6 +2008,67 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) accountId }) rawResponse = await attemptRequest(stripToolsFromRequest(requestData)) + } else if ( + // [429 账户切换] 检测到 Antigravity 配额耗尽错误时,尝试切换账户重试 + vendor === 'antigravity' && + sanitized.statusCode === 429 && + (sanitized.message?.toLowerCase()?.includes('exhausted') || + sanitized.upstreamMessage?.toLowerCase()?.includes('exhausted') || + sanitized.message?.toLowerCase()?.includes('capacity')) + ) { + logger.warn( + '⚠️ Antigravity 429 quota exhausted (non-stream), switching account and retrying', + { + vendor, + accountId, + model: effectiveModel + } + ) + // 删除当前会话映射,让调度器选择其他账户 + if (sessionHash) { + await unifiedGeminiScheduler._deleteSessionMapping(sessionHash) + } + // 重新选择账户 + try { + const newAccountSelection = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + effectiveModel, + { oauthProvider: vendor } + ) + const newAccountId = newAccountSelection.accountId + const newClient = await geminiAccountService.getGeminiClient(newAccountId) + if (!newClient) { + throw new Error('Failed to get new Gemini client for retry') + } + logger.info( + `🔄 Retrying non-stream with new account: ${newAccountId} (was: ${accountId})` + ) + // 用新账户的 client 重试 + rawResponse = + vendor === 'antigravity' + ? await geminiAccountService.generateContentAntigravity( + newClient, + requestData, + null, + projectId, + upstreamSessionId, + proxyConfig + ) + : await geminiAccountService.generateContent( + newClient, + requestData, + null, + projectId, + upstreamSessionId, + proxyConfig + ) + // 更新 accountId 以便后续使用记录 + accountId = newAccountId + } catch (retryError) { + logger.error('❌ Failed to retry non-stream with new account:', retryError) + throw error // 抛出原始错误 + } } else { throw error } @@ -2035,6 +2207,64 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) accountId }) streamResponse = await startStream(stripToolsFromRequest(requestData)) + } else if ( + // [429 账户切换] 检测到 Antigravity 配额耗尽错误时,尝试切换账户重试 + vendor === 'antigravity' && + sanitized.statusCode === 429 && + (sanitized.message?.toLowerCase()?.includes('exhausted') || + sanitized.upstreamMessage?.toLowerCase()?.includes('exhausted') || + sanitized.message?.toLowerCase()?.includes('capacity')) + ) { + logger.warn('⚠️ Antigravity 429 quota exhausted, switching account and retrying', { + vendor, + accountId, + model: effectiveModel + }) + // 删除当前会话映射,让调度器选择其他账户 + if (sessionHash) { + await unifiedGeminiScheduler._deleteSessionMapping(sessionHash) + } + // 重新选择账户 + try { + const newAccountSelection = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + effectiveModel, + { oauthProvider: vendor } + ) + const newAccountId = newAccountSelection.accountId + const newClient = await geminiAccountService.getGeminiClient(newAccountId) + if (!newClient) { + throw new Error('Failed to get new Gemini client for retry') + } + logger.info(`🔄 Retrying with new account: ${newAccountId} (was: ${accountId})`) + // 用新账户的 client 重试 + streamResponse = + vendor === 'antigravity' + ? await geminiAccountService.generateContentStreamAntigravity( + newClient, + requestData, + null, + projectId, + upstreamSessionId, + abortController.signal, + proxyConfig + ) + : await geminiAccountService.generateContentStream( + newClient, + requestData, + null, + projectId, + upstreamSessionId, + abortController.signal, + proxyConfig + ) + // 更新 accountId 以便后续使用记录 + accountId = newAccountId + } catch (retryError) { + logger.error('❌ Failed to retry with new account:', retryError) + throw error // 抛出原始错误 + } } else { throw error } @@ -2065,10 +2295,52 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } }) + const isAntigravityVendor = vendor === 'antigravity' const wantsThinkingBlockFirst = - vendor === 'antigravity' && + isAntigravityVendor && requestData?.request?.generationConfig?.thinkingConfig?.include_thoughts === true + // ======================================================================== + // [大东的 2.0 补丁 - 修复版] 活跃度看门狗 (Watchdog) + // ======================================================================== + let activityTimeout = null + const STREAM_ACTIVITY_TIMEOUT_MS = 45000 // 45秒无数据视为卡死 + + const resetActivityTimeout = () => { + if (activityTimeout) { + clearTimeout(activityTimeout) + } + activityTimeout = setTimeout(() => { + if (finished) { + return + } + + // 🛑【关键修改】先锁门!防止 abort() 触发的 onError 再次写入 res + finished = true + + logger.warn('⚠️ Upstream stream zombie detected (no data for 45s). Forcing termination.', { + requestId: req.requestId + }) + + if (!abortController.signal.aborted) { + abortController.abort() + } + + writeAnthropicSseEvent(res, 'error', { + type: 'error', + error: { + type: 'overloaded_error', + message: 'Upstream stream timed out (zombie connection). Please try again.' + } + }) + res.end() + }, STREAM_ACTIVITY_TIMEOUT_MS) + } + + // 🔥【这里!】一定要加这句来启动它! + resetActivityTimeout() + // ======================================================================== + let buffer = '' let emittedText = '' let emittedThinking = '' @@ -2128,7 +2400,15 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } } - const canStartThinkingBlock = () => { + const canStartThinkingBlock = (_hasSignature = false) => { + // Antigravity 特殊处理:某些情况下不应启动 thinking block + if (isAntigravityVendor) { + // 如果 wantsThinkingBlockFirst 且已发送过工具调用,不应再启动 thinking + if (wantsThinkingBlockFirst && emittedAnyToolUse) { + return false + } + // [移除规则2] 签名可能在后续 chunk 中到达,不应提前阻止 thinking 启动 + } if (currentIndex < 0) { return true } @@ -2297,9 +2577,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) effectiveModel, responseModel, stop_reason: 'error', - tool_use_names: Array.from(emittedToolCallKeys) - .map((key) => key.split(':')[0]) - .filter(Boolean), + tool_use_names: Array.from(emittedToolUseNames).filter(Boolean), text_preview: emittedText ? emittedText.slice(0, 800) : '', usage: { input_tokens: 0, output_tokens: 0 } }) @@ -2354,9 +2632,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) stop_reason: emittedAnyToolUse ? 'tool_use' : mapGeminiFinishReasonToAnthropicStopReason(finishReason), - tool_use_names: Array.from(emittedToolCallKeys) - .map((key) => key.split(':')[0]) - .filter(Boolean), + tool_use_names: Array.from(emittedToolUseNames).filter(Boolean), text_preview: emittedText ? emittedText.slice(0, 800) : '', usage: { input_tokens: inputTokens, output_tokens: outputTokens } }) @@ -2396,6 +2672,8 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } streamResponse.on('data', (chunk) => { + resetActivityTimeout() // <--- 【新增】收到数据了,重置倒计时! + if (finished) { return } @@ -2442,7 +2720,11 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } const parts = extractGeminiParts(payload) - const thoughtSignature = extractGeminiThoughtSignature(payload) + const rawThoughtSignature = extractGeminiThoughtSignature(payload) + // Antigravity 专用净化:确保签名格式符合 API 要求 + const thoughtSignature = isAntigravityVendor + ? sanitizeThoughtSignatureForAntigravity(rawThoughtSignature) + : rawThoughtSignature const fullThoughtForToolOrdering = extractGeminiThoughtText(payload) if (wantsThinkingBlockFirst) { @@ -2531,7 +2813,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } } - if (thoughtSignature && canStartThinkingBlock()) { + if (thoughtSignature && canStartThinkingBlock(true)) { let delta = '' if (thoughtSignature.startsWith(emittedThoughtSignature)) { delta = thoughtSignature.slice(emittedThoughtSignature.length) @@ -2550,7 +2832,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) } const fullThought = extractGeminiThoughtText(payload) - if (fullThought && canStartThinkingBlock()) { + if ( + fullThought && + canStartThinkingBlock(Boolean(thoughtSignature || emittedThoughtSignature)) + ) { let delta = '' if (fullThought.startsWith(emittedThinking)) { delta = fullThought.slice(emittedThinking.length) @@ -2565,6 +2850,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) index: currentIndex, delta: { type: 'thinking_delta', thinking: delta } }) + // [签名缓存] 当 thinking 内容和签名都有时,缓存供后续请求使用 + if (isAntigravityVendor && sessionHash && emittedThoughtSignature) { + signatureCache.cacheSignature(sessionHash, fullThought, emittedThoughtSignature) + } } } @@ -2590,10 +2879,18 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) }) streamResponse.on('end', () => { + if (activityTimeout) { + clearTimeout(activityTimeout) + } // <--- 【新增】正常结束,取消报警 + finalize().catch((e) => logger.error('Failed to finalize Anthropic SSE response:', e)) }) streamResponse.on('error', (error) => { + if (activityTimeout) { + clearTimeout(activityTimeout) + } // <--- 【新增】报错了,取消报警 + if (finished) { return } @@ -2609,22 +2906,54 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel }) return undefined } catch (error) { - const sanitized = sanitizeUpstreamError(error) - logger.error('Failed to start Gemini stream (via /v1/messages):', sanitized) + // ============================================================ + // [大东修复 3.0] 彻底防止 JSON 循环引用导致服务崩溃 + // ============================================================ - // 上游尚未建立 SSE(未写入任何事件)时,优先返回真实 HTTP 错误码,避免 200 + SSE error 的混淆。 + // 1. 使用 util.inspect 安全地将错误对象转为字符串,不使用 JSON.stringify + const safeErrorDetails = util.inspect(error, { + showHidden: false, + depth: 2, + colors: false, + breakLength: Infinity + }) + + // 2. 打印安全日志,绝对不会崩 + logger.error(`❌ [Critical] Failed to start Gemini stream. 错误详情:\n${safeErrorDetails}`) + + const sanitized = sanitizeUpstreamError(error) + + // 3. 特殊处理 Antigravity 的参数错误 (400),输出详细请求信息便于调试 + if ( + vendor === 'antigravity' && + effectiveModel.includes('claude') && + isInvalidAntigravityArgumentError(sanitized) + ) { + logger.warn('⚠️ Antigravity Claude invalid argument detected', { + requestId: req.requestId, + ...summarizeAntigravityRequestForDebug(requestData), + statusCode: sanitized.statusCode, + upstreamType: sanitized.upstreamType, + upstreamMessage: sanitized.upstreamMessage || sanitized.message + }) + } + + // 4. 确保返回 JSON 响应给客户端 (让客户端知道出错了并重试) if (!res.headersSent) { + // 记录非流式响应日志 dumpAnthropicNonStreamResponse( req, sanitized.statusCode || 502, buildAnthropicError(sanitized.upstreamMessage || sanitized.message), { vendor, accountId, effectiveModel, forcedVendor: vendor, upstreamError: sanitized } ) + return res .status(sanitized.statusCode || 502) .json(buildAnthropicError(sanitized.upstreamMessage || sanitized.message)) } + // 5. 如果头已经发了,走 SSE 发送错误 writeAnthropicSseEvent( res, 'error', @@ -2708,7 +3037,8 @@ async function handleAnthropicCountTokensToGemini(req, res, { vendor }) { toolUseIdToName, { vendor, - stripThinking: vendor === 'antigravity' && !canEnableThinking + stripThinking: vendor === 'antigravity' && !canEnableThinking, + sessionId: sessionHash } ) diff --git a/src/services/antigravityClient.js b/src/services/antigravityClient.js index 76997e72..19c3bd23 100644 --- a/src/services/antigravityClient.js +++ b/src/services/antigravityClient.js @@ -434,7 +434,37 @@ async function request({ const status = error?.response?.status if (status === 429 && !retriedAfterDelay && !signal?.aborted) { const data = error?.response?.data - const msg = typeof data === 'string' ? data : JSON.stringify(data || '') + + // 安全地将 data 转为字符串,避免 stream 对象导致循环引用崩溃 + const safeDataToString = (value) => { + if (typeof value === 'string') { + return value + } + if (value === null || value === undefined) { + return '' + } + // stream 对象存在循环引用,不能 JSON.stringify + if (typeof value === 'object' && typeof value.pipe === 'function') { + return '' + } + if (Buffer.isBuffer(value)) { + try { + return value.toString('utf8') + } catch (_) { + return '' + } + } + if (typeof value === 'object') { + try { + return JSON.stringify(value) + } catch (_) { + return '' + } + } + return String(value) + } + + const msg = safeDataToString(data) if ( msg.toLowerCase().includes('resource_exhausted') || msg.toLowerCase().includes('no capacity') diff --git a/src/utils/anthropicRequestDump.js b/src/utils/anthropicRequestDump.js index 8a1a7510..8e755064 100644 --- a/src/utils/anthropicRequestDump.js +++ b/src/utils/anthropicRequestDump.js @@ -1,7 +1,7 @@ -const fs = require('fs/promises') const path = require('path') const logger = require('./logger') const { getProjectRoot } = require('./projectPaths') +const { safeRotatingAppend } = require('./safeRotatingAppend') const REQUEST_DUMP_ENV = 'ANTHROPIC_DEBUG_REQUEST_DUMP' const REQUEST_DUMP_MAX_BYTES_ENV = 'ANTHROPIC_DEBUG_REQUEST_DUMP_MAX_BYTES' @@ -108,7 +108,7 @@ async function dumpAnthropicMessagesRequest(req, meta = {}) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { logger.warn('Failed to dump Anthropic request', { filename, diff --git a/src/utils/anthropicResponseDump.js b/src/utils/anthropicResponseDump.js index 7107556c..c21605bc 100644 --- a/src/utils/anthropicResponseDump.js +++ b/src/utils/anthropicResponseDump.js @@ -1,7 +1,7 @@ -const fs = require('fs/promises') const path = require('path') const logger = require('./logger') const { getProjectRoot } = require('./projectPaths') +const { safeRotatingAppend } = require('./safeRotatingAppend') const RESPONSE_DUMP_ENV = 'ANTHROPIC_DEBUG_RESPONSE_DUMP' const RESPONSE_DUMP_MAX_BYTES_ENV = 'ANTHROPIC_DEBUG_RESPONSE_DUMP_MAX_BYTES' @@ -89,7 +89,7 @@ async function dumpAnthropicResponse(req, responseInfo, meta = {}) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { logger.warn('Failed to dump Anthropic response', { filename, diff --git a/src/utils/antigravityUpstreamDump.js b/src/utils/antigravityUpstreamDump.js index 4c1be446..56120aa5 100644 --- a/src/utils/antigravityUpstreamDump.js +++ b/src/utils/antigravityUpstreamDump.js @@ -1,7 +1,7 @@ -const fs = require('fs/promises') const path = require('path') const logger = require('./logger') const { getProjectRoot } = require('./projectPaths') +const { safeRotatingAppend } = require('./safeRotatingAppend') const UPSTREAM_REQUEST_DUMP_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP' const UPSTREAM_REQUEST_DUMP_MAX_BYTES_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP_MAX_BYTES' @@ -103,7 +103,7 @@ async function dumpAntigravityUpstreamRequest(requestInfo) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { logger.warn('Failed to dump Antigravity upstream request', { filename, diff --git a/src/utils/antigravityUpstreamResponseDump.js b/src/utils/antigravityUpstreamResponseDump.js index 0ad3a29a..177b1d11 100644 --- a/src/utils/antigravityUpstreamResponseDump.js +++ b/src/utils/antigravityUpstreamResponseDump.js @@ -1,7 +1,7 @@ -const fs = require('fs/promises') const path = require('path') const logger = require('./logger') const { getProjectRoot } = require('./projectPaths') +const { safeRotatingAppend } = require('./safeRotatingAppend') const UPSTREAM_RESPONSE_DUMP_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_RESPONSE_DUMP' const UPSTREAM_RESPONSE_DUMP_MAX_BYTES_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_RESPONSE_DUMP_MAX_BYTES' @@ -89,7 +89,7 @@ async function dumpAntigravityUpstreamResponse(responseInfo) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { logger.warn('Failed to dump Antigravity upstream response', { filename, @@ -121,7 +121,7 @@ async function dumpAntigravityStreamEvent(eventInfo) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { // 静默处理,避免日志过多 } @@ -155,7 +155,7 @@ async function dumpAntigravityStreamSummary(summaryInfo) { const line = `${safeJsonStringify(record, maxBytes)}\n` try { - await fs.appendFile(filename, line, { encoding: 'utf8' }) + await safeRotatingAppend(filename, line) } catch (e) { logger.warn('Failed to dump Antigravity stream summary', { filename, diff --git a/src/utils/safeRotatingAppend.js b/src/utils/safeRotatingAppend.js new file mode 100644 index 00000000..21afecc6 --- /dev/null +++ b/src/utils/safeRotatingAppend.js @@ -0,0 +1,88 @@ +/** + * ============================================================================ + * 安全 JSONL 追加工具(带文件大小限制与自动轮转) + * ============================================================================ + * + * 用于所有调试 Dump 模块,避免日志文件无限增长导致 I/O 拥塞。 + * + * 策略: + * - 每次写入前检查目标文件大小 + * - 超过阈值时,将现有文件重命名为 .bak(覆盖旧 .bak) + * - 然后写入新文件 + */ + +const fs = require('fs/promises') +const logger = require('./logger') + +// 默认文件大小上限:10MB +const DEFAULT_MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024 +const MAX_FILE_SIZE_ENV = 'DUMP_MAX_FILE_SIZE_BYTES' + +/** + * 获取文件大小上限(可通过环境变量覆盖) + */ +function getMaxFileSize() { + const raw = process.env[MAX_FILE_SIZE_ENV] + if (raw) { + const parsed = Number.parseInt(raw, 10) + if (Number.isFinite(parsed) && parsed > 0) { + return parsed + } + } + return DEFAULT_MAX_FILE_SIZE_BYTES +} + +/** + * 获取文件大小,文件不存在时返回 0 + */ +async function getFileSize(filepath) { + try { + const stat = await fs.stat(filepath) + return stat.size + } catch (e) { + // 文件不存在或无法读取 + return 0 + } +} + +/** + * 安全追加写入 JSONL 文件,支持自动轮转 + * + * @param {string} filepath - 目标文件绝对路径 + * @param {string} line - 要写入的单行(应以 \n 结尾) + * @param {Object} options - 可选配置 + * @param {number} options.maxFileSize - 文件大小上限(字节),默认从环境变量或 10MB + */ +async function safeRotatingAppend(filepath, line, options = {}) { + const maxFileSize = options.maxFileSize || getMaxFileSize() + + const currentSize = await getFileSize(filepath) + + // 如果当前文件已达到或超过阈值,轮转 + if (currentSize >= maxFileSize) { + const backupPath = `${filepath}.bak` + try { + // 先删除旧备份(如果存在) + await fs.unlink(backupPath).catch(() => {}) + // 重命名当前文件为备份 + await fs.rename(filepath, backupPath) + } catch (renameErr) { + // 轮转失败时记录警告日志,继续写入原文件 + logger.warn('⚠️ Log rotation failed, continuing to write to original file', { + filepath, + backupPath, + error: renameErr?.message || String(renameErr) + }) + } + } + + // 追加写入 + await fs.appendFile(filepath, line, { encoding: 'utf8' }) +} + +module.exports = { + safeRotatingAppend, + getMaxFileSize, + MAX_FILE_SIZE_ENV, + DEFAULT_MAX_FILE_SIZE_BYTES +} diff --git a/src/utils/signatureCache.js b/src/utils/signatureCache.js new file mode 100644 index 00000000..7f691b8e --- /dev/null +++ b/src/utils/signatureCache.js @@ -0,0 +1,183 @@ +/** + * Signature Cache - 签名缓存模块 + * + * 用于缓存 Antigravity thinking block 的 thoughtSignature。 + * Claude Code 客户端可能剥离非标准字段,导致多轮对话时签名丢失。 + * 此模块按 sessionId + thinkingText 存储签名,便于后续请求恢复。 + * + * 参考实现: + * - CLIProxyAPI: internal/cache/signature_cache.go + * - antigravity-claude-proxy: src/format/signature-cache.js + */ + +const crypto = require('crypto') +const logger = require('./logger') + +// 配置常量 +const SIGNATURE_CACHE_TTL_MS = 60 * 60 * 1000 // 1 小时(同 CLIProxyAPI) +const MAX_ENTRIES_PER_SESSION = 100 // 每会话最大缓存条目 +const MIN_SIGNATURE_LENGTH = 50 // 最小有效签名长度 +const TEXT_HASH_LENGTH = 16 // 文本哈希长度(SHA256 前 16 位) + +// 主缓存:sessionId -> Map +const signatureCache = new Map() + +/** + * 生成文本内容的稳定哈希值 + * @param {string} text - 待哈希的文本 + * @returns {string} 16 字符的十六进制哈希 + */ +function hashText(text) { + if (!text || typeof text !== 'string') { + return '' + } + const hash = crypto.createHash('sha256').update(text).digest('hex') + return hash.slice(0, TEXT_HASH_LENGTH) +} + +/** + * 获取或创建会话缓存 + * @param {string} sessionId - 会话 ID + * @returns {Map} 会话的签名缓存 Map + */ +function getOrCreateSessionCache(sessionId) { + if (!signatureCache.has(sessionId)) { + signatureCache.set(sessionId, new Map()) + } + return signatureCache.get(sessionId) +} + +/** + * 检查签名是否有效 + * @param {string} signature - 待检查的签名 + * @returns {boolean} 签名是否有效 + */ +function isValidSignature(signature) { + return typeof signature === 'string' && signature.length >= MIN_SIGNATURE_LENGTH +} + +/** + * 缓存 thinking 签名 + * @param {string} sessionId - 会话 ID + * @param {string} thinkingText - thinking 内容文本 + * @param {string} signature - thoughtSignature + */ +function cacheSignature(sessionId, thinkingText, signature) { + if (!sessionId || !thinkingText || !signature) { + return + } + + if (!isValidSignature(signature)) { + return + } + + const sessionCache = getOrCreateSessionCache(sessionId) + const textHash = hashText(thinkingText) + + if (!textHash) { + return + } + + // 淘汰策略:超过限制时删除最老的 1/4 条目 + if (sessionCache.size >= MAX_ENTRIES_PER_SESSION) { + const entries = Array.from(sessionCache.entries()) + entries.sort((a, b) => a[1].timestamp - b[1].timestamp) + const toRemove = Math.max(1, Math.floor(entries.length / 4)) + for (let i = 0; i < toRemove; i++) { + sessionCache.delete(entries[i][0]) + } + logger.debug( + `[SignatureCache] Evicted ${toRemove} old entries for session ${sessionId.slice(0, 8)}...` + ) + } + + sessionCache.set(textHash, { + signature, + timestamp: Date.now() + }) + + logger.debug( + `[SignatureCache] Cached signature for session ${sessionId.slice(0, 8)}..., hash ${textHash}` + ) +} + +/** + * 获取缓存的签名 + * @param {string} sessionId - 会话 ID + * @param {string} thinkingText - thinking 内容文本 + * @returns {string|null} 缓存的签名,未找到或过期则返回 null + */ +function getCachedSignature(sessionId, thinkingText) { + if (!sessionId || !thinkingText) { + return null + } + + const sessionCache = signatureCache.get(sessionId) + if (!sessionCache) { + return null + } + + const textHash = hashText(thinkingText) + if (!textHash) { + return null + } + + const entry = sessionCache.get(textHash) + if (!entry) { + return null + } + + // 检查是否过期 + if (Date.now() - entry.timestamp > SIGNATURE_CACHE_TTL_MS) { + sessionCache.delete(textHash) + logger.debug(`[SignatureCache] Entry expired for hash ${textHash}`) + return null + } + + logger.debug( + `[SignatureCache] Cache hit for session ${sessionId.slice(0, 8)}..., hash ${textHash}` + ) + return entry.signature +} + +/** + * 清除会话缓存 + * @param {string} sessionId - 要清除的会话 ID,为空则清除全部 + */ +function clearSignatureCache(sessionId = null) { + if (sessionId) { + signatureCache.delete(sessionId) + logger.debug(`[SignatureCache] Cleared cache for session ${sessionId.slice(0, 8)}...`) + } else { + signatureCache.clear() + logger.debug('[SignatureCache] Cleared all caches') + } +} + +/** + * 获取缓存统计信息(调试用) + * @returns {Object} { sessionCount, totalEntries } + */ +function getCacheStats() { + let totalEntries = 0 + for (const sessionCache of signatureCache.values()) { + totalEntries += sessionCache.size + } + return { + sessionCount: signatureCache.size, + totalEntries + } +} + +module.exports = { + cacheSignature, + getCachedSignature, + clearSignatureCache, + getCacheStats, + isValidSignature, + // 内部函数导出(用于测试或扩展) + hashText, + MIN_SIGNATURE_LENGTH, + MAX_ENTRIES_PER_SESSION, + SIGNATURE_CACHE_TTL_MS +}