mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 09:47:40 +00:00
refactor(memory): dedupe batch embedding glue
This commit is contained in:
@@ -265,18 +265,10 @@ class MemoryManagerEmbeddingOps {
|
|||||||
return this.embedChunksInBatches(chunks);
|
return this.embedChunksInBatches(chunks);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async embedChunksWithVoyageBatch(
|
private collectCachedEmbeddings(chunks: MemoryChunk[]): {
|
||||||
chunks: MemoryChunk[],
|
embeddings: number[][];
|
||||||
entry: MemoryFileEntry | SessionFileEntry,
|
missing: Array<{ index: number; chunk: MemoryChunk }>;
|
||||||
source: MemorySource,
|
} {
|
||||||
): Promise<number[][]> {
|
|
||||||
const voyage = this.voyage;
|
|
||||||
if (!voyage) {
|
|
||||||
return this.embedChunksInBatches(chunks);
|
|
||||||
}
|
|
||||||
if (chunks.length === 0) {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
||||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
||||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
||||||
@@ -291,25 +283,87 @@ class MemoryManagerEmbeddingOps {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return { embeddings, missing };
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildBatchCustomId(params: {
|
||||||
|
source: MemorySource;
|
||||||
|
entry: MemoryFileEntry | SessionFileEntry;
|
||||||
|
chunk: MemoryChunk;
|
||||||
|
index: number;
|
||||||
|
}): string {
|
||||||
|
return hashText(
|
||||||
|
`${params.source}:${params.entry.path}:${params.chunk.startLine}:${params.chunk.endLine}:${params.chunk.hash}:${params.index}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildBatchRequests<T extends { custom_id: string }>(params: {
|
||||||
|
missing: Array<{ index: number; chunk: MemoryChunk }>;
|
||||||
|
entry: MemoryFileEntry | SessionFileEntry;
|
||||||
|
source: MemorySource;
|
||||||
|
build: (chunk: MemoryChunk) => Omit<T, "custom_id">;
|
||||||
|
}): { requests: T[]; mapping: Map<string, { index: number; hash: string }> } {
|
||||||
|
const requests: T[] = [];
|
||||||
|
const mapping = new Map<string, { index: number; hash: string }>();
|
||||||
|
|
||||||
|
for (const item of params.missing) {
|
||||||
|
const chunk = item.chunk;
|
||||||
|
const customId = this.buildBatchCustomId({
|
||||||
|
source: params.source,
|
||||||
|
entry: params.entry,
|
||||||
|
chunk,
|
||||||
|
index: item.index,
|
||||||
|
});
|
||||||
|
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
||||||
|
const built = params.build(chunk);
|
||||||
|
requests.push({ custom_id: customId, ...built } as T);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { requests, mapping };
|
||||||
|
}
|
||||||
|
|
||||||
|
private applyBatchEmbeddings(params: {
|
||||||
|
byCustomId: Map<string, number[]>;
|
||||||
|
mapping: Map<string, { index: number; hash: string }>;
|
||||||
|
embeddings: number[][];
|
||||||
|
}): void {
|
||||||
|
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
||||||
|
for (const [customId, embedding] of params.byCustomId.entries()) {
|
||||||
|
const mapped = params.mapping.get(customId);
|
||||||
|
if (!mapped) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
params.embeddings[mapped.index] = embedding;
|
||||||
|
toCache.push({ hash: mapped.hash, embedding });
|
||||||
|
}
|
||||||
|
this.upsertEmbeddingCache(toCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async embedChunksWithVoyageBatch(
|
||||||
|
chunks: MemoryChunk[],
|
||||||
|
entry: MemoryFileEntry | SessionFileEntry,
|
||||||
|
source: MemorySource,
|
||||||
|
): Promise<number[][]> {
|
||||||
|
const voyage = this.voyage;
|
||||||
|
if (!voyage) {
|
||||||
|
return this.embedChunksInBatches(chunks);
|
||||||
|
}
|
||||||
|
if (chunks.length === 0) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
|
||||||
if (missing.length === 0) {
|
if (missing.length === 0) {
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
const requests: VoyageBatchRequest[] = [];
|
const { requests, mapping } = this.buildBatchRequests<VoyageBatchRequest>({
|
||||||
const mapping = new Map<string, { index: number; hash: string }>();
|
missing,
|
||||||
for (const item of missing) {
|
entry,
|
||||||
const chunk = item.chunk;
|
source,
|
||||||
const customId = hashText(
|
build: (chunk) => ({
|
||||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
body: { input: chunk.text },
|
||||||
);
|
}),
|
||||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
});
|
||||||
requests.push({
|
|
||||||
custom_id: customId,
|
|
||||||
body: {
|
|
||||||
input: chunk.text,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
const batchResult = await this.runBatchWithFallback({
|
const batchResult = await this.runBatchWithFallback({
|
||||||
provider: "voyage",
|
provider: "voyage",
|
||||||
run: async () =>
|
run: async () =>
|
||||||
@@ -328,18 +382,7 @@ class MemoryManagerEmbeddingOps {
|
|||||||
if (Array.isArray(batchResult)) {
|
if (Array.isArray(batchResult)) {
|
||||||
return batchResult;
|
return batchResult;
|
||||||
}
|
}
|
||||||
const byCustomId = batchResult;
|
this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings });
|
||||||
|
|
||||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
|
||||||
for (const [customId, embedding] of byCustomId.entries()) {
|
|
||||||
const mapped = mapping.get(customId);
|
|
||||||
if (!mapped) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
embeddings[mapped.index] = embedding;
|
|
||||||
toCache.push({ hash: mapped.hash, embedding });
|
|
||||||
}
|
|
||||||
this.upsertEmbeddingCache(toCache);
|
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -355,42 +398,24 @@ class MemoryManagerEmbeddingOps {
|
|||||||
if (chunks.length === 0) {
|
if (chunks.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
|
||||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
|
||||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
|
||||||
|
|
||||||
for (let i = 0; i < chunks.length; i += 1) {
|
|
||||||
const chunk = chunks[i];
|
|
||||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
|
||||||
if (hit && hit.length > 0) {
|
|
||||||
embeddings[i] = hit;
|
|
||||||
} else if (chunk) {
|
|
||||||
missing.push({ index: i, chunk });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (missing.length === 0) {
|
if (missing.length === 0) {
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
const requests: OpenAiBatchRequest[] = [];
|
const { requests, mapping } = this.buildBatchRequests<OpenAiBatchRequest>({
|
||||||
const mapping = new Map<string, { index: number; hash: string }>();
|
missing,
|
||||||
for (const item of missing) {
|
entry,
|
||||||
const chunk = item.chunk;
|
source,
|
||||||
const customId = hashText(
|
build: (chunk) => ({
|
||||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
|
||||||
);
|
|
||||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
|
||||||
requests.push({
|
|
||||||
custom_id: customId,
|
|
||||||
method: "POST",
|
method: "POST",
|
||||||
url: OPENAI_BATCH_ENDPOINT,
|
url: OPENAI_BATCH_ENDPOINT,
|
||||||
body: {
|
body: {
|
||||||
model: this.openAi?.model ?? this.provider.model,
|
model: this.openAi?.model ?? this.provider.model,
|
||||||
input: chunk.text,
|
input: chunk.text,
|
||||||
},
|
},
|
||||||
});
|
}),
|
||||||
}
|
});
|
||||||
const batchResult = await this.runBatchWithFallback({
|
const batchResult = await this.runBatchWithFallback({
|
||||||
provider: "openai",
|
provider: "openai",
|
||||||
run: async () =>
|
run: async () =>
|
||||||
@@ -409,18 +434,7 @@ class MemoryManagerEmbeddingOps {
|
|||||||
if (Array.isArray(batchResult)) {
|
if (Array.isArray(batchResult)) {
|
||||||
return batchResult;
|
return batchResult;
|
||||||
}
|
}
|
||||||
const byCustomId = batchResult;
|
this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings });
|
||||||
|
|
||||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
|
||||||
for (const [customId, embedding] of byCustomId.entries()) {
|
|
||||||
const mapped = mapping.get(customId);
|
|
||||||
if (!mapped) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
embeddings[mapped.index] = embedding;
|
|
||||||
toCache.push({ hash: mapped.hash, embedding });
|
|
||||||
}
|
|
||||||
this.upsertEmbeddingCache(toCache);
|
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -436,38 +450,20 @@ class MemoryManagerEmbeddingOps {
|
|||||||
if (chunks.length === 0) {
|
if (chunks.length === 0) {
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash));
|
const { embeddings, missing } = this.collectCachedEmbeddings(chunks);
|
||||||
const embeddings: number[][] = Array.from({ length: chunks.length }, () => []);
|
|
||||||
const missing: Array<{ index: number; chunk: MemoryChunk }> = [];
|
|
||||||
|
|
||||||
for (let i = 0; i < chunks.length; i += 1) {
|
|
||||||
const chunk = chunks[i];
|
|
||||||
const hit = chunk?.hash ? cached.get(chunk.hash) : undefined;
|
|
||||||
if (hit && hit.length > 0) {
|
|
||||||
embeddings[i] = hit;
|
|
||||||
} else if (chunk) {
|
|
||||||
missing.push({ index: i, chunk });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (missing.length === 0) {
|
if (missing.length === 0) {
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
const requests: GeminiBatchRequest[] = [];
|
const { requests, mapping } = this.buildBatchRequests<GeminiBatchRequest>({
|
||||||
const mapping = new Map<string, { index: number; hash: string }>();
|
missing,
|
||||||
for (const item of missing) {
|
entry,
|
||||||
const chunk = item.chunk;
|
source,
|
||||||
const customId = hashText(
|
build: (chunk) => ({
|
||||||
`${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`,
|
|
||||||
);
|
|
||||||
mapping.set(customId, { index: item.index, hash: chunk.hash });
|
|
||||||
requests.push({
|
|
||||||
custom_id: customId,
|
|
||||||
content: { parts: [{ text: chunk.text }] },
|
content: { parts: [{ text: chunk.text }] },
|
||||||
taskType: "RETRIEVAL_DOCUMENT",
|
taskType: "RETRIEVAL_DOCUMENT",
|
||||||
});
|
}),
|
||||||
}
|
});
|
||||||
|
|
||||||
const batchResult = await this.runBatchWithFallback({
|
const batchResult = await this.runBatchWithFallback({
|
||||||
provider: "gemini",
|
provider: "gemini",
|
||||||
@@ -487,18 +483,7 @@ class MemoryManagerEmbeddingOps {
|
|||||||
if (Array.isArray(batchResult)) {
|
if (Array.isArray(batchResult)) {
|
||||||
return batchResult;
|
return batchResult;
|
||||||
}
|
}
|
||||||
const byCustomId = batchResult;
|
this.applyBatchEmbeddings({ byCustomId: batchResult, mapping, embeddings });
|
||||||
|
|
||||||
const toCache: Array<{ hash: string; embedding: number[] }> = [];
|
|
||||||
for (const [customId, embedding] of byCustomId.entries()) {
|
|
||||||
const mapped = mapping.get(customId);
|
|
||||||
if (!mapped) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
embeddings[mapped.index] = embedding;
|
|
||||||
toCache.push({ hash: mapped.hash, embedding });
|
|
||||||
}
|
|
||||||
this.upsertEmbeddingCache(toCache);
|
|
||||||
return embeddings;
|
return embeddings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user