Compaction Runner: fix pending review comments

This commit is contained in:
Rodrigo Uroz
2026-03-12 17:23:56 +00:00
committed by Josh Lehman
parent 19850cab32
commit 1b0d41f12a
2 changed files with 116 additions and 26 deletions

View File

@@ -564,14 +564,15 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
});
it("fires post-compaction memory sync without awaiting it in async mode", async () => {
let resolveSync: (() => void) | undefined;
const syncGate = new Promise<void>((resolve) => {
resolveSync = resolve;
const sync = vi.fn(async () => {});
let resolveManager: ((value: { manager: { sync: typeof sync } }) => void) | undefined;
const managerGate = new Promise<{ manager: { sync: typeof sync } }>((resolve) => {
resolveManager = resolve;
});
const sync = vi.fn(() => syncGate);
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
getMemorySearchManagerMock.mockImplementation(() => managerGate);
let settled = false;
const result = await compactEmbeddedPiSessionDirect({
const resultPromise = compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
@@ -588,13 +589,27 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
} as never,
});
expect(result.ok).toBe(true);
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
await vi.waitFor(() => {
expect(getMemorySearchManagerMock).toHaveBeenCalledTimes(1);
});
resolveSync?.();
await syncGate;
void resultPromise.then(() => {
settled = true;
});
await vi.waitFor(() => {
expect(settled).toBe(true);
});
expect(sync).not.toHaveBeenCalled();
resolveManager?.({ manager: { sync } });
await managerGate;
await vi.waitFor(() => {
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
sessionFiles: ["/tmp/session.jsonl"],
});
});
const result = await resultPromise;
expect(result.ok).toBe(true);
});
it("registers the Ollama api provider before compaction", async () => {
@@ -691,8 +706,48 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
);
});
it("emits a transcript update and post-compaction memory sync on the engine-owned path", async () => {
const listener = vi.fn();
const cleanup = onSessionTranscriptUpdate(listener);
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
try {
const result = await compactEmbeddedPiSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: " /tmp/session.jsonl ",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
enqueue: (task) => task(),
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
expect(result.ok).toBe(true);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" });
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
sessionFiles: ["/tmp/session.jsonl"],
});
} finally {
cleanup();
}
});
it("does not fire after_compaction when compaction fails", async () => {
hookRunner.hasHooks.mockReturnValue(true);
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
contextEngineCompactMock.mockResolvedValue({
ok: false,
compacted: false,
@@ -712,6 +767,7 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
expect(result.ok).toBe(false);
expect(hookRunner.runBeforeCompaction).toHaveBeenCalled();
expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
});
it("catches and logs hook exceptions without aborting compaction", async () => {

View File

@@ -278,13 +278,12 @@ function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "a
return "async";
}
async function syncPostCompactionSessionMemory(params: {
async function runPostCompactionSessionMemorySync(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
mode: "off" | "async" | "await";
}): Promise<void> {
if (params.mode === "off" || !params.config) {
if (!params.config) {
return;
}
try {
@@ -306,20 +305,50 @@ async function syncPostCompactionSessionMemory(params: {
const syncTask = manager.sync({
reason: "post-compaction",
force: resolvedMemory.sync.sessions.postCompactionForce,
sessionFiles: [params.sessionFile],
sessionFiles: [params.sessionFile.trim()],
});
if (params.mode === "await") {
await syncTask;
} else {
void syncTask.catch((err) => {
log.warn(`memory sync failed (post-compaction): ${String(err)}`);
});
}
await syncTask;
} catch (err) {
log.warn(`memory sync skipped (post-compaction): ${String(err)}`);
}
}
function syncPostCompactionSessionMemory(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
mode: "off" | "async" | "await";
}): Promise<void> {
if (params.mode === "off" || !params.config) {
return Promise.resolve();
}
const syncTask = runPostCompactionSessionMemorySync({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
if (params.mode === "await") {
return syncTask;
}
void syncTask;
return Promise.resolve();
}
async function runPostCompactionSideEffects(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
emitSessionTranscriptUpdate(params.sessionFile);
await syncPostCompactionSessionMemory({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
}
/**
* Core compaction logic without lane queueing.
* Use this when already inside a session/global lane to avoid deadlocks.
@@ -861,12 +890,10 @@ export async function compactEmbeddedPiSessionDirect(
const result = await compactWithSafetyTimeout(() =>
session.compact(params.customInstructions),
);
emitSessionTranscriptUpdate(params.sessionFile);
await syncPostCompactionSessionMemory({
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
// Estimate tokens after compaction by summing token estimates for remaining messages
let tokensAfter: number | undefined;
@@ -1057,6 +1084,13 @@ export async function compactEmbeddedPiSession(
force: params.trigger === "manual",
runtimeContext: params as Record<string, unknown>,
});
if (result.ok && result.compacted) {
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
}
if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) {
try {
await hookRunner.runAfterCompaction(