import crypto from "node:crypto"; import path from "node:path"; import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; import type { Api, Model } from "@mariozechner/pi-ai"; import { resolveStateDir } from "../config/paths.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveUserPath } from "../utils.js"; import { parseBooleanValue } from "../utils/boolean.js"; import { safeJsonStringify } from "../utils/safe-json.js"; import { redactImageDataForDiagnostics } from "./payload-redaction.js"; import { getQueuedFileWriter, type QueuedFileWriter } from "./queued-file-writer.js"; type PayloadLogStage = "request" | "usage"; type PayloadLogEvent = { ts: string; stage: PayloadLogStage; runId?: string; sessionId?: string; sessionKey?: string; provider?: string; modelId?: string; modelApi?: string | null; workspaceDir?: string; payload?: unknown; usage?: Record; error?: string; payloadDigest?: string; }; type PayloadLogConfig = { enabled: boolean; filePath: string; }; type PayloadLogWriter = QueuedFileWriter; const writers = new Map(); const log = createSubsystemLogger("agent/anthropic-payload"); function resolvePayloadLogConfig(env: NodeJS.ProcessEnv): PayloadLogConfig { const enabled = parseBooleanValue(env.OPENCLAW_ANTHROPIC_PAYLOAD_LOG) ?? false; const fileOverride = env.OPENCLAW_ANTHROPIC_PAYLOAD_LOG_FILE?.trim(); const filePath = fileOverride ? resolveUserPath(fileOverride) : path.join(resolveStateDir(env), "logs", "anthropic-payload.jsonl"); return { enabled, filePath }; } function getWriter(filePath: string): PayloadLogWriter { return getQueuedFileWriter(writers, filePath); } function formatError(error: unknown): string | undefined { if (error instanceof Error) { return error.message; } if (typeof error === "string") { return error; } if (typeof error === "number" || typeof error === "boolean" || typeof error === "bigint") { return String(error); } if (error && typeof error === "object") { return safeJsonStringify(error) ?? "unknown error"; } return undefined; } function digest(value: unknown): string | undefined { const serialized = safeJsonStringify(value); if (!serialized) { return undefined; } return crypto.createHash("sha256").update(serialized).digest("hex"); } function isAnthropicModel(model: Model | undefined | null): boolean { return (model as { api?: unknown })?.api === "anthropic-messages"; } function findLastAssistantUsage(messages: AgentMessage[]): Record | null { for (let i = messages.length - 1; i >= 0; i -= 1) { const msg = messages[i] as { role?: unknown; usage?: unknown }; if (msg?.role === "assistant" && msg.usage && typeof msg.usage === "object") { return msg.usage as Record; } } return null; } export type AnthropicPayloadLogger = { enabled: true; wrapStreamFn: (streamFn: StreamFn) => StreamFn; recordUsage: (messages: AgentMessage[], error?: unknown) => void; }; export function createAnthropicPayloadLogger(params: { env?: NodeJS.ProcessEnv; runId?: string; sessionId?: string; sessionKey?: string; provider?: string; modelId?: string; modelApi?: string | null; workspaceDir?: string; writer?: PayloadLogWriter; }): AnthropicPayloadLogger | null { const env = params.env ?? process.env; const cfg = resolvePayloadLogConfig(env); if (!cfg.enabled) { return null; } const writer = params.writer ?? getWriter(cfg.filePath); const base: Omit = { runId: params.runId, sessionId: params.sessionId, sessionKey: params.sessionKey, provider: params.provider, modelId: params.modelId, modelApi: params.modelApi, workspaceDir: params.workspaceDir, }; const record = (event: PayloadLogEvent) => { const line = safeJsonStringify(event); if (!line) { return; } writer.write(`${line}\n`); }; const wrapStreamFn: AnthropicPayloadLogger["wrapStreamFn"] = (streamFn) => { const wrapped: StreamFn = (model, context, options) => { if (!isAnthropicModel(model)) { return streamFn(model, context, options); } const nextOnPayload = (payload: unknown) => { const redactedPayload = redactImageDataForDiagnostics(payload); record({ ...base, ts: new Date().toISOString(), stage: "request", payload: redactedPayload, payloadDigest: digest(redactedPayload), }); return options?.onPayload?.(payload, model); }; return streamFn(model, context, { ...options, onPayload: nextOnPayload, }); }; return wrapped; }; const recordUsage: AnthropicPayloadLogger["recordUsage"] = (messages, error) => { const usage = findLastAssistantUsage(messages); const errorMessage = formatError(error); if (!usage) { if (errorMessage) { record({ ...base, ts: new Date().toISOString(), stage: "usage", error: errorMessage, }); } return; } record({ ...base, ts: new Date().toISOString(), stage: "usage", usage, error: errorMessage, }); log.info("anthropic usage", { runId: params.runId, sessionId: params.sessionId, usage, }); }; log.info("anthropic payload logger enabled", { filePath: writer.filePath }); return { enabled: true, wrapStreamFn, recordUsage }; }