From 8bcedd3475fed8ba63bf31ef6e7e9ad256131338 Mon Sep 17 00:00:00 2001 From: Rodrigo Uroz Date: Thu, 12 Mar 2026 20:19:09 +0000 Subject: [PATCH] Memory Sync: queue targeted follow-up refresh --- src/memory/index.test.ts | 96 ++++++++++++++++++++++++++++++++++ src/memory/manager-sync-ops.ts | 4 +- src/memory/manager.ts | 35 +++++++++++++ src/memory/qmd-manager.ts | 3 ++ 4 files changed, 136 insertions(+), 2 deletions(-) diff --git a/src/memory/index.test.ts b/src/memory/index.test.ts index 30a5d170af0..15448c3ba64 100644 --- a/src/memory/index.test.ts +++ b/src/memory/index.test.ts @@ -662,6 +662,102 @@ describe("memory index", () => { } }); + it("queues targeted session sync when another sync is already in progress", async () => { + const stateDir = path.join(fixtureRoot, `state-targeted-queued-${randomUUID()}`); + const sessionDir = path.join(stateDir, "agents", "main", "sessions"); + const sessionPath = path.join(sessionDir, "targeted-queued.jsonl"); + const storePath = path.join(workspaceDir, `index-targeted-queued-${randomUUID()}.sqlite`); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = stateDir; + + await fs.mkdir(sessionDir, { recursive: true }); + await fs.writeFile( + sessionPath, + `${JSON.stringify({ + type: "message", + message: { role: "user", content: [{ type: "text", text: "queued transcript v1" }] }, + })}\n`, + ); + + try { + const manager = requireManager( + await getMemorySearchManager({ + cfg: createCfg({ + storePath, + sources: ["sessions"], + sessionMemory: true, + }), + agentId: "main", + }), + ); + await manager.sync({ reason: "test" }); + + const db = ( + manager as unknown as { + db: { + prepare: (sql: string) => { + get: (path: string, source: string) => { hash: string } | undefined; + }; + }; + } + ).db; + const getSessionHash = (sessionRelPath: string) => + db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(sessionRelPath, "sessions")?.hash; + const originalHash = getSessionHash("sessions/targeted-queued.jsonl"); + + const internal = manager as unknown as { + runSyncWithReadonlyRecovery: (params?: { + reason?: string; + sessionFiles?: string[]; + }) => Promise; + }; + const originalRunSync = internal.runSyncWithReadonlyRecovery.bind(manager); + let releaseBusySync: (() => void) | undefined; + const busyGate = new Promise((resolve) => { + releaseBusySync = resolve; + }); + internal.runSyncWithReadonlyRecovery = async (params) => { + if (params?.reason === "busy-sync") { + await busyGate; + } + return await originalRunSync(params); + }; + + const busySyncPromise = manager.sync({ reason: "busy-sync" }); + await fs.writeFile( + sessionPath, + `${JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "queued transcript v2 after compaction" }], + }, + })}\n`, + ); + + const targetedSyncPromise = manager.sync({ + reason: "post-compaction", + sessionFiles: [sessionPath], + }); + + releaseBusySync?.(); + await Promise.all([busySyncPromise, targetedSyncPromise]); + + expect(getSessionHash("sessions/targeted-queued.jsonl")).not.toBe(originalHash); + await manager.close?.(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fs.rm(stateDir, { recursive: true, force: true }); + await fs.rm(storePath, { force: true }); + } + }); + it("reindexes when the embedding model changes", async () => { const base = createCfg({ storePath: indexModelPath }); const baseAgents = base.agents!; diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts index a402a0fd1fe..5b03c5d4c74 100644 --- a/src/memory/manager-sync-ops.ts +++ b/src/memory/manager-sync-ops.ts @@ -671,13 +671,13 @@ export abstract class MemoryManagerSyncOps { } private shouldSyncSessions( - params?: { reason?: string; force?: boolean; forceSessions?: boolean }, + params?: { reason?: string; force?: boolean; sessionFiles?: string[] }, needsFullReindex = false, ) { if (!this.sources.has("sessions")) { return false; } - if (params?.forceSessions) { + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { return true; } if (params?.force) { diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 00ac6f0b453..61e2cd71af8 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -125,6 +125,8 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem >(); private sessionWarm = new Set(); private syncing: Promise | null = null; + private queuedSessionFiles = new Set(); + private queuedSessionSync: Promise | null = null; private readonlyRecoveryAttempts = 0; private readonlyRecoverySuccesses = 0; private readonlyRecoveryFailures = 0; @@ -459,6 +461,9 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem return; } if (this.syncing) { + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + return this.enqueueTargetedSessionSync(params.sessionFiles); + } return this.syncing; } this.syncing = this.runSyncWithReadonlyRecovery(params).finally(() => { @@ -467,6 +472,36 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem return this.syncing ?? Promise.resolve(); } + private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise { + for (const sessionFile of sessionFiles ?? []) { + const trimmed = sessionFile.trim(); + if (trimmed) { + this.queuedSessionFiles.add(trimmed); + } + } + if (this.queuedSessionFiles.size === 0) { + return this.syncing ?? Promise.resolve(); + } + if (!this.queuedSessionSync) { + this.queuedSessionSync = (async () => { + try { + await this.syncing?.catch(() => undefined); + while (!this.closed && this.queuedSessionFiles.size > 0) { + const queuedSessionFiles = Array.from(this.queuedSessionFiles); + this.queuedSessionFiles.clear(); + await this.sync({ + reason: "queued-session-files", + sessionFiles: queuedSessionFiles, + }); + } + } finally { + this.queuedSessionSync = null; + } + })(); + } + return this.queuedSessionSync; + } + private isReadonlyDbError(err: unknown): boolean { const readonlyPattern = /attempt to write a readonly database|database is read-only|SQLITE_READONLY/i; diff --git a/src/memory/qmd-manager.ts b/src/memory/qmd-manager.ts index 36067ff88a6..986d526e013 100644 --- a/src/memory/qmd-manager.ts +++ b/src/memory/qmd-manager.ts @@ -870,6 +870,9 @@ export class QmdMemoryManager implements MemorySearchManager { sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }): Promise { + if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) { + log.debug("qmd sync ignoring targeted sessionFiles hint; running regular update"); + } if (params?.progress) { params.progress({ completed: 0, total: 1, label: "Updating QMD index…" }); }