feat: 增强稳定性与Antigravity适配 (僵尸流看门狗/自动重试/签名缓存)

主要变更:
1. **僵尸流看门狗 (Zombie Stream Watchdog)**:
   - 新增 resetActivityTimeout 机制,45秒无数据强制断开连接,防止服务假死。

2. **智能重试机制**:
   - 针对 Antigravity 429 (Resource Exhausted) 错误,自动清理会话并切换账号重试。
   - 涵盖流式 (Stream) 和非流式 (Non-stream) 请求。

3. **Thought Signature 增强**:
   - 新增签名缓存与恢复机制 (signatureCache)。
   - 增加 skip_thought_signature_validator 兜底签名策略。
   - 强制补充 thought: true 标记以满足上游校验。

4. **系统稳定性与调试**:
   - 使用 util.inspect 替代 JSON.stringify 打印错误日志,彻底修复循环引用导致的服务崩溃。
   - 新增针对 Antigravity 参数错误 (400) 的详细请求结构分析日志。
   - 优化日志写入为轮转模式 (safeRotatingAppend)。

5. **其他优化**:
   - antigravityClient 数据处理安全增强 (safeDataToString)。
This commit is contained in:
52227
2026-01-05 09:37:39 +08:00
parent 41999f56b4
commit 1b834ffcdb
10 changed files with 692 additions and 51 deletions

View File

