mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-07 22:09:57 +00:00
Compaction: add post-index sync config and coverage
This commit is contained in:
committed by
Josh Lehman
parent
3b25aeee52
commit
fc7b6103f3
@@ -461,6 +461,97 @@ describe("memory index", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("targets explicit session files during forced sync", async () => {
|
||||
const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`);
|
||||
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
|
||||
const firstSessionPath = path.join(sessionDir, "targeted-first.jsonl");
|
||||
const secondSessionPath = path.join(sessionDir, "targeted-second.jsonl");
|
||||
const storePath = path.join(workspaceDir, `index-targeted-${randomUUID()}.sqlite`);
|
||||
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
|
||||
process.env.OPENCLAW_STATE_DIR = stateDir;
|
||||
|
||||
await fs.mkdir(sessionDir, { recursive: true });
|
||||
await fs.writeFile(
|
||||
firstSessionPath,
|
||||
`${JSON.stringify({
|
||||
type: "message",
|
||||
message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] },
|
||||
})}\n`,
|
||||
);
|
||||
await fs.writeFile(
|
||||
secondSessionPath,
|
||||
`${JSON.stringify({
|
||||
type: "message",
|
||||
message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] },
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
try {
|
||||
const result = await getMemorySearchManager({
|
||||
cfg: createCfg({
|
||||
storePath,
|
||||
sources: ["sessions"],
|
||||
sessionMemory: true,
|
||||
}),
|
||||
agentId: "main",
|
||||
});
|
||||
const manager = requireManager(result);
|
||||
await manager.sync?.({ reason: "test" });
|
||||
|
||||
const db = (manager as unknown as {
|
||||
db: {
|
||||
prepare: (
|
||||
sql: string,
|
||||
) => { get: (path: string, source: string) => { hash: string } | undefined };
|
||||
};
|
||||
}).db;
|
||||
const getSessionHash = (sessionPath: string) =>
|
||||
db.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`).get(sessionPath, "sessions")
|
||||
?.hash;
|
||||
|
||||
const firstOriginalHash = getSessionHash("sessions/targeted-first.jsonl");
|
||||
const secondOriginalHash = getSessionHash("sessions/targeted-second.jsonl");
|
||||
|
||||
await fs.writeFile(
|
||||
firstSessionPath,
|
||||
`${JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "first transcript v2 after compaction" }],
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
await fs.writeFile(
|
||||
secondSessionPath,
|
||||
`${JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "second transcript v2 should stay untouched" }],
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
await manager.sync?.({
|
||||
reason: "post-compaction",
|
||||
force: true,
|
||||
sessionFiles: [firstSessionPath],
|
||||
});
|
||||
|
||||
expect(getSessionHash("sessions/targeted-first.jsonl")).not.toBe(firstOriginalHash);
|
||||
expect(getSessionHash("sessions/targeted-second.jsonl")).toBe(secondOriginalHash);
|
||||
await manager.close?.();
|
||||
} finally {
|
||||
if (previousStateDir === undefined) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
} else {
|
||||
process.env.OPENCLAW_STATE_DIR = previousStateDir;
|
||||
}
|
||||
await fs.rm(stateDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("reindexes when the embedding model changes", async () => {
|
||||
const base = createCfg({ storePath: indexModelPath });
|
||||
const baseAgents = base.agents!;
|
||||
|
||||
@@ -151,6 +151,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
protected abstract sync(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void>;
|
||||
protected abstract withTimeout<T>(
|
||||
@@ -611,6 +612,24 @@ export abstract class MemoryManagerSyncOps {
|
||||
return resolvedFile.startsWith(`${resolvedDir}${path.sep}`);
|
||||
}
|
||||
|
||||
private normalizeTargetSessionFiles(sessionFiles?: string[]): Set<string> | null {
|
||||
if (!sessionFiles || sessionFiles.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const normalized = new Set<string>();
|
||||
for (const sessionFile of sessionFiles) {
|
||||
const trimmed = sessionFile.trim();
|
||||
if (!trimmed) {
|
||||
continue;
|
||||
}
|
||||
const resolved = path.resolve(trimmed);
|
||||
if (this.isSessionFileForAgent(resolved)) {
|
||||
normalized.add(resolved);
|
||||
}
|
||||
}
|
||||
return normalized.size > 0 ? normalized : null;
|
||||
}
|
||||
|
||||
protected ensureIntervalSync() {
|
||||
const minutes = this.settings.sync.intervalMinutes;
|
||||
if (!minutes || minutes <= 0 || this.intervalTimer) {
|
||||
@@ -640,12 +659,15 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
private shouldSyncSessions(
|
||||
params?: { reason?: string; force?: boolean },
|
||||
params?: { reason?: string; force?: boolean; sessionFiles?: string[] },
|
||||
needsFullReindex = false,
|
||||
) {
|
||||
if (!this.sources.has("sessions")) {
|
||||
return false;
|
||||
}
|
||||
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
|
||||
return true;
|
||||
}
|
||||
if (params?.force) {
|
||||
return true;
|
||||
}
|
||||
@@ -752,6 +774,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
|
||||
private async syncSessionFiles(params: {
|
||||
needsFullReindex: boolean;
|
||||
targetSessionFiles?: string[];
|
||||
progress?: MemorySyncProgressState;
|
||||
}) {
|
||||
// FTS-only mode: skip embedding sync (no provider)
|
||||
@@ -760,13 +783,22 @@ export abstract class MemoryManagerSyncOps {
|
||||
return;
|
||||
}
|
||||
|
||||
const files = await listSessionFilesForAgent(this.agentId);
|
||||
const activePaths = new Set(files.map((file) => sessionPathForFile(file)));
|
||||
const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0;
|
||||
const targetSessionFiles = params.needsFullReindex
|
||||
? null
|
||||
: this.normalizeTargetSessionFiles(params.targetSessionFiles);
|
||||
const files = targetSessionFiles
|
||||
? Array.from(targetSessionFiles)
|
||||
: await listSessionFilesForAgent(this.agentId);
|
||||
const activePaths = targetSessionFiles
|
||||
? null
|
||||
: new Set(files.map((file) => sessionPathForFile(file)));
|
||||
const indexAll =
|
||||
params.needsFullReindex || targetSessionFiles !== null || this.sessionsDirtyFiles.size === 0;
|
||||
log.debug("memory sync: indexing session files", {
|
||||
files: files.length,
|
||||
indexAll,
|
||||
dirtyFiles: this.sessionsDirtyFiles.size,
|
||||
targetedFiles: targetSessionFiles?.size ?? 0,
|
||||
batch: this.batch.enabled,
|
||||
concurrency: this.getIndexConcurrency(),
|
||||
});
|
||||
@@ -827,6 +859,12 @@ export abstract class MemoryManagerSyncOps {
|
||||
});
|
||||
await runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
|
||||
if (activePaths === null) {
|
||||
// Targeted syncs only refresh the requested transcripts and should not
|
||||
// prune unrelated session rows without a full directory enumeration.
|
||||
return;
|
||||
}
|
||||
|
||||
const staleRows = this.db
|
||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||
.all("sessions") as Array<{ path: string }>;
|
||||
@@ -899,8 +937,10 @@ export abstract class MemoryManagerSyncOps {
|
||||
const meta = this.readMeta();
|
||||
const configuredSources = this.resolveConfiguredSourcesForMeta();
|
||||
const configuredScopeHash = this.resolveConfiguredScopeHash();
|
||||
const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles);
|
||||
const hasTargetSessionFiles = targetSessionFiles !== null;
|
||||
const needsFullReindex =
|
||||
params?.force ||
|
||||
(params?.force && !hasTargetSessionFiles) ||
|
||||
!meta ||
|
||||
(this.provider && meta.model !== this.provider.model) ||
|
||||
(this.provider && meta.provider !== this.provider.id) ||
|
||||
@@ -932,7 +972,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
const shouldSyncMemory =
|
||||
this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty);
|
||||
this.sources.has("memory") &&
|
||||
((!hasTargetSessionFiles && params?.force) || needsFullReindex || this.dirty);
|
||||
const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex);
|
||||
|
||||
if (shouldSyncMemory) {
|
||||
@@ -941,7 +982,11 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
if (shouldSyncSessions) {
|
||||
await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined });
|
||||
await this.syncSessionFiles({
|
||||
needsFullReindex,
|
||||
targetSessionFiles: targetSessionFiles ? Array.from(targetSessionFiles) : undefined,
|
||||
progress: progress ?? undefined,
|
||||
});
|
||||
this.sessionsDirty = false;
|
||||
this.sessionsDirtyFiles.clear();
|
||||
} else if (this.sessionsDirtyFiles.size > 0) {
|
||||
|
||||
@@ -452,6 +452,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
async sync(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void> {
|
||||
if (this.closed) {
|
||||
@@ -518,6 +519,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
private async runSyncWithReadonlyRecovery(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void> {
|
||||
try {
|
||||
|
||||
@@ -867,6 +867,7 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
async sync(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void> {
|
||||
if (params?.progress) {
|
||||
|
||||
@@ -181,6 +181,7 @@ class FallbackMemoryManager implements MemorySearchManager {
|
||||
async sync(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}) {
|
||||
if (!this.primaryFailed) {
|
||||
|
||||
@@ -72,6 +72,7 @@ export interface MemorySearchManager {
|
||||
sync?(params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}): Promise<void>;
|
||||
probeEmbeddingAvailability(): Promise<MemoryEmbeddingProbeResult>;
|
||||
|
||||
Reference in New Issue
Block a user