mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 17:18:25 +00:00
Memory/QMD: harden multi-collection search and embed scheduling
This commit is contained in:
@@ -34,6 +34,24 @@ const SNIPPET_HEADER_RE = /@@\s*-([0-9]+),([0-9]+)/;
|
||||
const SEARCH_PENDING_UPDATE_WAIT_MS = 500;
|
||||
const MAX_QMD_OUTPUT_CHARS = 200_000;
|
||||
const NUL_MARKER_RE = /(?:\^@|\\0|\\x00|\\u0000|null\s*byte|nul\s*byte)/i;
|
||||
const QMD_EMBED_BACKOFF_BASE_MS = 60_000;
|
||||
const QMD_EMBED_BACKOFF_MAX_MS = 60 * 60 * 1000;
|
||||
|
||||
let qmdEmbedQueueTail: Promise<void> = Promise.resolve();
|
||||
|
||||
async function runWithQmdEmbedLock<T>(task: () => Promise<T>): Promise<T> {
|
||||
const previous = qmdEmbedQueueTail;
|
||||
let release: (() => void) | undefined;
|
||||
qmdEmbedQueueTail = new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
await previous.catch(() => undefined);
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
release?.();
|
||||
}
|
||||
}
|
||||
|
||||
type CollectionRoot = {
|
||||
path: string;
|
||||
@@ -104,6 +122,8 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
private db: SqliteDatabase | null = null;
|
||||
private lastUpdateAt: number | null = null;
|
||||
private lastEmbedAt: number | null = null;
|
||||
private embedBackoffUntil: number | null = null;
|
||||
private embedFailureCount = 0;
|
||||
private attemptedNullByteCollectionRepair = false;
|
||||
|
||||
private constructor(params: {
|
||||
@@ -318,8 +338,8 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
): boolean {
|
||||
if (!listed.path) {
|
||||
// Older qmd versions may only return names from `collection list --json`.
|
||||
// Force sessions collections to rebind so per-agent session export paths stay isolated.
|
||||
return collection.kind === "sessions";
|
||||
// Rebind managed collections so stale path bindings cannot survive upgrades.
|
||||
return true;
|
||||
}
|
||||
if (!this.pathsMatch(listed.path, collection.path)) {
|
||||
return true;
|
||||
@@ -407,8 +427,13 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
const qmdSearchCommand = this.qmd.searchMode;
|
||||
let parsed: QmdQueryResult[];
|
||||
try {
|
||||
if (qmdSearchCommand === "query" && collectionNames.length > 1) {
|
||||
parsed = await this.runQueryAcrossCollections(trimmed, limit, collectionNames);
|
||||
if (collectionNames.length > 1) {
|
||||
parsed = await this.runQueryAcrossCollections(
|
||||
trimmed,
|
||||
limit,
|
||||
collectionNames,
|
||||
qmdSearchCommand,
|
||||
);
|
||||
} else {
|
||||
const args = this.buildSearchArgs(qmdSearchCommand, trimmed, limit);
|
||||
args.push(...this.buildCollectionFilterArgs(collectionNames));
|
||||
@@ -424,7 +449,7 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
);
|
||||
try {
|
||||
if (collectionNames.length > 1) {
|
||||
parsed = await this.runQueryAcrossCollections(trimmed, limit, collectionNames);
|
||||
parsed = await this.runQueryAcrossCollections(trimmed, limit, collectionNames, "query");
|
||||
} else {
|
||||
const fallbackArgs = this.buildSearchArgs("query", trimmed, limit);
|
||||
fallbackArgs.push(...this.buildCollectionFilterArgs(collectionNames));
|
||||
@@ -444,7 +469,10 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
}
|
||||
const results: MemorySearchResult[] = [];
|
||||
for (const entry of parsed) {
|
||||
const doc = await this.resolveDocLocation(entry.docid);
|
||||
const doc = await this.resolveDocLocation(entry.docid, {
|
||||
preferredCollection: entry.collection,
|
||||
preferredFile: entry.file,
|
||||
});
|
||||
if (!doc) {
|
||||
continue;
|
||||
}
|
||||
@@ -605,25 +633,17 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
if (this.sessionExporter) {
|
||||
await this.exportSessions();
|
||||
}
|
||||
try {
|
||||
await this.runQmd(["update"], { timeoutMs: this.qmd.update.updateTimeoutMs });
|
||||
} catch (err) {
|
||||
if (!(await this.tryRepairNullByteCollections(err, reason))) {
|
||||
throw err;
|
||||
}
|
||||
await this.runQmd(["update"], { timeoutMs: this.qmd.update.updateTimeoutMs });
|
||||
}
|
||||
const embedIntervalMs = this.qmd.update.embedIntervalMs;
|
||||
const shouldEmbed =
|
||||
Boolean(force) ||
|
||||
this.lastEmbedAt === null ||
|
||||
(embedIntervalMs > 0 && Date.now() - this.lastEmbedAt > embedIntervalMs);
|
||||
if (shouldEmbed) {
|
||||
await this.runQmdUpdateWithRetry(reason);
|
||||
if (this.shouldRunEmbed(force)) {
|
||||
try {
|
||||
await this.runQmd(["embed"], { timeoutMs: this.qmd.update.embedTimeoutMs });
|
||||
await runWithQmdEmbedLock(async () => {
|
||||
await this.runQmd(["embed"], { timeoutMs: this.qmd.update.embedTimeoutMs });
|
||||
});
|
||||
this.lastEmbedAt = Date.now();
|
||||
this.embedBackoffUntil = null;
|
||||
this.embedFailureCount = 0;
|
||||
} catch (err) {
|
||||
log.warn(`qmd embed failed (${reason}): ${String(err)}`);
|
||||
this.noteEmbedFailure(reason, err);
|
||||
}
|
||||
}
|
||||
this.lastUpdateAt = Date.now();
|
||||
@@ -635,6 +655,74 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
await this.pendingUpdate;
|
||||
}
|
||||
|
||||
private async runQmdUpdateWithRetry(reason: string): Promise<void> {
|
||||
const isBootRun = reason === "boot" || reason.startsWith("boot:");
|
||||
const maxAttempts = isBootRun ? 3 : 1;
|
||||
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
|
||||
try {
|
||||
await this.runQmdUpdateOnce(reason);
|
||||
return;
|
||||
} catch (err) {
|
||||
if (attempt >= maxAttempts || !this.isRetryableUpdateError(err)) {
|
||||
throw err;
|
||||
}
|
||||
const delayMs = 500 * 2 ** (attempt - 1);
|
||||
log.warn(
|
||||
`qmd update retry ${attempt}/${maxAttempts - 1} after failure (${reason}): ${String(err)}`,
|
||||
);
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async runQmdUpdateOnce(reason: string): Promise<void> {
|
||||
try {
|
||||
await this.runQmd(["update"], { timeoutMs: this.qmd.update.updateTimeoutMs });
|
||||
} catch (err) {
|
||||
if (!(await this.tryRepairNullByteCollections(err, reason))) {
|
||||
throw err;
|
||||
}
|
||||
await this.runQmd(["update"], { timeoutMs: this.qmd.update.updateTimeoutMs });
|
||||
}
|
||||
}
|
||||
|
||||
private isRetryableUpdateError(err: unknown): boolean {
|
||||
if (this.isSqliteBusyError(err)) {
|
||||
return true;
|
||||
}
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const normalized = message.toLowerCase();
|
||||
return normalized.includes("timed out");
|
||||
}
|
||||
|
||||
private shouldRunEmbed(force?: boolean): boolean {
|
||||
if (this.qmd.searchMode === "search") {
|
||||
return false;
|
||||
}
|
||||
const now = Date.now();
|
||||
if (this.embedBackoffUntil !== null && now < this.embedBackoffUntil) {
|
||||
return false;
|
||||
}
|
||||
const embedIntervalMs = this.qmd.update.embedIntervalMs;
|
||||
return (
|
||||
Boolean(force) ||
|
||||
this.lastEmbedAt === null ||
|
||||
(embedIntervalMs > 0 && now - this.lastEmbedAt > embedIntervalMs)
|
||||
);
|
||||
}
|
||||
|
||||
private noteEmbedFailure(reason: string, err: unknown): void {
|
||||
this.embedFailureCount += 1;
|
||||
const delayMs = Math.min(
|
||||
QMD_EMBED_BACKOFF_MAX_MS,
|
||||
QMD_EMBED_BACKOFF_BASE_MS * 2 ** Math.max(0, this.embedFailureCount - 1),
|
||||
);
|
||||
this.embedBackoffUntil = Date.now() + delayMs;
|
||||
log.warn(
|
||||
`qmd embed failed (${reason}): ${String(err)}; backing off for ${Math.ceil(delayMs / 1000)}s`,
|
||||
);
|
||||
}
|
||||
|
||||
private enqueueForcedUpdate(reason: string): Promise<void> {
|
||||
this.queuedForcedRuns += 1;
|
||||
if (!this.queuedForcedUpdate) {
|
||||
@@ -916,6 +1004,7 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
|
||||
private async resolveDocLocation(
|
||||
docid?: string,
|
||||
hints?: { preferredCollection?: string; preferredFile?: string },
|
||||
): Promise<{ rel: string; abs: string; source: MemorySource } | null> {
|
||||
if (!docid) {
|
||||
return null;
|
||||
@@ -924,23 +1013,21 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
if (!normalized) {
|
||||
return null;
|
||||
}
|
||||
const cached = this.docPathCache.get(normalized);
|
||||
const cacheKey = `${hints?.preferredCollection ?? "*"}:${normalized}`;
|
||||
const cached = this.docPathCache.get(cacheKey);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const db = this.ensureDb();
|
||||
let row: { collection: string; path: string } | undefined;
|
||||
let rows: Array<{ collection: string; path: string }> = [];
|
||||
try {
|
||||
const exact = db
|
||||
.prepare("SELECT collection, path FROM documents WHERE hash = ? AND active = 1 LIMIT 1")
|
||||
.get(normalized) as { collection: string; path: string } | undefined;
|
||||
row = exact;
|
||||
if (!row) {
|
||||
row = db
|
||||
.prepare(
|
||||
"SELECT collection, path FROM documents WHERE hash LIKE ? AND active = 1 LIMIT 1",
|
||||
)
|
||||
.get(`${normalized}%`) as { collection: string; path: string } | undefined;
|
||||
rows = db
|
||||
.prepare("SELECT collection, path FROM documents WHERE hash = ? AND active = 1")
|
||||
.all(normalized) as Array<{ collection: string; path: string }>;
|
||||
if (rows.length === 0) {
|
||||
rows = db
|
||||
.prepare("SELECT collection, path FROM documents WHERE hash LIKE ? AND active = 1")
|
||||
.all(`${normalized}%`) as Array<{ collection: string; path: string }>;
|
||||
}
|
||||
} catch (err) {
|
||||
if (this.isSqliteBusyError(err)) {
|
||||
@@ -949,17 +1036,54 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
if (!row) {
|
||||
if (rows.length === 0) {
|
||||
return null;
|
||||
}
|
||||
const location = this.toDocLocation(row.collection, row.path);
|
||||
const location = this.pickDocLocation(rows, hints);
|
||||
if (!location) {
|
||||
return null;
|
||||
}
|
||||
this.docPathCache.set(normalized, location);
|
||||
this.docPathCache.set(cacheKey, location);
|
||||
return location;
|
||||
}
|
||||
|
||||
private pickDocLocation(
|
||||
rows: Array<{ collection: string; path: string }>,
|
||||
hints?: { preferredCollection?: string; preferredFile?: string },
|
||||
): { rel: string; abs: string; source: MemorySource } | null {
|
||||
if (hints?.preferredCollection) {
|
||||
for (const row of rows) {
|
||||
if (row.collection !== hints.preferredCollection) {
|
||||
continue;
|
||||
}
|
||||
const location = this.toDocLocation(row.collection, row.path);
|
||||
if (location) {
|
||||
return location;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hints?.preferredFile) {
|
||||
const preferred = path.normalize(hints.preferredFile);
|
||||
for (const row of rows) {
|
||||
const rowPath = path.normalize(row.path);
|
||||
if (rowPath !== preferred && !rowPath.endsWith(path.sep + preferred)) {
|
||||
continue;
|
||||
}
|
||||
const location = this.toDocLocation(row.collection, row.path);
|
||||
if (location) {
|
||||
return location;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const row of rows) {
|
||||
const location = this.toDocLocation(row.collection, row.path);
|
||||
if (location) {
|
||||
return location;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private extractSnippetLines(snippet: string): { startLine: number; endLine: number } {
|
||||
const match = SNIPPET_HEADER_RE.exec(snippet);
|
||||
if (match) {
|
||||
@@ -1199,25 +1323,38 @@ export class QmdMemoryManager implements MemorySearchManager {
|
||||
query: string,
|
||||
limit: number,
|
||||
collectionNames: string[],
|
||||
command: "query" | "search" | "vsearch",
|
||||
): Promise<QmdQueryResult[]> {
|
||||
log.debug(
|
||||
`qmd query multi-collection workaround active (${collectionNames.length} collections)`,
|
||||
`qmd ${command} multi-collection workaround active (${collectionNames.length} collections)`,
|
||||
);
|
||||
const bestByDocId = new Map<string, QmdQueryResult>();
|
||||
for (const collectionName of collectionNames) {
|
||||
const args = this.buildSearchArgs("query", query, limit);
|
||||
const args = this.buildSearchArgs(command, query, limit);
|
||||
args.push("-c", collectionName);
|
||||
const result = await this.runQmd(args, { timeoutMs: this.qmd.limits.timeoutMs });
|
||||
const parsed = parseQmdQueryJson(result.stdout, result.stderr);
|
||||
for (const entry of parsed) {
|
||||
if (typeof entry.docid !== "string" || !entry.docid.trim()) {
|
||||
const normalizedDocId =
|
||||
typeof entry.docid === "string" && entry.docid.trim().length > 0
|
||||
? entry.docid
|
||||
: undefined;
|
||||
if (!normalizedDocId) {
|
||||
continue;
|
||||
}
|
||||
const prev = bestByDocId.get(entry.docid);
|
||||
const withCollection = {
|
||||
...entry,
|
||||
docid: normalizedDocId,
|
||||
collection: entry.collection ?? collectionName,
|
||||
} satisfies QmdQueryResult;
|
||||
const prev = bestByDocId.get(normalizedDocId);
|
||||
const prevScore = typeof prev?.score === "number" ? prev.score : Number.NEGATIVE_INFINITY;
|
||||
const nextScore = typeof entry.score === "number" ? entry.score : Number.NEGATIVE_INFINITY;
|
||||
const nextScore =
|
||||
typeof withCollection.score === "number"
|
||||
? withCollection.score
|
||||
: Number.NEGATIVE_INFINITY;
|
||||
if (!prev || nextScore > prevScore) {
|
||||
bestByDocId.set(entry.docid, entry);
|
||||
bestByDocId.set(normalizedDocId, withCollection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user