Memory Sync: queue targeted follow-up refresh

This commit is contained in:
Rodrigo Uroz
2026-03-12 20:19:09 +00:00
committed by Josh Lehman
parent eb4779be00
commit 8bcedd3475
4 changed files with 136 additions and 2 deletions

View File

@@ -662,6 +662,102 @@ describe("memory index", () => {
}
});
it("queues targeted session sync when another sync is already in progress", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-queued-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const sessionPath = path.join(sessionDir, "targeted-queued.jsonl");
const storePath = path.join(workspaceDir, `index-targeted-queued-${randomUUID()}.sqlite`);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
sessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "queued transcript v1" }] },
})}\n`,
);
try {
const manager = requireManager(
await getMemorySearchManager({
cfg: createCfg({
storePath,
sources: ["sessions"],
sessionMemory: true,
}),
agentId: "main",
}),
);
await manager.sync({ reason: "test" });
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => {
get: (path: string, source: string) => { hash: string } | undefined;
};
};
}
).db;
const getSessionHash = (sessionRelPath: string) =>
db
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
.get(sessionRelPath, "sessions")?.hash;
const originalHash = getSessionHash("sessions/targeted-queued.jsonl");
const internal = manager as unknown as {
runSyncWithReadonlyRecovery: (params?: {
reason?: string;
sessionFiles?: string[];
}) => Promise<void>;
};
const originalRunSync = internal.runSyncWithReadonlyRecovery.bind(manager);
let releaseBusySync: (() => void) | undefined;
const busyGate = new Promise<void>((resolve) => {
releaseBusySync = resolve;
});
internal.runSyncWithReadonlyRecovery = async (params) => {
if (params?.reason === "busy-sync") {
await busyGate;
}
return await originalRunSync(params);
};
const busySyncPromise = manager.sync({ reason: "busy-sync" });
await fs.writeFile(
sessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "queued transcript v2 after compaction" }],
},
})}\n`,
);
const targetedSyncPromise = manager.sync({
reason: "post-compaction",
sessionFiles: [sessionPath],
});
releaseBusySync?.();
await Promise.all([busySyncPromise, targetedSyncPromise]);
expect(getSessionHash("sessions/targeted-queued.jsonl")).not.toBe(originalHash);
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { force: true });
}
});
it("reindexes when the embedding model changes", async () => {
const base = createCfg({ storePath: indexModelPath });
const baseAgents = base.agents!;

View File

@@ -671,13 +671,13 @@ export abstract class MemoryManagerSyncOps {
}
private shouldSyncSessions(
params?: { reason?: string; force?: boolean; forceSessions?: boolean },
params?: { reason?: string; force?: boolean; sessionFiles?: string[] },
needsFullReindex = false,
) {
if (!this.sources.has("sessions")) {
return false;
}
if (params?.forceSessions) {
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
return true;
}
if (params?.force) {

View File

@@ -125,6 +125,8 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
>();
private sessionWarm = new Set<string>();
private syncing: Promise<void> | null = null;
private queuedSessionFiles = new Set<string>();
private queuedSessionSync: Promise<void> | null = null;
private readonlyRecoveryAttempts = 0;
private readonlyRecoverySuccesses = 0;
private readonlyRecoveryFailures = 0;
@@ -459,6 +461,9 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
return;
}
if (this.syncing) {
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
return this.enqueueTargetedSessionSync(params.sessionFiles);
}
return this.syncing;
}
this.syncing = this.runSyncWithReadonlyRecovery(params).finally(() => {
@@ -467,6 +472,36 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
return this.syncing ?? Promise.resolve();
}
private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise<void> {
for (const sessionFile of sessionFiles ?? []) {
const trimmed = sessionFile.trim();
if (trimmed) {
this.queuedSessionFiles.add(trimmed);
}
}
if (this.queuedSessionFiles.size === 0) {
return this.syncing ?? Promise.resolve();
}
if (!this.queuedSessionSync) {
this.queuedSessionSync = (async () => {
try {
await this.syncing?.catch(() => undefined);
while (!this.closed && this.queuedSessionFiles.size > 0) {
const queuedSessionFiles = Array.from(this.queuedSessionFiles);
this.queuedSessionFiles.clear();
await this.sync({
reason: "queued-session-files",
sessionFiles: queuedSessionFiles,
});
}
} finally {
this.queuedSessionSync = null;
}
})();
}
return this.queuedSessionSync;
}
private isReadonlyDbError(err: unknown): boolean {
const readonlyPattern =
/attempt to write a readonly database|database is read-only|SQLITE_READONLY/i;

View File

@@ -870,6 +870,9 @@ export class QmdMemoryManager implements MemorySearchManager {
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
}): Promise<void> {
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
log.debug("qmd sync ignoring targeted sessionFiles hint; running regular update");
}
if (params?.progress) {
params.progress({ completed: 0, total: 1, label: "Updating QMD index…" });
}