mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 04:21:24 +00:00
fix(heartbeat): prune transcript for HEARTBEAT_OK turns
When a heartbeat run results in HEARTBEAT_OK (or an empty/duplicate reply), the user+assistant turns are now pruned from the session transcript. This prevents context window pollution from zero-information exchanges.

Implementation:
- captureTranscriptState(): records the transcript file path and size before the heartbeat run
- pruneHeartbeatTranscript(): truncates the file back to its pre-heartbeat size
- Called in the ok-empty, ok-token, and duplicate cases (the same places as restoreHeartbeatUpdatedAt)

This extends the existing pattern where delivery is suppressed and updatedAt is restored for HEARTBEAT_OK responses; now the transcript is also cleaned up.

Fixes #17804
This commit is contained in:
committed by
Peter Steinberger
parent
7bb9a7dcfc
commit
e9f2e6a829
188
src/infra/heartbeat-runner.transcript-prune.test.ts
Normal file
188
src/infra/heartbeat-runner.transcript-prune.test.ts
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import type { OpenClawConfig } from "../config/config.js";
|
||||||
|
import { telegramPlugin } from "../../extensions/telegram/src/channel.js";
|
||||||
|
import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js";
|
||||||
|
import * as replyModule from "../auto-reply/reply.js";
|
||||||
|
import { resolveMainSessionKey } from "../config/sessions.js";
|
||||||
|
import { setActivePluginRegistry } from "../plugins/runtime.js";
|
||||||
|
import { createPluginRuntime } from "../plugins/runtime/index.js";
|
||||||
|
import { createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||||
|
import { runHeartbeatOnce } from "./heartbeat-runner.js";
|
||||||
|
|
||||||
|
// Replace jiti with a stub so isolated runs never pull in optional runtime deps.
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));

// Before each test: hand a plugin runtime to the Telegram extension, then
// activate a test registry that contains only the Telegram channel plugin.
beforeEach(() => {
  setTelegramRuntime(createPluginRuntime());
  const registry = createTestRegistry([
    { pluginId: "telegram", plugin: telegramPlugin, source: "test" },
  ]);
  setActivePluginRegistry(registry);
});
|
||||||
|
|
||||||
|
describe("heartbeat transcript pruning", () => {
|
||||||
|
/**
 * Write a session store JSON file containing a single entry under `sessionKey`.
 *
 * `sessionId` falls back to "sid" and `updatedAt` to the current time.
 * The defaults are applied after spreading `session`, so a key that is present
 * but explicitly `undefined` still receives its default instead of overwriting
 * it (and then being dropped by JSON.stringify).
 *
 * @param storePath - Destination path of the JSON store file.
 * @param sessionKey - Key under which the entry is stored.
 * @param session - Entry fields; `sessionId` and `updatedAt` are optional.
 */
async function seedSessionStore(
  storePath: string,
  sessionKey: string,
  session: {
    sessionId?: string;
    updatedAt?: number;
    lastChannel: string;
    lastProvider: string;
    lastTo: string;
  },
) {
  const entry = {
    ...session,
    // Defaults go last so an explicit `undefined` cannot clobber them.
    sessionId: session.sessionId ?? "sid",
    updatedAt: session.updatedAt ?? Date.now(),
  };
  await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2));
}
|
||||||
|
|
||||||
|
/**
 * Create a JSONL transcript file made of a session header line followed by one
 * user turn and one assistant turn. Parent directories are created as needed.
 *
 * @param transcriptPath - File to create.
 * @param sessionId - Value stored as the header's `id`.
 * @returns The exact text written to the file.
 */
async function createTranscriptWithContent(transcriptPath: string, sessionId: string) {
  const headerLine = JSON.stringify({
    type: "session",
    version: 3,
    id: sessionId,
    timestamp: new Date().toISOString(),
    cwd: process.cwd(),
  });
  const lines = [
    headerLine,
    '{"role":"user","content":"Hello"}',
    '{"role":"assistant","content":"Hi there"}',
  ];
  // Every line, including the last one, is newline-terminated.
  const existingContent = `${lines.join("\n")}\n`;

  const parentDir = path.dirname(transcriptPath);
  await fs.mkdir(parentDir, { recursive: true });
  await fs.writeFile(transcriptPath, existingContent);
  return existingContent;
}
|
||||||
|
|
||||||
|
/**
 * Run `fn` inside a throwaway sandbox:
 * - a fresh temp directory, with a `sessions.json` store path inside it,
 * - a spy on `getReplyFromConfig` from the reply module,
 * - `TELEGRAM_BOT_TOKEN` set to an empty string for the duration.
 *
 * Whether `fn` resolves or rejects, the spy is restored, the previous token
 * value (or its absence) is put back, and the temp directory is removed.
 *
 * @returns Whatever `fn` resolves to.
 */
