mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 11:27:26 +00:00
refactor(agents): share queued JSONL file writer
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type { Api, Model } from "@mariozechner/pi-ai";
|
||||
@@ -8,6 +7,7 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveUserPath } from "../utils.js";
|
||||
import { parseBooleanValue } from "../utils/boolean.js";
|
||||
import { safeJsonStringify } from "../utils/safe-json.js";
|
||||
import { getQueuedFileWriter, type QueuedFileWriter } from "./queued-file-writer.js";
|
||||
|
||||
type PayloadLogStage = "request" | "usage";
|
||||
|
||||
@@ -32,10 +32,7 @@ type PayloadLogConfig = {
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
type PayloadLogWriter = {
|
||||
filePath: string;
|
||||
write: (line: string) => void;
|
||||
};
|
||||
type PayloadLogWriter = QueuedFileWriter;
|
||||
|
||||
const writers = new Map<string, PayloadLogWriter>();
|
||||
const log = createSubsystemLogger("agent/anthropic-payload");
|
||||
@@ -50,27 +47,7 @@ function resolvePayloadLogConfig(env: NodeJS.ProcessEnv): PayloadLogConfig {
|
||||
}
|
||||
|
||||
function getWriter(filePath: string): PayloadLogWriter {
|
||||
const existing = writers.get(filePath);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const dir = path.dirname(filePath);
|
||||
const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined);
|
||||
let queue = Promise.resolve();
|
||||
|
||||
const writer: PayloadLogWriter = {
|
||||
filePath,
|
||||
write: (line: string) => {
|
||||
queue = queue
|
||||
.then(() => ready)
|
||||
.then(() => fs.appendFile(filePath, line, "utf8"))
|
||||
.catch(() => undefined);
|
||||
},
|
||||
};
|
||||
|
||||
writers.set(filePath, writer);
|
||||
return writer;
|
||||
return getQueuedFileWriter(writers, filePath);
|
||||
}
|
||||
|
||||
function formatError(error: unknown): string | undefined {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
@@ -7,6 +6,7 @@ import { resolveStateDir } from "../config/paths.js";
|
||||
import { resolveUserPath } from "../utils.js";
|
||||
import { parseBooleanValue } from "../utils/boolean.js";
|
||||
import { safeJsonStringify } from "../utils/safe-json.js";
|
||||
import { getQueuedFileWriter, type QueuedFileWriter } from "./queued-file-writer.js";
|
||||
|
||||
export type CacheTraceStage =
|
||||
| "session:loaded"
|
||||
@@ -70,10 +70,7 @@ type CacheTraceConfig = {
|
||||
includeSystem: boolean;
|
||||
};
|
||||
|
||||
type CacheTraceWriter = {
|
||||
filePath: string;
|
||||
write: (line: string) => void;
|
||||
};
|
||||
type CacheTraceWriter = QueuedFileWriter;
|
||||
|
||||
const writers = new Map<string, CacheTraceWriter>();
|
||||
|
||||
@@ -102,27 +99,7 @@ function resolveCacheTraceConfig(params: CacheTraceInit): CacheTraceConfig {
|
||||
}
|
||||
|
||||
function getWriter(filePath: string): CacheTraceWriter {
|
||||
const existing = writers.get(filePath);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const dir = path.dirname(filePath);
|
||||
const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined);
|
||||
let queue = Promise.resolve();
|
||||
|
||||
const writer: CacheTraceWriter = {
|
||||
filePath,
|
||||
write: (line: string) => {
|
||||
queue = queue
|
||||
.then(() => ready)
|
||||
.then(() => fs.appendFile(filePath, line, "utf8"))
|
||||
.catch(() => undefined);
|
||||
},
|
||||
};
|
||||
|
||||
writers.set(filePath, writer);
|
||||
return writer;
|
||||
return getQueuedFileWriter(writers, filePath);
|
||||
}
|
||||
|
||||
function stableStringify(value: unknown): string {
|
||||
|
||||
34
src/agents/queued-file-writer.ts
Normal file
34
src/agents/queued-file-writer.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
export type QueuedFileWriter = {
|
||||
filePath: string;
|
||||
write: (line: string) => void;
|
||||
};
|
||||
|
||||
export function getQueuedFileWriter(
|
||||
writers: Map<string, QueuedFileWriter>,
|
||||
filePath: string,
|
||||
): QueuedFileWriter {
|
||||
const existing = writers.get(filePath);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const dir = path.dirname(filePath);
|
||||
const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined);
|
||||
let queue = Promise.resolve();
|
||||
|
||||
const writer: QueuedFileWriter = {
|
||||
filePath,
|
||||
write: (line: string) => {
|
||||
queue = queue
|
||||
.then(() => ready)
|
||||
.then(() => fs.appendFile(filePath, line, "utf8"))
|
||||
.catch(() => undefined);
|
||||
},
|
||||
};
|
||||
|
||||
writers.set(filePath, writer);
|
||||
return writer;
|
||||
}
|
||||
Reference in New Issue
Block a user