@@ -28,6 +28,7 @@
* - 缺失 tool_result 自动补全:避免 tool_use concurrency 错误
*/
const util = require('util')
const crypto = require('crypto')
const fs = require('fs')
const path = require('path')
@@ -36,6 +37,7 @@ const { getProjectRoot } = require('../utils/projectPaths')
const geminiAccountService = require('./geminiAccountService')
const unifiedGeminiScheduler = require('./unifiedGeminiScheduler')
const sessionHelper = require('../utils/sessionHelper')
const signatureCache = require('../utils/signatureCache')
const apiKeyService = require('./apiKeyService')
const { updateRateLimitCounters } = require('../utils/rateLimitHelper')
const { parseSSELine } = require('../utils/sseParser')
@@ -54,6 +56,9 @@ const {
// 常量定义
// ============================================================================
// 默认签名
const THOUGHT_SIGNATURE_FALLBACK = 'skip_thought_signature_validator'
// 支持的后端类型
const SUPPORTED_VENDORS = new Set(['gemini-cli', 'antigravity'])
// 需要跳过的系统提醒前缀Claude 内部消息,不应转发给上游)
@@ -363,6 +368,84 @@ function sanitizeThoughtSignatureForAntigravity(signature) {
return compacted
}
/**
* 检测是否是 Antigravity 的 INVALID_ARGUMENT (400) 错误
* 用于在日志中特殊标记这类错误,方便调试
*
* @param {Object} sanitized - sanitizeUpstreamError 处理后的错误对象
* @returns {boolean} 是否是参数无效错误
*/
function isInvalidAntigravityArgumentError(sanitized) {
if (!sanitized || typeof sanitized !== 'object') {
return false
}
const upstreamType = String(sanitized.upstreamType || '').toUpperCase()
if (upstreamType === 'INVALID_ARGUMENT') {
return true
}
const message = String(sanitized.upstreamMessage || sanitized.message || '')
return /invalid argument/i.test(message)
}
/**
* 汇总 Antigravity 请求信息用于调试
* 当发生 400 错误时,输出请求的关键统计信息,帮助定位问题
*
* @param {Object} requestData - 发送给 Antigravity 的请求数据
* @returns {Object} 请求摘要信息
*/
function summarizeAntigravityRequestForDebug(requestData) {
const request = requestData?.request || {}
const contents = Array.isArray(request.contents) ? request.contents : []
const partStats = { text: 0, thought: 0, functionCall: 0, functionResponse: 0, other: 0 }
let functionResponseIds = 0
let fallbackSignatureCount = 0
for (const message of contents) {
const parts = Array.isArray(message?.parts) ? message.parts : []
for (const part of parts) {
if (!part || typeof part !== 'object') {
continue
}
if (part.thoughtSignature === THOUGHT_SIGNATURE_FALLBACK) {
fallbackSignatureCount += 1
}
if (part.thought) {
partStats.thought += 1
continue
}
if (part.functionCall) {
partStats.functionCall += 1
continue
}
if (part.functionResponse) {
partStats.functionResponse += 1
if (part.functionResponse.id) {
functionResponseIds += 1
}
continue
}
if (typeof part.text === 'string') {
partStats.text += 1
continue
}
partStats.other += 1
}
}
return {
model: requestData?.model,
toolCount: Array.isArray(request.tools) ? request.tools.length : 0,
toolConfigMode: request.toolConfig?.functionCallingConfig?.mode,
thinkingConfig: request.generationConfig?.thinkingConfig,
maxOutputTokens: request.generationConfig?.maxOutputTokens,
contentsCount: contents.length,
partStats,
functionResponseIds,
fallbackSignatureCount
}
}
/**
* 清洗工具结果的 content blocks
* - 移除 base64 图片(避免体积过大)
@@ -933,7 +1016,7 @@ function convertAnthropicToolChoiceToGeminiToolConfig(toolChoice) {
function convertAnthropicMessagesToGeminiContents(
messages,
toolUseIdToName,
{ vendor = null, stripThinking = false } = {}
{ vendor = null, stripThinking = false, sessionId = null } = {}
) {
const contents = []
for (const message of messages || []) {
@@ -972,7 +1055,15 @@ function convertAnthropicMessagesToGeminiContents(
const thinkingText = extractAnthropicText(part.thinking || part.text || '')
if (vendor === 'antigravity') {
const hasThinkingText = thinkingText && !shouldSkipText(thinkingText)
const signature = sanitizeThoughtSignatureForAntigravity(part.signature)
// 先尝试使用请求中的签名,如果没有则尝试从缓存恢复
let signature = sanitizeThoughtSignatureForAntigravity(part.signature)
if (!signature && sessionId && hasThinkingText) {
const cachedSig = signatureCache.getCachedSignature(sessionId, thinkingText)
if (cachedSig) {
signature = cachedSig
logger.debug('[SignatureCache] Restored signature from cache for thinking block')
}
}
const hasSignature = Boolean(signature)
// Claude Code 有时会发送空的 thinking block无 thinking / 无 signature
@@ -1023,8 +1114,19 @@ function convertAnthropicMessagesToGeminiContents(
// Antigravity 对历史工具调用的 functionCall 会校验 thoughtSignature
// Claude Code 侧的签名存放在 thinking blockpart.signature这里需要回填到 functionCall part 上。
if (vendor === 'antigravity' && lastAntigravityThoughtSignature) {
parts.push({ thoughtSignature: lastAntigravityThoughtSignature, functionCall })
// [大东的绝杀补丁] 再次尝试!
if (vendor === 'antigravity') {
// 如果没有真签名,就用“免检金牌”
const effectiveSignature =
lastAntigravityThoughtSignature || THOUGHT_SIGNATURE_FALLBACK
// 必须把这个塞进去
// Antigravity 要求:每个包含 thoughtSignature 的 part 都必须有 thought: true
parts.push({
thought: true,
thoughtSignature: effectiveSignature,
functionCall
})
} else {
parts.push({ functionCall })
}
@@ -1185,7 +1287,11 @@ function canEnableAntigravityThinking(messages) {
* @param {Object} options - 选项,包含 vendor
* @returns {Object} { model, request } Gemini 请求对象
*/
function buildGeminiRequestFromAnthropic(body, baseModel, { vendor = null } = {}) {
function buildGeminiRequestFromAnthropic(
body,
baseModel,
{ vendor = null, sessionId = null } = {}
) {
const normalizedMessages = normalizeAnthropicMessages(body.messages || [], { vendor })
const toolUseIdToName = buildToolUseIdToNameMap(normalizedMessages || [])
@@ -1204,7 +1310,8 @@ function buildGeminiRequestFromAnthropic(body, baseModel, { vendor = null } = {}
{
vendor,
// 当 Antigravity 无法启用 thinking 时,剥离所有 thinking blocks
stripThinking: vendor === 'antigravity' && !canEnableThinking
stripThinking: vendor === 'antigravity' && !canEnableThinking,
sessionId
}
)
const systemParts = buildSystemParts(body.system)
@@ -1782,7 +1889,8 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
.json(buildAnthropicError(error.message || 'No available Gemini accounts'))
}
const { accountId, accountType } = accountSelection
let { accountId } = accountSelection
const { accountType } = accountSelection
if (accountType !== 'gemini') {
return res
.status(400)
@@ -1831,7 +1939,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
})
}
let requestData = buildGeminiRequestFromAnthropic(req.body, effectiveModel, { vendor })
let requestData = buildGeminiRequestFromAnthropic(req.body, effectiveModel, {
vendor,
sessionId: sessionHash
})
// Antigravity 上游对 function calling 的启用/校验更严格:参考实现普遍使用 VALIDATED。
// 这里仅在 tools 存在且未显式禁用tool_choice=none时应用避免破坏原始语义。
@@ -1897,6 +2008,67 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
accountId
})
rawResponse = await attemptRequest(stripToolsFromRequest(requestData))
} else if (
// [429 账户切换] 检测到 Antigravity 配额耗尽错误时,尝试切换账户重试
vendor === 'antigravity' &&
sanitized.statusCode === 429 &&
(sanitized.message?.toLowerCase()?.includes('exhausted') ||
sanitized.upstreamMessage?.toLowerCase()?.includes('exhausted') ||
sanitized.message?.toLowerCase()?.includes('capacity'))
) {
logger.warn(
'⚠️ Antigravity 429 quota exhausted (non-stream), switching account and retrying',
{
vendor,
accountId,
model: effectiveModel
}
)
// 删除当前会话映射,让调度器选择其他账户
if (sessionHash) {
await unifiedGeminiScheduler._deleteSessionMapping(sessionHash)
}
// 重新选择账户
try {
const newAccountSelection = await unifiedGeminiScheduler.selectAccountForApiKey(
req.apiKey,
sessionHash,
effectiveModel,
{ oauthProvider: vendor }
)
const newAccountId = newAccountSelection.accountId
const newClient = await geminiAccountService.getGeminiClient(newAccountId)
if (!newClient) {
throw new Error('Failed to get new Gemini client for retry')
}
logger.info(
`🔄 Retrying non-stream with new account: ${newAccountId} (was: ${accountId})`
)
// 用新账户的 client 重试
rawResponse =
vendor === 'antigravity'
? await geminiAccountService.generateContentAntigravity(
newClient,
requestData,
null,
projectId,
upstreamSessionId,
proxyConfig
)
: await geminiAccountService.generateContent(
newClient,
requestData,
null,
projectId,
upstreamSessionId,
proxyConfig
)
// 更新 accountId 以便后续使用记录
accountId = newAccountId
} catch (retryError) {
logger.error('❌ Failed to retry non-stream with new account:', retryError)
throw error // 抛出原始错误
}
} else {
throw error
}
@@ -2035,6 +2207,64 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
accountId
})
streamResponse = await startStream(stripToolsFromRequest(requestData))
} else if (
// [429 账户切换] 检测到 Antigravity 配额耗尽错误时,尝试切换账户重试
vendor === 'antigravity' &&
sanitized.statusCode === 429 &&
(sanitized.message?.toLowerCase()?.includes('exhausted') ||
sanitized.upstreamMessage?.toLowerCase()?.includes('exhausted') ||
sanitized.message?.toLowerCase()?.includes('capacity'))
) {
logger.warn('⚠️ Antigravity 429 quota exhausted, switching account and retrying', {
vendor,
accountId,
model: effectiveModel
})
// 删除当前会话映射,让调度器选择其他账户
if (sessionHash) {
await unifiedGeminiScheduler._deleteSessionMapping(sessionHash)
}
// 重新选择账户
try {
const newAccountSelection = await unifiedGeminiScheduler.selectAccountForApiKey(
req.apiKey,
sessionHash,
effectiveModel,
{ oauthProvider: vendor }
)
const newAccountId = newAccountSelection.accountId
const newClient = await geminiAccountService.getGeminiClient(newAccountId)
if (!newClient) {
throw new Error('Failed to get new Gemini client for retry')
}
logger.info(`🔄 Retrying with new account: ${newAccountId} (was: ${accountId})`)
// 用新账户的 client 重试
streamResponse =
vendor === 'antigravity'
? await geminiAccountService.generateContentStreamAntigravity(
newClient,
requestData,
null,
projectId,
upstreamSessionId,
abortController.signal,
proxyConfig
)
: await geminiAccountService.generateContentStream(
newClient,
requestData,
null,
projectId,
upstreamSessionId,
abortController.signal,
proxyConfig
)
// 更新 accountId 以便后续使用记录
accountId = newAccountId
} catch (retryError) {
logger.error('❌ Failed to retry with new account:', retryError)
throw error // 抛出原始错误
}
} else {
throw error
}
@@ -2065,10 +2295,52 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
})
const isAntigravityVendor = vendor === 'antigravity'
const wantsThinkingBlockFirst =
vendor === 'antigravity' &&
isAntigravityVendor &&
requestData?.request?.generationConfig?.thinkingConfig?.include_thoughts === true
// ========================================================================
// [大东的 2.0 补丁 - 修复版] 活跃度看门狗 (Watchdog)
// ========================================================================
let activityTimeout = null
const STREAM_ACTIVITY_TIMEOUT_MS = 45000 // 45秒无数据视为卡死
const resetActivityTimeout = () => {
if (activityTimeout) {
clearTimeout(activityTimeout)
}
activityTimeout = setTimeout(() => {
if (finished) {
return
}
// 🛑【关键修改】先锁门!防止 abort() 触发的 onError 再次写入 res
finished = true
logger.warn('⚠️ Upstream stream zombie detected (no data for 45s). Forcing termination.', {
requestId: req.requestId
})
if (!abortController.signal.aborted) {
abortController.abort()
}
writeAnthropicSseEvent(res, 'error', {
type: 'error',
error: {
type: 'overloaded_error',
message: 'Upstream stream timed out (zombie connection). Please try again.'
}
})
res.end()
}, STREAM_ACTIVITY_TIMEOUT_MS)
}
// 🔥【这里!】一定要加这句来启动它!
resetActivityTimeout()
// ========================================================================
let buffer = ''
let emittedText = ''
let emittedThinking = ''
@@ -2128,7 +2400,15 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
}
const canStartThinkingBlock = () => {
const canStartThinkingBlock = (_hasSignature = false) => {
// Antigravity 特殊处理:某些情况下不应启动 thinking block
if (isAntigravityVendor) {
// 如果 wantsThinkingBlockFirst 且已发送过工具调用,不应再启动 thinking
if (wantsThinkingBlockFirst && emittedAnyToolUse) {
return false
}
// [移除规则2] 签名可能在后续 chunk 中到达,不应提前阻止 thinking 启动
}
if (currentIndex < 0) {
return true
}
@@ -2297,9 +2577,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
effectiveModel,
responseModel,
stop_reason: 'error',
tool_use_names: Array.from(emittedToolCallKeys)
.map((key) => key.split(':')[0])
.filter(Boolean),
tool_use_names: Array.from(emittedToolUseNames).filter(Boolean),
text_preview: emittedText ? emittedText.slice(0, 800) : '',
usage: { input_tokens: 0, output_tokens: 0 }
})
@@ -2354,9 +2632,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
stop_reason: emittedAnyToolUse
? 'tool_use'
: mapGeminiFinishReasonToAnthropicStopReason(finishReason),
tool_use_names: Array.from(emittedToolCallKeys)
.map((key) => key.split(':')[0])
.filter(Boolean),
tool_use_names: Array.from(emittedToolUseNames).filter(Boolean),
text_preview: emittedText ? emittedText.slice(0, 800) : '',
usage: { input_tokens: inputTokens, output_tokens: outputTokens }
})
@@ -2396,6 +2672,8 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
streamResponse.on('data', (chunk) => {
resetActivityTimeout() // <--- 【新增】收到数据了,重置倒计时!
if (finished) {
return
}
@@ -2442,7 +2720,11 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
const parts = extractGeminiParts(payload)
const thoughtSignature = extractGeminiThoughtSignature(payload)
const rawThoughtSignature = extractGeminiThoughtSignature(payload)
// Antigravity 专用净化:确保签名格式符合 API 要求
const thoughtSignature = isAntigravityVendor
? sanitizeThoughtSignatureForAntigravity(rawThoughtSignature)
: rawThoughtSignature
const fullThoughtForToolOrdering = extractGeminiThoughtText(payload)
if (wantsThinkingBlockFirst) {
@@ -2531,7 +2813,7 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
}
if (thoughtSignature && canStartThinkingBlock()) {
if (thoughtSignature && canStartThinkingBlock(true)) {
let delta = ''
if (thoughtSignature.startsWith(emittedThoughtSignature)) {
delta = thoughtSignature.slice(emittedThoughtSignature.length)
@@ -2550,7 +2832,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
}
const fullThought = extractGeminiThoughtText(payload)
if (fullThought && canStartThinkingBlock()) {
if (
fullThought &&
canStartThinkingBlock(Boolean(thoughtSignature || emittedThoughtSignature))
) {
let delta = ''
if (fullThought.startsWith(emittedThinking)) {
delta = fullThought.slice(emittedThinking.length)
@@ -2565,6 +2850,10 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
index: currentIndex,
delta: { type: 'thinking_delta', thinking: delta }
})
// [签名缓存] 当 thinking 内容和签名都有时,缓存供后续请求使用
if (isAntigravityVendor && sessionHash && emittedThoughtSignature) {
signatureCache.cacheSignature(sessionHash, fullThought, emittedThoughtSignature)
}
}
}
@@ -2590,10 +2879,18 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
})
streamResponse.on('end', () => {
if (activityTimeout) {
clearTimeout(activityTimeout)
} // <--- 【新增】正常结束,取消报警
finalize().catch((e) => logger.error('Failed to finalize Anthropic SSE response:', e))
})
streamResponse.on('error', (error) => {
if (activityTimeout) {
clearTimeout(activityTimeout)
} // <--- 【新增】报错了,取消报警
if (finished) {
return
}
@@ -2609,22 +2906,54 @@ async function handleAnthropicMessagesToGemini(req, res, { vendor, baseModel })
return undefined
} catch (error) {
const sanitized = sanitizeUpstreamError(error)
logger.error('Failed to start Gemini stream (via /v1/messages):', sanitized)
// ============================================================
// [大东修复 3.0] 彻底防止 JSON 循环引用导致服务崩溃
// ============================================================
// 上游尚未建立 SSE未写入任何事件优先返回真实 HTTP 错误码,避免 200 + SSE error 的混淆。
// 1. 使用 util.inspect 安全地将错误对象转为字符串,不使用 JSON.stringify
const safeErrorDetails = util.inspect(error, {
showHidden: false,
depth: 2,
colors: false,
breakLength: Infinity
})
// 2. 打印安全日志,绝对不会崩
logger.error(`❌ [Critical] Failed to start Gemini stream. 错误详情:\n${safeErrorDetails}`)
const sanitized = sanitizeUpstreamError(error)
// 3. 特殊处理 Antigravity 的参数错误 (400),输出详细请求信息便于调试
if (
vendor === 'antigravity' &&
effectiveModel.includes('claude') &&
isInvalidAntigravityArgumentError(sanitized)
) {
logger.warn('⚠️ Antigravity Claude invalid argument detected', {
requestId: req.requestId,
...summarizeAntigravityRequestForDebug(requestData),
statusCode: sanitized.statusCode,
upstreamType: sanitized.upstreamType,
upstreamMessage: sanitized.upstreamMessage || sanitized.message
})
}
// 4. 确保返回 JSON 响应给客户端 (让客户端知道出错了并重试)
if (!res.headersSent) {
// 记录非流式响应日志
dumpAnthropicNonStreamResponse(
req,
sanitized.statusCode || 502,
buildAnthropicError(sanitized.upstreamMessage || sanitized.message),
{ vendor, accountId, effectiveModel, forcedVendor: vendor, upstreamError: sanitized }
)
return res
.status(sanitized.statusCode || 502)
.json(buildAnthropicError(sanitized.upstreamMessage || sanitized.message))
}
// 5. 如果头已经发了,走 SSE 发送错误
writeAnthropicSseEvent(
res,
'error',
@@ -2708,7 +3037,8 @@ async function handleAnthropicCountTokensToGemini(req, res, { vendor }) {
toolUseIdToName,
{
vendor,
stripThinking: vendor === 'antigravity' && !canEnableThinking
stripThinking: vendor === 'antigravity' && !canEnableThinking,
sessionId: sessionHash
}
)

View File

@@ -434,7 +434,37 @@ async function request({
const status = error?.response?.status
if (status === 429 && !retriedAfterDelay && !signal?.aborted) {
const data = error?.response?.data
const msg = typeof data === 'string' ? data : JSON.stringify(data || '')
// 安全地将 data 转为字符串,避免 stream 对象导致循环引用崩溃
const safeDataToString = (value) => {
if (typeof value === 'string') {
return value
}
if (value === null || value === undefined) {
return ''
}
// stream 对象存在循环引用,不能 JSON.stringify
if (typeof value === 'object' && typeof value.pipe === 'function') {
return ''
}
if (Buffer.isBuffer(value)) {
try {
return value.toString('utf8')
} catch (_) {
return ''
}
}
if (typeof value === 'object') {
try {
return JSON.stringify(value)
} catch (_) {
return ''
}
}
return String(value)
}
const msg = safeDataToString(data)
if (
msg.toLowerCase().includes('resource_exhausted') ||
msg.toLowerCase().includes('no capacity')

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises')
const path = require('path')
const logger = require('./logger')
const { getProjectRoot } = require('./projectPaths')
const { safeRotatingAppend } = require('./safeRotatingAppend')
const REQUEST_DUMP_ENV = 'ANTHROPIC_DEBUG_REQUEST_DUMP'
const REQUEST_DUMP_MAX_BYTES_ENV = 'ANTHROPIC_DEBUG_REQUEST_DUMP_MAX_BYTES'
@@ -108,7 +108,7 @@ async function dumpAnthropicMessagesRequest(req, meta = {}) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
logger.warn('Failed to dump Anthropic request', {
filename,

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises')
const path = require('path')
const logger = require('./logger')
const { getProjectRoot } = require('./projectPaths')
const { safeRotatingAppend } = require('./safeRotatingAppend')
const RESPONSE_DUMP_ENV = 'ANTHROPIC_DEBUG_RESPONSE_DUMP'
const RESPONSE_DUMP_MAX_BYTES_ENV = 'ANTHROPIC_DEBUG_RESPONSE_DUMP_MAX_BYTES'
@@ -89,7 +89,7 @@ async function dumpAnthropicResponse(req, responseInfo, meta = {}) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
logger.warn('Failed to dump Anthropic response', {
filename,

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises')
const path = require('path')
const logger = require('./logger')
const { getProjectRoot } = require('./projectPaths')
const { safeRotatingAppend } = require('./safeRotatingAppend')
const UPSTREAM_REQUEST_DUMP_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP'
const UPSTREAM_REQUEST_DUMP_MAX_BYTES_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_REQUEST_DUMP_MAX_BYTES'
@@ -103,7 +103,7 @@ async function dumpAntigravityUpstreamRequest(requestInfo) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
logger.warn('Failed to dump Antigravity upstream request', {
filename,

View File

@@ -1,7 +1,7 @@
const fs = require('fs/promises')
const path = require('path')
const logger = require('./logger')
const { getProjectRoot } = require('./projectPaths')
const { safeRotatingAppend } = require('./safeRotatingAppend')
const UPSTREAM_RESPONSE_DUMP_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_RESPONSE_DUMP'
const UPSTREAM_RESPONSE_DUMP_MAX_BYTES_ENV = 'ANTIGRAVITY_DEBUG_UPSTREAM_RESPONSE_DUMP_MAX_BYTES'
@@ -89,7 +89,7 @@ async function dumpAntigravityUpstreamResponse(responseInfo) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
logger.warn('Failed to dump Antigravity upstream response', {
filename,
@@ -121,7 +121,7 @@ async function dumpAntigravityStreamEvent(eventInfo) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
// 静默处理,避免日志过多
}
@@ -155,7 +155,7 @@ async function dumpAntigravityStreamSummary(summaryInfo) {
const line = `${safeJsonStringify(record, maxBytes)}\n`
try {
await fs.appendFile(filename, line, { encoding: 'utf8' })
await safeRotatingAppend(filename, line)
} catch (e) {
logger.warn('Failed to dump Antigravity stream summary', {
filename,

View File

@@ -0,0 +1,88 @@
/**
* ============================================================================
* 安全 JSONL 追加工具(带文件大小限制与自动轮转)
* ============================================================================
*
* 用于所有调试 Dump 模块,避免日志文件无限增长导致 I/O 拥塞。
*
* 策略:
* - 每次写入前检查目标文件大小
* - 超过阈值时,将现有文件重命名为 .bak覆盖旧 .bak
* - 然后写入新文件
*/
const fs = require('fs/promises')
const logger = require('./logger')
// 默认文件大小上限10MB
const DEFAULT_MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024
const MAX_FILE_SIZE_ENV = 'DUMP_MAX_FILE_SIZE_BYTES'
/**
* 获取文件大小上限(可通过环境变量覆盖)
*/
function getMaxFileSize() {
const raw = process.env[MAX_FILE_SIZE_ENV]
if (raw) {
const parsed = Number.parseInt(raw, 10)
if (Number.isFinite(parsed) && parsed > 0) {
return parsed
}
}
return DEFAULT_MAX_FILE_SIZE_BYTES
}
/**
* 获取文件大小,文件不存在时返回 0
*/
async function getFileSize(filepath) {
try {
const stat = await fs.stat(filepath)
return stat.size
} catch (e) {
// 文件不存在或无法读取
return 0
}
}
/**
* 安全追加写入 JSONL 文件,支持自动轮转
*
* @param {string} filepath - 目标文件绝对路径
* @param {string} line - 要写入的单行(应以 \n 结尾)
* @param {Object} options - 可选配置
* @param {number} options.maxFileSize - 文件大小上限(字节),默认从环境变量或 10MB
*/
async function safeRotatingAppend(filepath, line, options = {}) {
const maxFileSize = options.maxFileSize || getMaxFileSize()
const currentSize = await getFileSize(filepath)
// 如果当前文件已达到或超过阈值,轮转
if (currentSize >= maxFileSize) {
const backupPath = `${filepath}.bak`
try {
// 先删除旧备份(如果存在)
await fs.unlink(backupPath).catch(() => {})
// 重命名当前文件为备份
await fs.rename(filepath, backupPath)
} catch (renameErr) {
// 轮转失败时记录警告日志,继续写入原文件
logger.warn('⚠️ Log rotation failed, continuing to write to original file', {
filepath,
backupPath,
error: renameErr?.message || String(renameErr)
})
}
}
// 追加写入
await fs.appendFile(filepath, line, { encoding: 'utf8' })
}
module.exports = {
safeRotatingAppend,
getMaxFileSize,
MAX_FILE_SIZE_ENV,
DEFAULT_MAX_FILE_SIZE_BYTES
}

183
src/utils/signatureCache.js Normal file
View File

@@ -0,0 +1,183 @@
/**
* Signature Cache - 签名缓存模块
*
* 用于缓存 Antigravity thinking block 的 thoughtSignature。
* Claude Code 客户端可能剥离非标准字段,导致多轮对话时签名丢失。
* 此模块按 sessionId + thinkingText 存储签名,便于后续请求恢复。
*
* 参考实现:
* - CLIProxyAPI: internal/cache/signature_cache.go
* - antigravity-claude-proxy: src/format/signature-cache.js
*/
const crypto = require('crypto')
const logger = require('./logger')
// 配置常量
const SIGNATURE_CACHE_TTL_MS = 60 * 60 * 1000 // 1 小时(同 CLIProxyAPI
const MAX_ENTRIES_PER_SESSION = 100 // 每会话最大缓存条目
const MIN_SIGNATURE_LENGTH = 50 // 最小有效签名长度
const TEXT_HASH_LENGTH = 16 // 文本哈希长度SHA256 前 16 位)
// 主缓存sessionId -> Map<textHash, { signature, timestamp }>
const signatureCache = new Map()
/**
* 生成文本内容的稳定哈希值
* @param {string} text - 待哈希的文本
* @returns {string} 16 字符的十六进制哈希
*/
function hashText(text) {
if (!text || typeof text !== 'string') {
return ''
}
const hash = crypto.createHash('sha256').update(text).digest('hex')
return hash.slice(0, TEXT_HASH_LENGTH)
}
/**
* 获取或创建会话缓存
* @param {string} sessionId - 会话 ID
* @returns {Map} 会话的签名缓存 Map
*/
function getOrCreateSessionCache(sessionId) {
if (!signatureCache.has(sessionId)) {
signatureCache.set(sessionId, new Map())
}
return signatureCache.get(sessionId)
}
/**
* 检查签名是否有效
* @param {string} signature - 待检查的签名
* @returns {boolean} 签名是否有效
*/
function isValidSignature(signature) {
return typeof signature === 'string' && signature.length >= MIN_SIGNATURE_LENGTH
}
/**
* 缓存 thinking 签名
* @param {string} sessionId - 会话 ID
* @param {string} thinkingText - thinking 内容文本
* @param {string} signature - thoughtSignature
*/
function cacheSignature(sessionId, thinkingText, signature) {
if (!sessionId || !thinkingText || !signature) {
return
}
if (!isValidSignature(signature)) {
return
}
const sessionCache = getOrCreateSessionCache(sessionId)
const textHash = hashText(thinkingText)
if (!textHash) {
return
}
// 淘汰策略:超过限制时删除最老的 1/4 条目
if (sessionCache.size >= MAX_ENTRIES_PER_SESSION) {
const entries = Array.from(sessionCache.entries())
entries.sort((a, b) => a[1].timestamp - b[1].timestamp)
const toRemove = Math.max(1, Math.floor(entries.length / 4))
for (let i = 0; i < toRemove; i++) {
sessionCache.delete(entries[i][0])
}
logger.debug(
`[SignatureCache] Evicted ${toRemove} old entries for session ${sessionId.slice(0, 8)}...`
)
}
sessionCache.set(textHash, {
signature,
timestamp: Date.now()
})
logger.debug(
`[SignatureCache] Cached signature for session ${sessionId.slice(0, 8)}..., hash ${textHash}`
)
}
/**
* 获取缓存的签名
* @param {string} sessionId - 会话 ID
* @param {string} thinkingText - thinking 内容文本
* @returns {string|null} 缓存的签名,未找到或过期则返回 null
*/
function getCachedSignature(sessionId, thinkingText) {
if (!sessionId || !thinkingText) {
return null
}
const sessionCache = signatureCache.get(sessionId)
if (!sessionCache) {
return null
}
const textHash = hashText(thinkingText)
if (!textHash) {
return null
}
const entry = sessionCache.get(textHash)
if (!entry) {
return null
}
// 检查是否过期
if (Date.now() - entry.timestamp > SIGNATURE_CACHE_TTL_MS) {
sessionCache.delete(textHash)
logger.debug(`[SignatureCache] Entry expired for hash ${textHash}`)
return null
}
logger.debug(
`[SignatureCache] Cache hit for session ${sessionId.slice(0, 8)}..., hash ${textHash}`
)
return entry.signature
}
/**
* 清除会话缓存
* @param {string} sessionId - 要清除的会话 ID为空则清除全部
*/
function clearSignatureCache(sessionId = null) {
if (sessionId) {
signatureCache.delete(sessionId)
logger.debug(`[SignatureCache] Cleared cache for session ${sessionId.slice(0, 8)}...`)
} else {
signatureCache.clear()
logger.debug('[SignatureCache] Cleared all caches')
}
}
/**
* 获取缓存统计信息(调试用)
* @returns {Object} { sessionCount, totalEntries }
*/
function getCacheStats() {
let totalEntries = 0
for (const sessionCache of signatureCache.values()) {
totalEntries += sessionCache.size
}
return {
sessionCount: signatureCache.size,
totalEntries
}
}
module.exports = {
cacheSignature,
getCachedSignature,
clearSignatureCache,
getCacheStats,
isValidSignature,
// 内部函数导出(用于测试或扩展)
hashText,
MIN_SIGNATURE_LENGTH,
MAX_ENTRIES_PER_SESSION,
SIGNATURE_CACHE_TTL_MS
}