async function withTempHeartbeatSandbox<T>(
  fn: (ctx: {
    tmpDir: string;
    storePath: string;
    replySpy: ReturnType<typeof vi.spyOn>;
  }) => Promise<T>,
) {
  const sandboxDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-hb-prune-"));
  const sandboxStore = path.join(sandboxDir, "sessions.json");
  const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");

  const savedToken = process.env.TELEGRAM_BOT_TOKEN;
  process.env.TELEGRAM_BOT_TOKEN = "";

  try {
    return await fn({ tmpDir: sandboxDir, storePath: sandboxStore, replySpy });
  } finally {
    replySpy.mockRestore();
    // Distinguish "was unset" from "was set" so the environment ends up exactly as found.
    if (savedToken !== undefined) {
      process.env.TELEGRAM_BOT_TOKEN = savedToken;
    } else {
      delete process.env.TELEGRAM_BOT_TOKEN;
    }
    await fs.rm(sandboxDir, { recursive: true, force: true });
  }
}
|
||||||
|
|
||||||
|
// Verifies that turns appended during a HEARTBEAT_OK run are removed again.
// The reply function is mocked, so the mock itself appends the heartbeat turns
// to the transcript; without that the file never grows and the assertions
// below would pass even if pruning were broken.
it("prunes transcript when heartbeat returns HEARTBEAT_OK", async () => {
  await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
    const sessionKey = resolveMainSessionKey(undefined);
    const sessionId = "test-session-prune";
    const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);

    // Create a transcript with some existing content
    const originalContent = await createTranscriptWithContent(transcriptPath, sessionId);
    const originalSize = (await fs.stat(transcriptPath)).size;

    // Seed session store
    await seedSessionStore(storePath, sessionKey, {
      sessionId,
      lastChannel: "telegram",
      lastProvider: "telegram",
      lastTo: "user123",
    });

    // Simulate the agent run writing the heartbeat user+assistant turns, then
    // return HEARTBEAT_OK (which triggers pruning).
    const heartbeatTurns =
      '{"role":"user","content":"heartbeat"}\n{"role":"assistant","content":"HEARTBEAT_OK"}\n';
    replySpy.mockImplementationOnce(async () => {
      await fs.appendFile(transcriptPath, heartbeatTurns);
      return {
        text: "HEARTBEAT_OK",
        usage: { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0 },
      };
    });

    // Run heartbeat
    const cfg: OpenClawConfig = {
      version: 1,
      model: "test-model",
      agent: { workspace: tmpDir },
      sessionStore: storePath,
      channels: { telegram: { showOk: true, showAlerts: true } },
    };

    await runHeartbeatOnce({
      agentId: undefined,
      reason: "test",
      cfg,
      deps: { sendTelegram: vi.fn() },
    });

    // The mocked reply must have run, otherwise nothing was appended and the
    // checks below would prove nothing.
    expect(replySpy).toHaveBeenCalledTimes(1);

    // Verify transcript was truncated back to original size
    const finalContent = await fs.readFile(transcriptPath, "utf-8");
    expect(finalContent).toBe(originalContent);
    const finalSize = (await fs.stat(transcriptPath)).size;
    expect(finalSize).toBe(originalSize);
  });
});
|
||||||
|
|
||||||
|
// Verifies that turns appended during a run with a real (non-OK) reply are kept.
// The mocked reply appends the turns itself, and the assertions require that
// they are still present afterwards; a plain ">= original size" check would
// also pass for a transcript that had been pruned back to its original size.
it("does not prune transcript when heartbeat returns meaningful content", async () => {
  await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
    const sessionKey = resolveMainSessionKey(undefined);
    const sessionId = "test-session-no-prune";
    const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`);

    // Create a transcript with some existing content
    const originalContent = await createTranscriptWithContent(transcriptPath, sessionId);
    const originalSize = (await fs.stat(transcriptPath)).size;

    // Seed session store
    await seedSessionStore(storePath, sessionKey, {
      sessionId,
      lastChannel: "telegram",
      lastProvider: "telegram",
      lastTo: "user123",
    });

    // Simulate the agent run writing its turns, then return meaningful content
    // (should NOT trigger pruning).
    const alertTurns =
      '{"role":"user","content":"heartbeat"}\n{"role":"assistant","content":"Alert: Something needs your attention!"}\n';
    replySpy.mockImplementationOnce(async () => {
      await fs.appendFile(transcriptPath, alertTurns);
      return {
        text: "Alert: Something needs your attention!",
        usage: { inputTokens: 10, outputTokens: 20, cacheReadTokens: 0, cacheWriteTokens: 0 },
      };
    });

    // Run heartbeat
    const cfg: OpenClawConfig = {
      version: 1,
      model: "test-model",
      agent: { workspace: tmpDir },
      sessionStore: storePath,
      channels: { telegram: { showOk: true, showAlerts: true } },
    };

    await runHeartbeatOnce({
      agentId: undefined,
      reason: "test",
      cfg,
      deps: { sendTelegram: vi.fn() },
    });

    // The mocked reply must have run so that the transcript actually grew.
    expect(replySpy).toHaveBeenCalledTimes(1);

    // Verify transcript was NOT truncated: the pre-existing content is intact
    // and the turns appended during the run are still there.
    const finalContent = await fs.readFile(transcriptPath, "utf-8");
    expect(finalContent.startsWith(originalContent)).toBe(true);
    expect(finalContent).toContain(alertTurns);
    const finalSize = (await fs.stat(transcriptPath)).size;
    expect(finalSize).toBeGreaterThan(originalSize);
  });
});
|
||||||
|
});
|
||||||
@@ -31,6 +31,7 @@ import {
|
|||||||
loadSessionStore,
|
loadSessionStore,
|
||||||
resolveAgentIdFromSessionKey,
|
resolveAgentIdFromSessionKey,
|
||||||
resolveAgentMainSessionKey,
|
resolveAgentMainSessionKey,
|
||||||
|
resolveSessionFilePath,
|
||||||
resolveStorePath,
|
resolveStorePath,
|
||||||
saveSessionStore,
|
saveSessionStore,
|
||||||
updateSessionStore,
|
updateSessionStore,
|
||||||
@@ -351,6 +352,58 @@ async function restoreHeartbeatUpdatedAt(params: {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Roll a transcript file back to the byte size it had before a heartbeat run.
 *
 * Used to drop the user+assistant turns written during a HEARTBEAT_OK run so
 * that zero-information exchanges do not accumulate in the transcript.
 *
 * Does nothing when the path is missing/empty, when the size is not a
 * non-negative number, or when the file is not larger than the recorded size.
 * Filesystem errors (for example a missing file) are deliberately ignored.
 *
 * @param params.transcriptPath - Transcript file to truncate.
 * @param params.preHeartbeatSize - File size in bytes to truncate back to.
 */
async function pruneHeartbeatTranscript(params: {
  transcriptPath?: string;
  preHeartbeatSize?: number;
}) {
  const targetPath = params.transcriptPath;
  const targetSize = params.preHeartbeatSize;

  if (!targetPath) {
    return;
  }
  if (typeof targetSize !== "number" || targetSize < 0) {
    return;
  }

  try {
    const { size: currentSize } = await fs.stat(targetPath);
    // Truncate only when the file actually grew during the heartbeat run.
    if (currentSize > targetSize) {
      await fs.truncate(targetPath, targetSize);
    }
  } catch {
    // Best effort: the file may not exist or may have been removed meanwhile.
  }
}
|
||||||
|
|
||||||
|
/**
 * Record where a session's transcript lives and how large it currently is,
 * so the file can later be truncated back to this size.
 *
 * Looks up `sessionKey` in the session store at `storePath`, resolves the
 * transcript path for that entry (using the store's directory as the sessions
 * directory), and stats the file.
 *
 * @returns The transcript path and its size in bytes, or an empty object when
 *   the store entry has no session id or any step throws (for example because
 *   the transcript file does not exist yet).
 */
async function captureTranscriptState(params: {
  storePath: string;
  sessionKey: string;
  agentId?: string;
}): Promise<{ transcriptPath?: string; preHeartbeatSize?: number }> {
  const { agentId, sessionKey, storePath } = params;

  try {
    const sessionEntry = loadSessionStore(storePath)[sessionKey];
    if (!sessionEntry?.sessionId) {
      return {};
    }

    const sessionsDir = path.dirname(storePath);
    const transcriptPath = resolveSessionFilePath(sessionEntry.sessionId, sessionEntry, {
      agentId,
      sessionsDir,
    });

    const { size } = await fs.stat(transcriptPath);
    return { transcriptPath, preHeartbeatSize: size };
  } catch {
    // No session or no transcript yet: there is nothing to prune later.
    return {};
  }
}
|
||||||
|
|
||||||
function normalizeHeartbeatReply(
|
function normalizeHeartbeatReply(
|
||||||
payload: ReplyPayload,
|
payload: ReplyPayload,
|
||||||
responsePrefix: string | undefined,
|
responsePrefix: string | undefined,
|
||||||
@@ -546,6 +599,13 @@ export async function runHeartbeatOnce(opts: {
|
|||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK
|
||||||
|
const transcriptState = await captureTranscriptState({
|
||||||
|
storePath,
|
||||||
|
sessionKey,
|
||||||
|
agentId,
|
||||||
|
});
|
||||||
|
|
||||||
const heartbeatModelOverride = heartbeat?.model?.trim() || undefined;
|
const heartbeatModelOverride = heartbeat?.model?.trim() || undefined;
|
||||||
const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true;
|
const suppressToolErrorWarnings = heartbeat?.suppressToolErrorWarnings === true;
|
||||||
const replyOpts = heartbeatModelOverride
|
const replyOpts = heartbeatModelOverride
|
||||||
@@ -567,6 +627,8 @@ export async function runHeartbeatOnce(opts: {
|
|||||||
sessionKey,
|
sessionKey,
|
||||||
updatedAt: previousUpdatedAt,
|
updatedAt: previousUpdatedAt,
|
||||||
});
|
});
|
||||||
|
// Prune the transcript to remove HEARTBEAT_OK turns
|
||||||
|
await pruneHeartbeatTranscript(transcriptState);
|
||||||
const okSent = await maybeSendHeartbeatOk();
|
const okSent = await maybeSendHeartbeatOk();
|
||||||
emitHeartbeatEvent({
|
emitHeartbeatEvent({
|
||||||
status: "ok-empty",
|
status: "ok-empty",
|
||||||
@@ -601,6 +663,8 @@ export async function runHeartbeatOnce(opts: {
|
|||||||
sessionKey,
|
sessionKey,
|
||||||
updatedAt: previousUpdatedAt,
|
updatedAt: previousUpdatedAt,
|
||||||
});
|
});
|
||||||
|
// Prune the transcript to remove HEARTBEAT_OK turns
|
||||||
|
await pruneHeartbeatTranscript(transcriptState);
|
||||||
const okSent = await maybeSendHeartbeatOk();
|
const okSent = await maybeSendHeartbeatOk();
|
||||||
emitHeartbeatEvent({
|
emitHeartbeatEvent({
|
||||||
status: "ok-token",
|
status: "ok-token",
|
||||||
@@ -637,6 +701,8 @@ export async function runHeartbeatOnce(opts: {
|
|||||||
sessionKey,
|
sessionKey,
|
||||||
updatedAt: previousUpdatedAt,
|
updatedAt: previousUpdatedAt,
|
||||||
});
|
});
|
||||||
|
// Prune the transcript to remove duplicate heartbeat turns
|
||||||
|
await pruneHeartbeatTranscript(transcriptState);
|
||||||
emitHeartbeatEvent({
|
emitHeartbeatEvent({
|
||||||
status: "skipped",
|
status: "skipped",
|
||||||
reason: "duplicate",
|
reason: "duplicate",
|
||||||
|
|||||||
Reference in New Issue
Block a user