Compaction Runner: wire post-compaction memory sync (#25561)

Merged via squash.

Prepared head SHA: 6d2bc02cc1
Co-authored-by: rodrigouroz <384037+rodrigouroz@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Rodrigo Uroz
2026-03-12 18:24:29 -03:00
committed by GitHub
parent fd568c4f74
commit 143e593ab8
19 changed files with 973 additions and 47 deletions

View File

@@ -284,6 +284,7 @@ describe("memory search config", () => {
expect(resolved?.sync.sessions).toEqual({
deltaBytes: 100000,
deltaMessages: 50,
postCompactionForce: true,
});
});

View File

@@ -61,6 +61,7 @@ export type ResolvedMemorySearchConfig = {
sessions: {
deltaBytes: number;
deltaMessages: number;
postCompactionForce: boolean;
};
};
query: {
@@ -248,6 +249,10 @@ function mergeConfig(
overrides?.sync?.sessions?.deltaMessages ??
defaults?.sync?.sessions?.deltaMessages ??
DEFAULT_SESSION_DELTA_MESSAGES,
postCompactionForce:
overrides?.sync?.sessions?.postCompactionForce ??
defaults?.sync?.sessions?.postCompactionForce ??
true,
},
};
const query = {
@@ -315,6 +320,7 @@ function mergeConfig(
);
const deltaBytes = clampInt(sync.sessions.deltaBytes, 0, Number.MAX_SAFE_INTEGER);
const deltaMessages = clampInt(sync.sessions.deltaMessages, 0, Number.MAX_SAFE_INTEGER);
const postCompactionForce = sync.sessions.postCompactionForce;
return {
enabled,
sources,
@@ -336,6 +342,7 @@ function mergeConfig(
sessions: {
deltaBytes,
deltaMessages,
postCompactionForce,
},
},
query: {

View File

@@ -4,41 +4,67 @@ import { onSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
const {
hookRunner,
ensureRuntimePluginsLoaded,
resolveContextEngineMock,
resolveModelMock,
sessionCompactImpl,
triggerInternalHook,
sanitizeSessionHistoryMock,
contextEngineCompactMock,
} = vi.hoisted(() => ({
hookRunner: {
hasHooks: vi.fn(),
runBeforeCompaction: vi.fn(),
runAfterCompaction: vi.fn(),
},
ensureRuntimePluginsLoaded: vi.fn(),
resolveModelMock: vi.fn(() => ({
model: { provider: "openai", api: "responses", id: "fake", input: [] },
error: null,
authStorage: { setRuntimeApiKey: vi.fn() },
modelRegistry: {},
})),
sessionCompactImpl: vi.fn(async () => ({
summary: "summary",
firstKeptEntryId: "entry-1",
tokensBefore: 120,
details: { ok: true },
})),
triggerInternalHook: vi.fn(),
sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages),
contextEngineCompactMock: vi.fn(async () => ({
getMemorySearchManagerMock,
resolveMemorySearchConfigMock,
resolveSessionAgentIdMock,
} = vi.hoisted(() => {
const contextEngineCompactMock = vi.fn(async () => ({
ok: true as boolean,
compacted: true as boolean,
reason: undefined as string | undefined,
result: { summary: "engine-summary", tokensAfter: 50 } as
| { summary: string; tokensAfter: number }
| undefined,
})),
}));
}));
return {
hookRunner: {
hasHooks: vi.fn(),
runBeforeCompaction: vi.fn(),
runAfterCompaction: vi.fn(),
},
ensureRuntimePluginsLoaded: vi.fn(),
resolveContextEngineMock: vi.fn(async () => ({
info: { ownsCompaction: true },
compact: contextEngineCompactMock,
})),
resolveModelMock: vi.fn(() => ({
model: { provider: "openai", api: "responses", id: "fake", input: [] },
error: null,
authStorage: { setRuntimeApiKey: vi.fn() },
modelRegistry: {},
})),
sessionCompactImpl: vi.fn(async () => ({
summary: "summary",
firstKeptEntryId: "entry-1",
tokensBefore: 120,
details: { ok: true },
})),
triggerInternalHook: vi.fn(),
sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages),
contextEngineCompactMock,
getMemorySearchManagerMock: vi.fn(async () => ({
manager: {
sync: vi.fn(async () => {}),
},
})),
resolveMemorySearchConfigMock: vi.fn(() => ({
sources: ["sessions"],
sync: {
sessions: {
postCompactionForce: true,
},
},
})),
resolveSessionAgentIdMock: vi.fn(() => "main"),
};
});
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => hookRunner,
@@ -135,10 +161,7 @@ vi.mock("../session-write-lock.js", () => ({
vi.mock("../../context-engine/index.js", () => ({
ensureContextEnginesInitialized: vi.fn(),
resolveContextEngine: vi.fn(async () => ({
info: { ownsCompaction: true },
compact: contextEngineCompactMock,
})),
resolveContextEngine: resolveContextEngineMock,
}));
vi.mock("../../process/command-queue.js", () => ({
@@ -211,9 +234,18 @@ vi.mock("../agent-paths.js", () => ({
}));
vi.mock("../agent-scope.js", () => ({
resolveSessionAgentId: resolveSessionAgentIdMock,
resolveSessionAgentIds: vi.fn(() => ({ defaultAgentId: "main", sessionAgentId: "main" })),
}));
vi.mock("../memory-search.js", () => ({
resolveMemorySearchConfig: resolveMemorySearchConfigMock,
}));
vi.mock("../../memory/index.js", () => ({
getMemorySearchManager: getMemorySearchManagerMock,
}));
vi.mock("../date-time.js", () => ({
formatUserTime: vi.fn(() => ""),
resolveUserTimeFormat: vi.fn(() => ""),
@@ -314,6 +346,23 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => {
return params.messages;
});
getMemorySearchManagerMock.mockReset();
getMemorySearchManagerMock.mockResolvedValue({
manager: {
sync: vi.fn(async () => {}),
},
});
resolveMemorySearchConfigMock.mockReset();
resolveMemorySearchConfigMock.mockReturnValue({
sources: ["sessions"],
sync: {
sessions: {
postCompactionForce: true,
},
},
});
resolveSessionAgentIdMock.mockReset();
resolveSessionAgentIdMock.mockReturnValue("main");
unregisterApiProviders(getCustomApiRegistrySourceId("ollama"));
});
@@ -452,6 +501,161 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
}
});
it("skips sync in await mode when postCompactionForce is false", async () => {
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
resolveMemorySearchConfigMock.mockReturnValue({
sources: ["sessions"],
sync: {
sessions: {
postCompactionForce: false,
},
},
});
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
expect(result.ok).toBe(true);
expect(resolveSessionAgentIdMock).toHaveBeenCalledWith({
sessionKey: "agent:main:session-1",
config: expect.any(Object),
});
expect(getMemorySearchManagerMock).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
});
it("awaits post-compaction memory sync in await mode when postCompactionForce is true", async () => {
let releaseSync: (() => void) | undefined;
const syncGate = new Promise<void>((resolve) => {
releaseSync = resolve;
});
const sync = vi.fn(() => syncGate);
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
let settled = false;
const resultPromise = compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
void resultPromise.then(() => {
settled = true;
});
await vi.waitFor(() => {
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
sessionFiles: ["/tmp/session.jsonl"],
});
});
expect(settled).toBe(false);
releaseSync?.();
const result = await resultPromise;
expect(result.ok).toBe(true);
expect(settled).toBe(true);
});
it("skips post-compaction memory sync when the mode is off", async () => {
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "off",
},
},
},
} as never,
});
expect(result.ok).toBe(true);
expect(resolveSessionAgentIdMock).not.toHaveBeenCalled();
expect(getMemorySearchManagerMock).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
});
it("fires post-compaction memory sync without awaiting it in async mode", async () => {
const sync = vi.fn(async () => {});
let resolveManager: ((value: { manager: { sync: typeof sync } }) => void) | undefined;
const managerGate = new Promise<{ manager: { sync: typeof sync } }>((resolve) => {
resolveManager = resolve;
});
getMemorySearchManagerMock.mockImplementation(() => managerGate);
let settled = false;
const resultPromise = compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "async",
},
},
},
} as never,
});
await vi.waitFor(() => {
expect(getMemorySearchManagerMock).toHaveBeenCalledTimes(1);
});
void resultPromise.then(() => {
settled = true;
});
await vi.waitFor(() => {
expect(settled).toBe(true);
});
expect(sync).not.toHaveBeenCalled();
resolveManager?.({ manager: { sync } });
await managerGate;
await vi.waitFor(() => {
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
sessionFiles: ["/tmp/session.jsonl"],
});
});
const result = await resultPromise;
expect(result.ok).toBe(true);
});
it("registers the Ollama api provider before compaction", async () => {
resolveModelMock.mockReturnValue({
model: {
@@ -493,6 +697,11 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
hookRunner.hasHooks.mockReset();
hookRunner.runBeforeCompaction.mockReset();
hookRunner.runAfterCompaction.mockReset();
resolveContextEngineMock.mockReset();
resolveContextEngineMock.mockResolvedValue({
info: { ownsCompaction: true },
compact: contextEngineCompactMock,
});
contextEngineCompactMock.mockReset();
contextEngineCompactMock.mockResolvedValue({
ok: true,
@@ -546,8 +755,47 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
);
});
it("emits a transcript update and post-compaction memory sync on the engine-owned path", async () => {
const listener = vi.fn();
const cleanup = onSessionTranscriptUpdate(listener);
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
try {
const result = await compactEmbeddedPiSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: " /tmp/session.jsonl ",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
enqueue: (task) => task(),
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
expect(result.ok).toBe(true);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" });
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
sessionFiles: ["/tmp/session.jsonl"],
});
} finally {
cleanup();
}
});
it("does not fire after_compaction when compaction fails", async () => {
hookRunner.hasHooks.mockReturnValue(true);
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
contextEngineCompactMock.mockResolvedValue({
ok: false,
compacted: false,
@@ -567,6 +815,44 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
expect(result.ok).toBe(false);
expect(hookRunner.runBeforeCompaction).toHaveBeenCalled();
expect(hookRunner.runAfterCompaction).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
});
it("does not duplicate transcript updates or sync in the wrapper when the engine delegates compaction", async () => {
const listener = vi.fn();
const cleanup = onSessionTranscriptUpdate(listener);
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
resolveContextEngineMock.mockResolvedValue({
info: { ownsCompaction: false },
compact: contextEngineCompactMock,
});
try {
const result = await compactEmbeddedPiSession({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
enqueue: (task) => task(),
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
expect(result.ok).toBe(true);
expect(listener).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
} finally {
cleanup();
}
});
it("catches and logs hook exceptions without aborting compaction", async () => {

View File

@@ -18,6 +18,7 @@ import {
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getMachineDisplayName } from "../../infra/machine-name.js";
import { generateSecureToken } from "../../infra/secure-random.js";
import { getMemorySearchManager } from "../../memory/index.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js";
import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js";
@@ -30,7 +31,7 @@ import { resolveUserPath } from "../../utils.js";
import { normalizeMessageChannel } from "../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { resolveOpenClawAgentDir } from "../agent-paths.js";
import { resolveSessionAgentIds } from "../agent-scope.js";
import { resolveSessionAgentId, resolveSessionAgentIds } from "../agent-scope.js";
import type { ExecElevatedDefaults } from "../bash-tools.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js";
import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js";
@@ -39,6 +40,7 @@ import { ensureCustomApiRegistered } from "../custom-api-registry.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
import { resolveOpenClawDocsPath } from "../docs-path.js";
import { resolveMemorySearchConfig } from "../memory-search.js";
import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js";
import { supportsModelTools } from "../model-tool-support.js";
import { ensureOpenClawModelsJson } from "../models-config.js";
@@ -268,6 +270,95 @@ function classifyCompactionReason(reason?: string): string {
return "unknown";
}
function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" {
const mode = config?.agents?.defaults?.compaction?.postIndexSync;
if (mode === "off" || mode === "async" || mode === "await") {
return mode;
}
return "async";
}
async function runPostCompactionSessionMemorySync(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
if (!params.config) {
return;
}
try {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
const agentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.config,
});
const resolvedMemory = resolveMemorySearchConfig(params.config, agentId);
if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) {
return;
}
if (!resolvedMemory.sync.sessions.postCompactionForce) {
return;
}
const { manager } = await getMemorySearchManager({
cfg: params.config,
agentId,
});
if (!manager?.sync) {
return;
}
const syncTask = manager.sync({
reason: "post-compaction",
sessionFiles: [sessionFile],
});
await syncTask;
} catch (err) {
log.warn(`memory sync skipped (post-compaction): ${String(err)}`);
}
}
function syncPostCompactionSessionMemory(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
mode: "off" | "async" | "await";
}): Promise<void> {
if (params.mode === "off" || !params.config) {
return Promise.resolve();
}
const syncTask = runPostCompactionSessionMemorySync({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
if (params.mode === "await") {
return syncTask;
}
void syncTask;
return Promise.resolve();
}
async function runPostCompactionSideEffects(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
emitSessionTranscriptUpdate(sessionFile);
await syncPostCompactionSessionMemory({
config: params.config,
sessionKey: params.sessionKey,
sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
}
/**
* Core compaction logic without lane queueing.
* Use this when already inside a session/global lane to avoid deadlocks.
@@ -809,7 +900,11 @@ export async function compactEmbeddedPiSessionDirect(
const result = await compactWithSafetyTimeout(() =>
session.compact(params.customInstructions),
);
emitSessionTranscriptUpdate(params.sessionFile);
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
// Estimate tokens after compaction by summing token estimates for remaining messages
let tokensAfter: number | undefined;
try {
@@ -999,6 +1094,13 @@ export async function compactEmbeddedPiSession(
force: params.trigger === "manual",
runtimeContext: params as Record<string, unknown>,
});
if (engineOwnsCompaction && result.ok && result.compacted) {
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
}
if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) {
try {
await hookRunner.runAfterCompaction(