Memory Sync: narrow post-compaction session refresh

This commit is contained in:
Rodrigo Uroz
2026-03-12 19:20:53 +00:00
committed by Josh Lehman
parent a70946a68c
commit 1a1a17b1ef
4 changed files with 218 additions and 40 deletions

View File

@@ -501,7 +501,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
}
});
it("awaits post-compaction memory sync with the resolved force flag", async () => {
it("skips sync in await mode when postCompactionForce is false", async () => {
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
resolveMemorySearchConfigMock.mockReturnValue({
@@ -535,11 +535,50 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
sessionKey: "agent:main:session-1",
config: expect.any(Object),
});
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: false,
sessionFiles: ["/tmp/session.jsonl"],
expect(getMemorySearchManagerMock).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
});
it("awaits post-compaction memory sync in await mode when postCompactionForce is true", async () => {
let releaseSync: (() => void) | undefined;
const syncGate = new Promise<void>((resolve) => {
releaseSync = resolve;
});
const sync = vi.fn(() => syncGate);
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
let settled = false;
const resultPromise = compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
config: {
agents: {
defaults: {
compaction: {
postIndexSync: "await",
},
},
},
} as never,
});
void resultPromise.then(() => {
settled = true;
});
await vi.waitFor(() => {
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
sessionFiles: ["/tmp/session.jsonl"],
});
});
expect(settled).toBe(false);
releaseSync?.();
const result = await resultPromise;
expect(result.ok).toBe(true);
expect(settled).toBe(true);
});
it("skips post-compaction memory sync when the mode is off", async () => {
@@ -610,7 +649,6 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
await vi.waitFor(() => {
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
sessionFiles: ["/tmp/session.jsonl"],
});
});
@@ -747,7 +785,6 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" });
expect(sync).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
sessionFiles: ["/tmp/session.jsonl"],
});
} finally {

View File

@@ -287,6 +287,10 @@ async function runPostCompactionSessionMemorySync(params: {
return;
}
try {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
const agentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.config,
@@ -295,6 +299,9 @@ async function runPostCompactionSessionMemorySync(params: {
if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) {
return;
}
if (!resolvedMemory.sync.sessions.postCompactionForce) {
return;
}
const { manager } = await getMemorySearchManager({
cfg: params.config,
agentId,
@@ -304,8 +311,7 @@ async function runPostCompactionSessionMemorySync(params: {
}
const syncTask = manager.sync({
reason: "post-compaction",
force: resolvedMemory.sync.sessions.postCompactionForce,
sessionFiles: [params.sessionFile.trim()],
sessionFiles: [sessionFile],
});
await syncTask;
} catch (err) {
@@ -340,11 +346,15 @@ async function runPostCompactionSideEffects(params: {
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
emitSessionTranscriptUpdate(params.sessionFile);
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
emitSessionTranscriptUpdate(sessionFile);
await syncPostCompactionSessionMemory({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
}

View File

@@ -461,7 +461,7 @@ describe("memory index", () => {
}
});
it("targets explicit session files during forced sync", async () => {
it("targets explicit session files during post-compaction sync", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const firstSessionPath = path.join(sessionDir, "targeted-first.jsonl");
@@ -538,7 +538,6 @@ describe("memory index", () => {
await manager.sync?.({
reason: "post-compaction",
force: true,
sessionFiles: [firstSessionPath],
});
@@ -555,6 +554,114 @@ describe("memory index", () => {
}
});
it("preserves unrelated dirty sessions after targeted post-compaction sync", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-dirty-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const firstSessionPath = path.join(sessionDir, "targeted-dirty-first.jsonl");
const secondSessionPath = path.join(sessionDir, "targeted-dirty-second.jsonl");
const storePath = path.join(workspaceDir, `index-targeted-dirty-${randomUUID()}.sqlite`);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
firstSessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] },
})}\n`,
);
await fs.writeFile(
secondSessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] },
})}\n`,
);
try {
const manager = requireManager(
await getMemorySearchManager({
cfg: createCfg({
storePath,
sources: ["sessions"],
sessionMemory: true,
}),
agentId: "main",
}),
);
await manager.sync({ reason: "test" });
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => {
get: (path: string, source: string) => { hash: string } | undefined;
};
};
}
).db;
const getSessionHash = (sessionPath: string) =>
db
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
.get(sessionPath, "sessions")?.hash;
const firstOriginalHash = getSessionHash("sessions/targeted-dirty-first.jsonl");
const secondOriginalHash = getSessionHash("sessions/targeted-dirty-second.jsonl");
await fs.writeFile(
firstSessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "first transcript v2 after compaction" }],
},
})}\n`,
);
await fs.writeFile(
secondSessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "second transcript v2 still pending" }],
},
})}\n`,
);
const internal = manager as unknown as {
sessionsDirty: boolean;
sessionsDirtyFiles: Set<string>;
};
internal.sessionsDirty = true;
internal.sessionsDirtyFiles.add(secondSessionPath);
await manager.sync({
reason: "post-compaction",
sessionFiles: [firstSessionPath],
});
expect(getSessionHash("sessions/targeted-dirty-first.jsonl")).not.toBe(firstOriginalHash);
expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).toBe(secondOriginalHash);
expect(internal.sessionsDirtyFiles.has(secondSessionPath)).toBe(true);
expect(internal.sessionsDirty).toBe(true);
await manager.sync({ reason: "test" });
expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).not.toBe(secondOriginalHash);
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { force: true });
}
});
it("reindexes when the embedding model changes", async () => {
const base = createCfg({ storePath: indexModelPath });
const baseAgents = base.agents!;

View File

@@ -151,7 +151,8 @@ export abstract class MemoryManagerSyncOps {
protected abstract sync(params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
forceSessions?: boolean;
sessionFile?: string;
progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void>;
protected abstract withTimeout<T>(
@@ -612,22 +613,18 @@ export abstract class MemoryManagerSyncOps {
return resolvedFile.startsWith(`${resolvedDir}${path.sep}`);
}
private normalizeTargetSessionFiles(sessionFiles?: string[]): Set<string> | null {
if (!sessionFiles || sessionFiles.length === 0) {
return null;
private normalizeTargetSessionFile(sessionFile?: string): string | undefined {
const trimmed = sessionFile?.trim();
if (!trimmed) {
return undefined;
}
const normalized = new Set<string>();
for (const sessionFile of sessionFiles) {
const trimmed = sessionFile.trim();
if (!trimmed) {
continue;
}
const resolved = path.resolve(trimmed);
if (this.isSessionFileForAgent(resolved)) {
normalized.add(resolved);
}
}
return normalized.size > 0 ? normalized : null;
const resolved = path.resolve(trimmed);
return this.isSessionFileForAgent(resolved) ? resolved : undefined;
}
private clearSyncedSessionFile(targetSessionFile: string) {
this.sessionsDirtyFiles.delete(targetSessionFile);
this.sessionsDirty = this.sessionsDirtyFiles.size > 0;
}
protected ensureIntervalSync() {
@@ -659,13 +656,13 @@ export abstract class MemoryManagerSyncOps {
}
private shouldSyncSessions(
params?: { reason?: string; force?: boolean; sessionFiles?: string[] },
params?: { reason?: string; force?: boolean; forceSessions?: boolean },
needsFullReindex = false,
) {
if (!this.sources.has("sessions")) {
return false;
}
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
if (params?.forceSessions) {
return true;
}
if (params?.force) {
@@ -774,7 +771,7 @@ export abstract class MemoryManagerSyncOps {
private async syncSessionFiles(params: {
needsFullReindex: boolean;
targetSessionFiles?: string[];
targetSessionFile?: string;
progress?: MemorySyncProgressState;
}) {
// FTS-only mode: skip embedding sync (no provider)
@@ -783,22 +780,22 @@ export abstract class MemoryManagerSyncOps {
return;
}
const targetSessionFiles = params.needsFullReindex
? null
: this.normalizeTargetSessionFiles(params.targetSessionFiles);
const files = targetSessionFiles
? Array.from(targetSessionFiles)
const targetSessionFile = params.needsFullReindex
? undefined
: this.normalizeTargetSessionFile(params.targetSessionFile);
const files = targetSessionFile
? [targetSessionFile]
: await listSessionFilesForAgent(this.agentId);
const activePaths = targetSessionFiles
const activePaths = targetSessionFile
? null
: new Set(files.map((file) => sessionPathForFile(file)));
const indexAll =
params.needsFullReindex || targetSessionFiles !== null || this.sessionsDirtyFiles.size === 0;
params.needsFullReindex || Boolean(targetSessionFile) || this.sessionsDirtyFiles.size === 0;
log.debug("memory sync: indexing session files", {
files: files.length,
indexAll,
dirtyFiles: this.sessionsDirtyFiles.size,
targetedFiles: targetSessionFiles?.size ?? 0,
targetedFiles: targetSessionFile ? 1 : 0,
batch: this.batch.enabled,
concurrency: this.getIndexConcurrency(),
});
@@ -940,6 +937,33 @@ export abstract class MemoryManagerSyncOps {
const configuredScopeHash = this.resolveConfiguredScopeHash();
const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles);
const hasTargetSessionFiles = targetSessionFiles !== null;
if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) {
// Post-compaction refreshes should only update the explicit transcript files and
// leave broader reindex/dirty-work decisions to the regular sync path.
try {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: Array.from(targetSessionFiles),
progress: progress ?? undefined,
});
this.clearSyncedSessionFiles(targetSessionFiles);
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
if (activated) {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: Array.from(targetSessionFiles),
progress: progress ?? undefined,
});
this.clearSyncedSessionFiles(targetSessionFiles);
return;
}
throw err;
}
return;
}
const needsFullReindex =
(params?.force && !hasTargetSessionFiles) ||
!meta ||