mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 02:51:24 +00:00
refactor(memory): extract manager sync and embedding ops
This commit is contained in:
803
src/memory/manager-embedding-ops.ts
Normal file
803
src/memory/manager-embedding-ops.ts
Normal file
@@ -0,0 +1,803 @@
|
||||
// @ts-nocheck
|
||||
// oxlint-disable eslint/no-unused-vars, typescript/no-explicit-any
|
||||
import fs from "node:fs/promises";
|
||||
import type { SessionFileEntry } from "./session-files.js";
|
||||
import type { MemorySource } from "./types.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js";
|
||||
import {
|
||||
OPENAI_BATCH_ENDPOINT,
|
||||
type OpenAiBatchRequest,
|
||||
runOpenAiEmbeddingBatches,
|
||||
} from "./batch-openai.js";
|
||||
import { type VoyageBatchRequest, runVoyageEmbeddingBatches } from "./batch-voyage.js";
|
||||
import { enforceEmbeddingMaxInputTokens } from "./embedding-chunk-limits.js";
|
||||
import { estimateUtf8Bytes } from "./embedding-input-limits.js";
|
||||
import {
|
||||
chunkMarkdown,
|
||||
hashText,
|
||||
parseEmbedding,
|
||||
remapChunkLines,
|
||||
type MemoryChunk,
|
||||
type MemoryFileEntry,
|
||||
} from "./internal.js";
|
||||
|
||||
const VECTOR_TABLE = "chunks_vec";
|
||||
const FTS_TABLE = "chunks_fts";
|
||||
const EMBEDDING_CACHE_TABLE = "embedding_cache";
|
||||
const EMBEDDING_BATCH_MAX_TOKENS = 8000;
|
||||
const EMBEDDING_INDEX_CONCURRENCY = 4;
|
||||
const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
|
||||
const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
|
||||
const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
|
||||
const BATCH_FAILURE_LIMIT = 2;
|
||||
const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000;
|
||||
const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000;
|
||||
const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000;
|
||||
const EMBEDDING_BATCH_TIMEOUT_LOCAL_MS = 10 * 60_000;
|
||||
|
||||
const vectorToBlob = (embedding: number[]): Buffer =>
|
||||
Buffer.from(new Float32Array(embedding).buffer);
|
||||
|
||||
const log = createSubsystemLogger("memory");
|
||||
|
||||
class MemoryManagerEmbeddingOps {
|
||||
[key: string]: any;
|
||||
private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] {
|
||||
const batches: MemoryChunk[][] = [];
|
||||
let current: MemoryChunk[] = [];
|
||||
let currentTokens = 0;
|
||||
|
||||
for (const chunk of chunks) {
|
||||
const estimate = estimateUtf8Bytes(chunk.text);
|
||||
const wouldExceed =
|
||||
current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS;
|
||||
if (wouldExceed) {
|
||||
batches.push(current);
|
||||
current = [];
|
||||
currentTokens = 0;
|
||||
}
|
||||
if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) {
|
||||
batches.push([chunk]);
|
||||
continue;
|
||||
}
|
||||
current.push(chunk);
|
||||
currentTokens += estimate;
|
||||
}
|
||||
|
||||
if (current.length > 0) {
|
||||
batches.push(current);
|
||||
}
|
||||
return batches;
|
||||
}
|
||||
|
||||
private loadEmbeddingCache(hashes: string[]): Map<string, number[]> {
|
||||
if (!this.cache.enabled) {
|
||||
return new Map();
|
||||
}
|
||||
if (hashes.length === 0) {
|
||||
return new Map();
|
||||
}
|
||||
const unique: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
for (const hash of hashes) {
|
||||
if (!hash) {
|
||||
continue;
|
||||
}
|
||||
if (seen.has(hash)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(hash);
|
||||
unique.push(hash);
|
||||
}
|
||||
if (unique.length === 0) {
|
||||
return new Map();
|
||||
}
|
||||
|
||||
const out = new Map<string, number[]>();
|
||||
const baseParams = [this.provider.id, this.provider.model, this.providerKey];
|
||||
const batchSize = 400;
|
||||
for (let start = 0; start < unique.length; start += batchSize) {
|
||||
const batch = unique.slice(start, start + batchSize);
|
||||
const placeholders = batch.map(() => "?").join(", ");
|
||||
const rows = this.db
|
||||
.prepare(
|
||||
`SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` +
|
||||
` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`,
|
||||
)
|
||||
.all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>;
|
||||
for (const row of rows) {
|
||||
out.set(row.hash, parseEmbedding(row.embedding));
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void {
|
||||
if (!this.cache.enabled) {
|
||||
return;
|
||||
}
|
||||
if (entries.length === 0) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
const stmt = this.db.prepare(
|
||||
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` +
|
||||
` VALUES (?, ?, ?, ?, ?, ?, ?)\n` +
|
||||
` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` +
|
||||
` embedding=excluded.embedding,\n` +
|
||||
` dims=excluded.dims,\n` +
|
||||
` updated_at=excluded.updated_at`,
|
||||
);
|
||||
for (const entry of entries) {
|
||||
const embedding = entry.embedding ?? [];
|
||||
stmt.run(
|
||||
this.provider.id,
|
||||
this.provider.model,
|
||||
this.providerKey,
|
||||
entry.hash,
|
||||
JSON.stringify(embedding),
|
||||
embedding.length,
|
||||
now,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private pruneEmbeddingCacheIfNeeded(): void {
|
||||
if (!this.cache.enabled) {
|
||||
return;
|
||||
}
|
||||
const max = this.cache.maxEntries;
|
||||
if (!max || max <= 0) {
|
||||
return;
|
||||
}
|
||||
const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as
|
||||
| { c: number }
|
||||
| undefined;
|
||||
const count = row?.c ?? 0;
|
||||
if (count <= max) {
|
||||
return;
|
||||
}
|
||||
const excess = count - max;
|
||||
this.db
|
||||
.prepare(
|
||||
`DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` +
|
||||
` WHERE rowid IN (\n` +
|
||||
` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` +
|
||||
` ORDER BY updated_at ASC\n` +
|
||||
` LIMIT ?\n` +
|
||||
` )`,
|
||||
)
|
||||
.run(excess);
|
||||
}
|
||||
|
||||
private async embedChunksInBatches(chunks: MemoryChunk[]): Promise<number[][]> {
|
||||
if (chunks.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
||||
if (hit && hit.length > 0) {
|
||||
embeddings[i] = hit;
|
||||
} else if (chunk) {
|
||||
missing.push({ index: i, chunk });
|
||||
}
|
||||
}
|
||||
|
||||
if (missing.length === 0) {
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
const missingChunks = missing.map((m) => m.chunk);
|
||||
const batches = this.buildEmbeddingBatches(missingChunks);
|
||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||
let cursor = 0;
|
||||
for (const batch of batches) {
|
||||
const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text));
|
||||
for (let i = 0; i < batch.length; i += 1) {
|
||||
const item = missing[cursor + i];
|
||||
const embedding = batchEmbeddings[i] ?? [];
|
||||
if (item) {
|
||||
embeddings[item.index] = embedding;
|
||||
toCache.push({ hash: item.chunk.hash, embedding });
|
||||
}
|
||||
}
|
||||
cursor += batch.length;
|
||||
}
|
||||
this.upsertEmbeddingCache(toCache);
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
private computeProviderKey(): string {
|
||||
if (this.provider.id === "openai" && this.openAi) {
|
||||
const entries = Object.entries(this.openAi.headers)
|
||||
.filter(([key]) => key.toLowerCase() !== "authorization")
|
||||
.toSorted(([a], [b]) => a.localeCompare(b))
|
||||
.map(([key, value]) => [key, value]);
|
||||
return hashText(
|
||||
JSON.stringify({
|
||||
provider: "openai",
|
||||
baseUrl: this.openAi.baseUrl,
|
||||
model: this.openAi.model,
|
||||
headers: entries,
|
||||
}),
|
||||
);
|
||||
}
|
||||
if (this.provider.id === "gemini" && this.gemini) {
|
||||
const entries = Object.entries(this.gemini.headers)
|
||||
.filter(([key]) => {
|
||||
const lower = key.toLowerCase();
|
||||
return lower !== "authorization" && lower !== "x-goog-api-key";
|
||||
})
|
||||
.toSorted(([a], [b]) => a.localeCompare(b))
|
||||
.map(([key, value]) => [key, value]);
|
||||
return hashText(
|
||||
JSON.stringify({
|
||||
provider: "gemini",
|
||||
baseUrl: this.gemini.baseUrl,
|
||||
model: this.gemini.model,
|
||||
headers: entries,
|
||||
}),
|
||||
);
|
||||
}
|
||||
return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model }));
|
||||
}
|
||||
|
||||
private async embedChunksWithBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
source: MemorySource,
|
||||
): Promise<number[][]> {
|
||||
if (this.provider.id === "openai" && this.openAi) {
|
||||
return this.embedChunksWithOpenAiBatch(chunks, entry, source);
|
||||
}
|
||||
if (this.provider.id === "gemini" && this.gemini) {
|
||||
return this.embedChunksWithGeminiBatch(chunks, entry, source);
|
||||
}
|
||||
if (this.provider.id === "voyage" && this.voyage) {
|
||||
return this.embedChunksWithVoyageBatch(chunks, entry, source);
|
||||
}
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
|
||||
private async embedChunksWithVoyageBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
source: MemorySource,
|
||||
): Promise<number[][]> {
|
||||
const voyage = this.voyage;
|
||||
if (!voyage) {
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
if (chunks.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
||||
if (hit && hit.length > 0) {
|
||||
embeddings[i] = hit;
|
||||
} else if (chunk) {
|
||||
missing.push({ index: i, chunk });
|
||||
}
|
||||
}
|
||||
|
||||
if (missing.length === 0) {
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
const requests: VoyageBatchRequest[] = [];
|
||||
const mapping = new Map<string, { index: number; hash: string }>();
|
||||
for (const item of missing) {
|
||||
const chunk = item.chunk;
|
||||
const customId = hashText(
|
||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
||||
);
|
||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
||||
requests.push({
|
||||
custom_id: customId,
|
||||
body: {
|
||||
input: chunk.text,
|
||||
},
|
||||
});
|
||||
}
|
||||
const batchResult = await this.runBatchWithFallback({
|
||||
provider: "voyage",
|
||||
run: async () =>
|
||||
await runVoyageEmbeddingBatches({
|
||||
client: voyage,
|
||||
agentId: this.agentId,
|
||||
requests,
|
||||
wait: this.batch.wait,
|
||||
concurrency: this.batch.concurrency,
|
||||
pollIntervalMs: this.batch.pollIntervalMs,
|
||||
timeoutMs: this.batch.timeoutMs,
|
||||
debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
|
||||
}),
|
||||
fallback: async () => await this.embedChunksInBatches(chunks),
|
||||
});
|
||||
if (Array.isArray(batchResult)) {
|
||||
return batchResult;
|
||||
}
|
||||
const byCustomId = batchResult;
|
||||
|
||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||
for (const [customId, embedding] of byCustomId.entries()) {
|
||||
const mapped = mapping.get(customId);
|
||||
if (!mapped) {
|
||||
continue;
|
||||
}
|
||||
embeddings[mapped.index] = embedding;
|
||||
toCache.push({ hash: mapped.hash, embedding });
|
||||
}
|
||||
this.upsertEmbeddingCache(toCache);
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
private async embedChunksWithOpenAiBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
source: MemorySource,
|
||||
): Promise<number[][]> {
|
||||
const openAi = this.openAi;
|
||||
if (!openAi) {
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
if (chunks.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
||||
if (hit && hit.length > 0) {
|
||||
embeddings[i] = hit;
|
||||
} else if (chunk) {
|
||||
missing.push({ index: i, chunk });
|
||||
}
|
||||
}
|
||||
|
||||
if (missing.length === 0) {
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
const requests: OpenAiBatchRequest[] = [];
|
||||
const mapping = new Map<string, { index: number; hash: string }>();
|
||||
for (const item of missing) {
|
||||
const chunk = item.chunk;
|
||||
const customId = hashText(
|
||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
||||
);
|
||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
||||
requests.push({
|
||||
custom_id: customId,
|
||||
method: "POST",
|
||||
url: OPENAI_BATCH_ENDPOINT,
|
||||
body: {
|
||||
model: this.openAi?.model ?? this.provider.model,
|
||||
input: chunk.text,
|
||||
},
|
||||
});
|
||||
}
|
||||
const batchResult = await this.runBatchWithFallback({
|
||||
provider: "openai",
|
||||
run: async () =>
|
||||
await runOpenAiEmbeddingBatches({
|
||||
openAi,
|
||||
agentId: this.agentId,
|
||||
requests,
|
||||
wait: this.batch.wait,
|
||||
concurrency: this.batch.concurrency,
|
||||
pollIntervalMs: this.batch.pollIntervalMs,
|
||||
timeoutMs: this.batch.timeoutMs,
|
||||
debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
|
||||
}),
|
||||
fallback: async () => await this.embedChunksInBatches(chunks),
|
||||
});
|
||||
if (Array.isArray(batchResult)) {
|
||||
return batchResult;
|
||||
}
|
||||
const byCustomId = batchResult;
|
||||
|
||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||
for (const [customId, embedding] of byCustomId.entries()) {
|
||||
const mapped = mapping.get(customId);
|
||||
if (!mapped) {
|
||||
continue;
|
||||
}
|
||||
embeddings[mapped.index] = embedding;
|
||||
toCache.push({ hash: mapped.hash, embedding });
|
||||
}
|
||||
this.upsertEmbeddingCache(toCache);
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
private async embedChunksWithGeminiBatch(
|
||||
chunks: MemoryChunk[],
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
source: MemorySource,
|
||||
): Promise<number[][]> {
|
||||
const gemini = this.gemini;
|
||||
if (!gemini) {
|
||||
return this.embedChunksInBatches(chunks);
|
||||
}
|
||||
if (chunks.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
||||
if (hit && hit.length > 0) {
|
||||
embeddings[i] = hit;
|
||||
} else if (chunk) {
|
||||
missing.push({ index: i, chunk });
|
||||
}
|
||||
}
|
||||
|
||||
if (missing.length === 0) {
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
const requests: GeminiBatchRequest[] = [];
|
||||
const mapping = new Map<string, { index: number; hash: string }>();
|
||||
for (const item of missing) {
|
||||
const chunk = item.chunk;
|
||||
const customId = hashText(
|
||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
||||
);
|
||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
||||
requests.push({
|
||||
custom_id: customId,
|
||||
content: { parts: [{ text: chunk.text }] },
|
||||
taskType: "RETRIEVAL_DOCUMENT",
|
||||
});
|
||||
}
|
||||
|
||||
const batchResult = await this.runBatchWithFallback({
|
||||
provider: "gemini",
|
||||
run: async () =>
|
||||
await runGeminiEmbeddingBatches({
|
||||
gemini,
|
||||
agentId: this.agentId,
|
||||
requests,
|
||||
wait: this.batch.wait,
|
||||
concurrency: this.batch.concurrency,
|
||||
pollIntervalMs: this.batch.pollIntervalMs,
|
||||
timeoutMs: this.batch.timeoutMs,
|
||||
debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
|
||||
}),
|
||||
fallback: async () => await this.embedChunksInBatches(chunks),
|
||||
});
|
||||
if (Array.isArray(batchResult)) {
|
||||
return batchResult;
|
||||
}
|
||||
const byCustomId = batchResult;
|
||||
|
||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||
for (const [customId, embedding] of byCustomId.entries()) {
|
||||
const mapped = mapping.get(customId);
|
||||
if (!mapped) {
|
||||
continue;
|
||||
}
|
||||
embeddings[mapped.index] = embedding;
|
||||
toCache.push({ hash: mapped.hash, embedding });
|
||||
}
|
||||
this.upsertEmbeddingCache(toCache);
|
||||
return embeddings;
|
||||
}
|
||||
|
||||
private async embedBatchWithRetry(texts: string[]): Promise<number[][]> {
|
||||
if (texts.length === 0) {
|
||||
return [];
|
||||
}
|
||||
let attempt = 0;
|
||||
let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS;
|
||||
while (true) {
|
||||
try {
|
||||
const timeoutMs = this.resolveEmbeddingTimeout("batch");
|
||||
log.debug("memory embeddings: batch start", {
|
||||
provider: this.provider.id,
|
||||
items: texts.length,
|
||||
timeoutMs,
|
||||
});
|
||||
return await this.withTimeout(
|
||||
this.provider.embedBatch(texts),
|
||||
timeoutMs,
|
||||
`memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
|
||||
);
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) {
|
||||
throw err;
|
||||
}
|
||||
const waitMs = Math.min(
|
||||
EMBEDDING_RETRY_MAX_DELAY_MS,
|
||||
Math.round(delayMs * (1 + Math.random() * 0.2)),
|
||||
);
|
||||
log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`);
|
||||
await new Promise((resolve) => setTimeout(resolve, waitMs));
|
||||
delayMs *= 2;
|
||||
attempt += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private isRetryableEmbeddingError(message: string): boolean {
|
||||
return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare)/i.test(
|
||||
message,
|
||||
);
|
||||
}
|
||||
|
||||
private resolveEmbeddingTimeout(kind: "query" | "batch"): number {
|
||||
const isLocal = this.provider.id === "local";
|
||||
if (kind === "query") {
|
||||
return isLocal ? EMBEDDING_QUERY_TIMEOUT_LOCAL_MS : EMBEDDING_QUERY_TIMEOUT_REMOTE_MS;
|
||||
}
|
||||
return isLocal ? EMBEDDING_BATCH_TIMEOUT_LOCAL_MS : EMBEDDING_BATCH_TIMEOUT_REMOTE_MS;
|
||||
}
|
||||
|
||||
private async embedQueryWithTimeout(text: string): Promise<number[]> {
|
||||
const timeoutMs = this.resolveEmbeddingTimeout("query");
|
||||
log.debug("memory embeddings: query start", { provider: this.provider.id, timeoutMs });
|
||||
return await this.withTimeout(
|
||||
this.provider.embedQuery(text),
|
||||
timeoutMs,
|
||||
`memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`,
|
||||
);
|
||||
}
|
||||
|
||||
private async withTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
message: string,
|
||||
): Promise<T> {
|
||||
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
|
||||
return await promise;
|
||||
}
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error(message)), timeoutMs);
|
||||
});
|
||||
try {
|
||||
return (await Promise.race([promise, timeoutPromise])) as T;
|
||||
} finally {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async withBatchFailureLock<T>(fn: () => Promise<T>): Promise<T> {
|
||||
let release: () => void;
|
||||
const wait = this.batchFailureLock;
|
||||
this.batchFailureLock = new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
await wait;
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
release!();
|
||||
}
|
||||
}
|
||||
|
||||
private async resetBatchFailureCount(): Promise<void> {
|
||||
await this.withBatchFailureLock(async () => {
|
||||
if (this.batchFailureCount > 0) {
|
||||
log.debug("memory embeddings: batch recovered; resetting failure count");
|
||||
}
|
||||
this.batchFailureCount = 0;
|
||||
this.batchFailureLastError = undefined;
|
||||
this.batchFailureLastProvider = undefined;
|
||||
});
|
||||
}
|
||||
|
||||
private async recordBatchFailure(params: {
|
||||
provider: string;
|
||||
message: string;
|
||||
attempts?: number;
|
||||
forceDisable?: boolean;
|
||||
}): Promise<{ disabled: boolean; count: number }> {
|
||||
return await this.withBatchFailureLock(async () => {
|
||||
if (!this.batch.enabled) {
|
||||
return { disabled: true, count: this.batchFailureCount };
|
||||
}
|
||||
const increment = params.forceDisable
|
||||
? BATCH_FAILURE_LIMIT
|
||||
: Math.max(1, params.attempts ?? 1);
|
||||
this.batchFailureCount += increment;
|
||||
this.batchFailureLastError = params.message;
|
||||
this.batchFailureLastProvider = params.provider;
|
||||
const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT;
|
||||
if (disabled) {
|
||||
this.batch.enabled = false;
|
||||
}
|
||||
return { disabled, count: this.batchFailureCount };
|
||||
});
|
||||
}
|
||||
|
||||
private isBatchTimeoutError(message: string): boolean {
|
||||
return /timed out|timeout/i.test(message);
|
||||
}
|
||||
|
||||
private async runBatchWithTimeoutRetry<T>(params: {
|
||||
provider: string;
|
||||
run: () => Promise<T>;
|
||||
}): Promise<T> {
|
||||
try {
|
||||
return await params.run();
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
if (this.isBatchTimeoutError(message)) {
|
||||
log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`);
|
||||
try {
|
||||
return await params.run();
|
||||
} catch (retryErr) {
|
||||
(retryErr as { batchAttempts?: number }).batchAttempts = 2;
|
||||
throw retryErr;
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async runBatchWithFallback<T>(params: {
|
||||
provider: string;
|
||||
run: () => Promise<T>;
|
||||
fallback: () => Promise<number[][]>;
|
||||
}): Promise<T | number[][]> {
|
||||
if (!this.batch.enabled) {
|
||||
return await params.fallback();
|
||||
}
|
||||
try {
|
||||
const result = await this.runBatchWithTimeoutRetry({
|
||||
provider: params.provider,
|
||||
run: params.run,
|
||||
});
|
||||
await this.resetBatchFailureCount();
|
||||
return result;
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1;
|
||||
const forceDisable = /asyncBatchEmbedContent not available/i.test(message);
|
||||
const failure = await this.recordBatchFailure({
|
||||
provider: params.provider,
|
||||
message,
|
||||
attempts,
|
||||
forceDisable,
|
||||
});
|
||||
const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled";
|
||||
log.warn(
|
||||
`memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`,
|
||||
);
|
||||
return await params.fallback();
|
||||
}
|
||||
}
|
||||
|
||||
private getIndexConcurrency(): number {
|
||||
return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY;
|
||||
}
|
||||
|
||||
private async indexFile(
|
||||
entry: MemoryFileEntry | SessionFileEntry,
|
||||
options: { source: MemorySource; content?: string },
|
||||
) {
|
||||
const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8"));
|
||||
const chunks = enforceEmbeddingMaxInputTokens(
|
||||
this.provider,
|
||||
chunkMarkdown(content, this.settings.chunking).filter(
|
||||
(chunk) => chunk.text.trim().length > 0,
|
||||
),
|
||||
);
|
||||
if (options.source === "sessions" && "lineMap" in entry) {
|
||||
remapChunkLines(chunks, entry.lineMap);
|
||||
}
|
||||
const embeddings = this.batch.enabled
|
||||
? await this.embedChunksWithBatch(chunks, entry, options.source)
|
||||
: await this.embedChunksInBatches(chunks);
|
||||
const sample = embeddings.find((embedding) => embedding.length > 0);
|
||||
const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false;
|
||||
const now = Date.now();
|
||||
if (vectorReady) {
|
||||
try {
|
||||
this.db
|
||||
.prepare(
|
||||
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
|
||||
)
|
||||
.run(entry.path, options.source);
|
||||
} catch {}
|
||||
}
|
||||
if (this.fts.enabled && this.fts.available) {
|
||||
try {
|
||||
this.db
|
||||
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
|
||||
.run(entry.path, options.source, this.provider.model);
|
||||
} catch {}
|
||||
}
|
||||
this.db
|
||||
.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`)
|
||||
.run(entry.path, options.source);
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
const chunk = chunks[i];
|
||||
const embedding = embeddings[i] ?? [];
|
||||
const id = hashText(
|
||||
`${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`,
|
||||
);
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
hash=excluded.hash,
|
||||
model=excluded.model,
|
||||
text=excluded.text,
|
||||
embedding=excluded.embedding,
|
||||
updated_at=excluded.updated_at`,
|
||||
)
|
||||
.run(
|
||||
id,
|
||||
entry.path,
|
||||
options.source,
|
||||
chunk.startLine,
|
||||
chunk.endLine,
|
||||
chunk.hash,
|
||||
this.provider.model,
|
||||
chunk.text,
|
||||
JSON.stringify(embedding),
|
||||
now,
|
||||
);
|
||||
if (vectorReady && embedding.length > 0) {
|
||||
try {
|
||||
this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id);
|
||||
} catch {}
|
||||
this.db
|
||||
.prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`)
|
||||
.run(id, vectorToBlob(embedding));
|
||||
}
|
||||
if (this.fts.enabled && this.fts.available) {
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` +
|
||||
` VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
)
|
||||
.run(
|
||||
chunk.text,
|
||||
id,
|
||||
entry.path,
|
||||
options.source,
|
||||
this.provider.model,
|
||||
chunk.startLine,
|
||||
chunk.endLine,
|
||||
);
|
||||
}
|
||||
}
|
||||
this.db
|
||||
.prepare(
|
||||
`INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(path) DO UPDATE SET
|
||||
source=excluded.source,
|
||||
hash=excluded.hash,
|
||||
mtime=excluded.mtime,
|
||||
size=excluded.size`,
|
||||
)
|
||||
.run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size);
|
||||
}
|
||||
}
|
||||
|
||||
export const memoryManagerEmbeddingOps = MemoryManagerEmbeddingOps.prototype;
|
||||
Reference in New Issue
Block a user