From 1a1a17b1ef10844f720e6ac61693e509dfeeb9d6 Mon Sep 17 00:00:00 2001 From: Rodrigo Uroz Date: Thu, 12 Mar 2026 19:20:53 +0000 Subject: [PATCH] Memory Sync: narrow post-compaction session refresh --- .../pi-embedded-runner/compact.hooks.test.ts | 51 ++++++-- src/agents/pi-embedded-runner/compact.ts | 18 ++- src/memory/index.test.ts | 111 +++++++++++++++++- src/memory/manager-sync-ops.ts | 78 +++++++----- 4 files changed, 218 insertions(+), 40 deletions(-) diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index e05c10c3f36..3e59f14af35 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -501,7 +501,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { } }); - it("awaits post-compaction memory sync with the resolved force flag", async () => { + it("skips sync in await mode when postCompactionForce is false", async () => { const sync = vi.fn(async () => {}); getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); resolveMemorySearchConfigMock.mockReturnValue({ @@ -535,11 +535,50 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { sessionKey: "agent:main:session-1", config: expect.any(Object), }); - expect(sync).toHaveBeenCalledWith({ - reason: "post-compaction", - force: false, - sessionFiles: ["/tmp/session.jsonl"], + expect(getMemorySearchManagerMock).not.toHaveBeenCalled(); + expect(sync).not.toHaveBeenCalled(); + }); + + it("awaits post-compaction memory sync in await mode when postCompactionForce is true", async () => { + let releaseSync: (() => void) | undefined; + const syncGate = new Promise((resolve) => { + releaseSync = resolve; }); + const sync = vi.fn(() => syncGate); + getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } }); + let settled = false; + + const resultPromise = compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + config: { + agents: { + defaults: { + compaction: { + postIndexSync: "await", + }, + }, + }, + } as never, + }); + + void resultPromise.then(() => { + settled = true; + }); + await vi.waitFor(() => { + expect(sync).toHaveBeenCalledWith({ + reason: "post-compaction", + sessionFiles: ["/tmp/session.jsonl"], + }); + }); + expect(settled).toBe(false); + releaseSync?.(); + const result = await resultPromise; + expect(result.ok).toBe(true); + expect(settled).toBe(true); }); it("skips post-compaction memory sync when the mode is off", async () => { @@ -610,7 +649,6 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { await vi.waitFor(() => { expect(sync).toHaveBeenCalledWith({ reason: "post-compaction", - force: true, sessionFiles: ["/tmp/session.jsonl"], }); }); @@ -747,7 +785,6 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" }); expect(sync).toHaveBeenCalledWith({ reason: "post-compaction", - force: true, sessionFiles: ["/tmp/session.jsonl"], }); } finally { diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 0d394d1b297..1207a0c3b0b 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -287,6 +287,10 @@ async function runPostCompactionSessionMemorySync(params: { return; } try { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } const agentId = resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.config, @@ -295,6 +299,9 @@ async function runPostCompactionSessionMemorySync(params: { if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) { return; } + if (!resolvedMemory.sync.sessions.postCompactionForce) { + return; + } const { manager } = await getMemorySearchManager({ cfg: params.config, agentId, @@ -304,8 +311,7 @@ async function runPostCompactionSessionMemorySync(params: { } const syncTask = manager.sync({ reason: "post-compaction", - force: resolvedMemory.sync.sessions.postCompactionForce, - sessionFiles: [params.sessionFile.trim()], + sessionFiles: [sessionFile], }); await syncTask; } catch (err) { @@ -340,11 +346,15 @@ async function runPostCompactionSideEffects(params: { sessionKey?: string; sessionFile: string; }): Promise { - emitSessionTranscriptUpdate(params.sessionFile); + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } + emitSessionTranscriptUpdate(sessionFile); await syncPostCompactionSessionMemory({ config: params.config, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + sessionFile, mode: resolvePostCompactionIndexSyncMode(params.config), }); } diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts index 96aa58ca694..30a5d170af0 100644 --- a/src/memory/index.test.ts +++ b/src/memory/index.test.ts @@ -461,7 +461,7 @@ describe("memory index", () => { } }); - it("targets explicit session files during forced sync", async () => { + it("targets explicit session files during post-compaction sync", async () => { const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`); const sessionDir = path.join(stateDir, "agents", "main", "sessions"); const firstSessionPath = path.join(sessionDir, "targeted-first.jsonl"); @@ -538,7 +538,6 @@ describe("memory index", () => { await manager.sync?.({ reason: "post-compaction", - force: true, sessionFiles: [firstSessionPath], }); @@ -555,6 +554,114 @@ describe("memory index", () => { } }); + it("preserves unrelated dirty sessions after targeted post-compaction sync", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-dirty-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const firstSessionPath = path.join(sessionDir, "targeted-dirty-first.jsonl"); + const secondSessionPath = path.join(sessionDir, "targeted-dirty-second.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-dirty-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] }, + })}\n`, + ); + + try { + const manager = requireManager( + await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }), + ); + await manager.sync({ reason: "test" }); + + const db = ( + manager as unknown as { + db: { + prepare: (sql: string) => { + get: (path: string, source: string) => { hash: string } | undefined; + }; + }; + } + ).db; + const getSessionHash = (sessionPath: string) => + db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(sessionPath, "sessions")?.hash; + + const firstOriginalHash = getSessionHash("sessions/targeted-dirty-first.jsonl"); + const secondOriginalHash = getSessionHash("sessions/targeted-dirty-second.jsonl"); + + await fs.writeFile( + firstSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "first transcript v2 after compaction" }], + }, + })}\n`, + ); + await fs.writeFile( + secondSessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "second transcript v2 still pending" }], + }, + })}\n`, + ); + + const internal = manager as unknown as { + sessionsDirty: boolean; + sessionsDirtyFiles: Set; + }; + internal.sessionsDirty = true; + internal.sessionsDirtyFiles.add(secondSessionPath); + + await manager.sync({ + reason: "post-compaction", + sessionFiles: [firstSessionPath], + }); + + expect(getSessionHash("sessions/targeted-dirty-first.jsonl")).not.toBe(firstOriginalHash); + expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).toBe(secondOriginalHash); + expect(internal.sessionsDirtyFiles.has(secondSessionPath)).toBe(true); + expect(internal.sessionsDirty).toBe(true); + + await manager.sync({ reason: "test" }); + + expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).not.toBe(secondOriginalHash); + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { force: true }); + } + }); + it("reindexes when the embedding model changes", async () => { const base = createCfg({ storePath: indexModelPath }); const baseAgents = base.agents!; diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index 273b6af16fc..b51387de9d4 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -151,7 +151,8 @@ export abstract class MemoryManagerSyncOps { protected abstract sync(params?: { reason?: string; force?: boolean; - sessionFiles?: string[]; + forceSessions?: boolean; + sessionFile?: string; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise; protected abstract withTimeout( @@ -612,22 +613,18 @@ export abstract class MemoryManagerSyncOps { return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); } - private normalizeTargetSessionFiles(sessionFiles?: string[]): Set | null { - if (!sessionFiles || sessionFiles.length === 0) { - return null; + private normalizeTargetSessionFile(sessionFile?: string): string | undefined { + const trimmed = sessionFile?.trim(); + if (!trimmed) { + return undefined; } - const normalized = new Set(); - for (const sessionFile of sessionFiles) { - const trimmed = sessionFile.trim(); - if (!trimmed) { - continue; - } - const resolved = path.resolve(trimmed); - if (this.isSessionFileForAgent(resolved)) { - normalized.add(resolved); - } - } - return normalized.size > 0 ? normalized : null; + const resolved = path.resolve(trimmed); + return this.isSessionFileForAgent(resolved) ? resolved : undefined; + } + + private clearSyncedSessionFile(targetSessionFile: string) { + this.sessionsDirtyFiles.delete(targetSessionFile); + this.sessionsDirty = this.sessionsDirtyFiles.size > 0; } protected ensureIntervalSync() { @@ -659,13 +656,13 @@ export abstract class MemoryManagerSyncOps { } private shouldSyncSessions( - params?: { reason?: string; force?: boolean; sessionFiles?: string[] }, + params?: { reason?: string; force?: boolean; forceSessions?: boolean }, needsFullReindex = false, ) { if (!this.sources.has("sessions")) { return false; } - if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + if (params?.forceSessions) { return true; } if (params?.force) { @@ -774,7 +771,7 @@ export abstract class MemoryManagerSyncOps { private async syncSessionFiles(params: { needsFullReindex: boolean; - targetSessionFiles?: string[]; + targetSessionFile?: string; progress?: MemorySyncProgressState; }) { // FTS-only mode: skip embedding sync (no provider) @@ -783,22 +780,22 @@ export abstract class MemoryManagerSyncOps { return; } - const targetSessionFiles = params.needsFullReindex - ? null - : this.normalizeTargetSessionFiles(params.targetSessionFiles); - const files = targetSessionFiles - ? Array.from(targetSessionFiles) + const targetSessionFile = params.needsFullReindex + ? undefined + : this.normalizeTargetSessionFile(params.targetSessionFile); + const files = targetSessionFile + ? [targetSessionFile] : await listSessionFilesForAgent(this.agentId); - const activePaths = targetSessionFiles + const activePaths = targetSessionFile ? null : new Set(files.map((file) => sessionPathForFile(file))); const indexAll = - params.needsFullReindex || targetSessionFiles !== null || this.sessionsDirtyFiles.size === 0; + params.needsFullReindex || Boolean(targetSessionFile) || this.sessionsDirtyFiles.size === 0; log.debug("memory sync: indexing session files", { files: files.length, indexAll, dirtyFiles: this.sessionsDirtyFiles.size, - targetedFiles: targetSessionFiles?.size ?? 0, + targetedFiles: targetSessionFile ? 1 : 0, batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); @@ -940,6 +937,33 @@ export abstract class MemoryManagerSyncOps { const configuredScopeHash = this.resolveConfiguredScopeHash(); const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles); const hasTargetSessionFiles = targetSessionFiles !== null; + if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) { + // Post-compaction refreshes should only update the explicit transcript files and + // leave broader reindex/dirty-work decisions to the regular sync path. + try { + await this.syncSessionFiles({ + needsFullReindex: false, + targetSessionFiles: Array.from(targetSessionFiles), + progress: progress ?? undefined, + }); + this.clearSyncedSessionFiles(targetSessionFiles); + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + const activated = + this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); + if (activated) { + await this.syncSessionFiles({ + needsFullReindex: false, + targetSessionFiles: Array.from(targetSessionFiles), + progress: progress ?? undefined, + }); + this.clearSyncedSessionFiles(targetSessionFiles); + return; + } + throw err; + } + return; + } const needsFullReindex = (params?.force && !hasTargetSessionFiles) || !meta ||