From 1b0d41f12a3a3d2bb6b7678effb063a779fbc659 Mon Sep 17 00:00:00 2001 From: Rodrigo Uroz Date: Thu, 12 Mar 2026 17:23:56 +0000 Subject: [PATCH] Compaction Runner: fix pending review comments --- .../pi-embedded-runner/compact.hooks.test.ts | 80 ++++++++++++++++--- src/agents/pi-embedded-runner/compact.ts | 62 ++++++++++---- 2 files changed, 116 insertions(+), 26 deletions(-) diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 2127fbf07ba..c4cfaba4448 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -564,14 +564,15 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { }); it("fires post-compaction memory sync without awaiting it in async mode", async () => { - let resolveSync: (() => void) | undefined; - const syncGate = new Promise((resolve) => { - resolveSync = resolve; + const sync = vi.fn(async () => {}); + let resolveManager: ((value: { manager: { sync: typeof sync } }) => void) | undefined; + const managerGate = new Promise<{ manager: { sync: typeof sync } }>((resolve) => { + resolveManager = resolve; }); - const sync = vi.fn(() => syncGate); - getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + getMemorySearchManagerMock.mockImplementation(() => managerGate); + let settled = false; - const result = await compactEmbeddedPiSessionDirect({ + const resultPromise = compactEmbeddedPiSessionDirect({ sessionId: "session-1", sessionKey: "agent:main:session-1", sessionFile: "/tmp/session.jsonl", @@ -588,13 +589,27 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } as never, }); - expect(result.ok).toBe(true); - expect(sync).toHaveBeenCalledWith({ - reason: "post-compaction", - force: true, + await vi.waitFor(() => { + expect(getMemorySearchManagerMock).toHaveBeenCalledTimes(1); }); - resolveSync?.(); - await syncGate; + void resultPromise.then(() => { + settled = true; + }); + await vi.waitFor(() => { + expect(settled).toBe(true); + }); + expect(sync).not.toHaveBeenCalled(); + resolveManager?.({ manager: { sync } }); + await managerGate; + await vi.waitFor(() => { + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + force: true, + sessionFiles: ["/tmp/session.jsonl"], + }); + }); + const result = await resultPromise; + expect(result.ok).toBe(true); }); it("registers the Ollama api provider before compaction", async () => { @@ -691,8 +706,48 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { ); }); + it("emits a transcript update and post-compaction memory sync on the engine-owned path", async () => { + const listener = vi.fn(); + const cleanup = onSessionTranscriptUpdate(listener); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + + try { + const result = await compactEmbeddedPiSession({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: " /tmp/session.jsonl ", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + enqueue: (task) => task(), + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + expect(result.ok).toBe(true); + expect(listener).toHaveBeenCalledTimes(1); + expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" }); + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + force: true, + sessionFiles: ["/tmp/session.jsonl"], + }); + } finally { + cleanup(); + } + }); + it("does not fire after_compaction when compaction fails", async () => { hookRunner.hasHooks.mockReturnValue(true); + const sync = vi.fn(async () => {}); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); contextEngineCompactMock.mockResolvedValue({ ok: false, compacted: false, @@ -712,6 +767,7 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { expect(result.ok).toBe(false); expect(hookRunner.runBeforeCompaction).toHaveBeenCalled(); expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); }); it("catches and logs hook exceptions without aborting compaction", async () => { diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 7b813cba7a8..2d17c9d27fb 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -278,13 +278,12 @@ function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "a return "async"; } -async function syncPostCompactionSessionMemory(params: { +async function runPostCompactionSessionMemorySync(params: { config?: OpenClawConfig; sessionKey?: string; sessionFile: string; - mode: "off" | "async" | "await"; }): Promise { - if (params.mode === "off" || !params.config) { + if (!params.config) { return; } try { @@ -306,20 +305,50 @@ async function syncPostCompactionSessionMemory(params: { const syncTask = manager.sync({ reason: "post-compaction", force: resolvedMemory.sync.sessions.postCompactionForce, - sessionFiles: [params.sessionFile], + sessionFiles: [params.sessionFile.trim()], }); - if (params.mode === "await") { - await syncTask; - } else { - void syncTask.catch((err) => { - log.warn(`memory sync failed (post-compaction): ${String(err)}`); - }); - } + await syncTask; } catch (err) { log.warn(`memory sync skipped (post-compaction): ${String(err)}`); } } +function syncPostCompactionSessionMemory(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; + mode: "off" | "async" | "await"; +}): Promise { + if (params.mode === "off" || !params.config) { + return Promise.resolve(); + } + + const syncTask = runPostCompactionSessionMemorySync({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + if (params.mode === "await") { + return syncTask; + } + void syncTask; + return Promise.resolve(); +} + +async function runPostCompactionSideEffects(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; +}): Promise { + emitSessionTranscriptUpdate(params.sessionFile); + await syncPostCompactionSessionMemory({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + mode: resolvePostCompactionIndexSyncMode(params.config), + }); +} + /** * Core compaction logic without lane queueing. * Use this when already inside a session/global lane to avoid deadlocks. @@ -861,12 +890,10 @@ export async function compactEmbeddedPiSessionDirect( const result = await compactWithSafetyTimeout(() => session.compact(params.customInstructions), ); - emitSessionTranscriptUpdate(params.sessionFile); - await syncPostCompactionSessionMemory({ + await runPostCompactionSideEffects({ config: params.config, sessionKey: params.sessionKey, sessionFile: params.sessionFile, - mode: resolvePostCompactionIndexSyncMode(params.config), }); // Estimate tokens after compaction by summing token estimates for remaining messages let tokensAfter: number | undefined; @@ -1057,6 +1084,13 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext: params as Record, }); + if (result.ok && result.compacted) { + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + } if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) { try { await hookRunner.runAfterCompaction(