refactor(memory): consolidate embeddings and batch helpers

This commit is contained in:
Peter Steinberger
2026-02-17 00:10:32 +00:00
parent 423b7a0f28
commit 9bfd3ca195
11 changed files with 443 additions and 423 deletions

View File

@@ -1,8 +1,8 @@
import type { GeminiEmbeddingClient } from "./embeddings-gemini.js";
import { isTruthyEnvValue } from "../infra/env.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js";
import { hashText, runWithConcurrency } from "./internal.js";
import { runEmbeddingBatchGroups } from "./batch-runner.js";
import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js";
import { debugEmbeddingsLog } from "./embeddings-debug.js";
import { hashText } from "./internal.js";
export type GeminiBatchRequest = {
custom_id: string;
@@ -35,17 +35,6 @@ export type GeminiBatchOutputLine = {
};
const GEMINI_BATCH_MAX_REQUESTS = 50000;
const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS);
const log = createSubsystemLogger("memory/embeddings");
const debugLog = (message: string, meta?: Record<string, unknown>) => {
if (!debugEmbeddings) {
return;
}
const suffix = meta ? ` ${JSON.stringify(meta)}` : "";
log.raw(`${message}${suffix}`);
};
function getGeminiUploadUrl(baseUrl: string): string {
if (baseUrl.includes("/v1beta")) {
return baseUrl.replace(/\/v1beta\/?$/, "/upload/v1beta");
@@ -99,7 +88,7 @@ async function submitGeminiBatch(params: {
const uploadPayload = buildGeminiUploadBody({ jsonl, displayName });
const uploadUrl = `${getGeminiUploadUrl(baseUrl)}/files?uploadType=multipart`;
debugLog("memory embeddings: gemini batch upload", {
debugEmbeddingsLog("memory embeddings: gemini batch upload", {
uploadUrl,
baseUrl,
requests: params.requests.length,
@@ -132,7 +121,7 @@ async function submitGeminiBatch(params: {
};
const batchEndpoint = `${baseUrl}/${params.gemini.modelPath}:asyncBatchEmbedContent`;
debugLog("memory embeddings: gemini batch create", {
debugEmbeddingsLog("memory embeddings: gemini batch create", {
batchEndpoint,
fileId,
});
@@ -162,7 +151,7 @@ async function fetchGeminiBatchStatus(params: {
? params.batchName
: `batches/${params.batchName}`;
const statusUrl = `${baseUrl}/${name}`;
debugLog("memory embeddings: gemini batch status", { statusUrl });
debugEmbeddingsLog("memory embeddings: gemini batch status", { statusUrl });
const res = await fetch(statusUrl, {
headers: buildBatchHeaders(params.gemini, { json: true }),
});
@@ -180,7 +169,7 @@ async function fetchGeminiFileContent(params: {
const baseUrl = normalizeBatchBaseUrl(params.gemini);
const file = params.fileId.startsWith("files/") ? params.fileId : `files/${params.fileId}`;
const downloadUrl = `${baseUrl}/${file}:download`;
debugLog("memory embeddings: gemini batch download", { downloadUrl });
debugEmbeddingsLog("memory embeddings: gemini batch download", { downloadUrl });
const res = await fetch(downloadUrl, {
headers: buildBatchHeaders(params.gemini, { json: true }),
});
@@ -257,110 +246,102 @@ export async function runGeminiEmbeddingBatches(params: {
concurrency: number;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<Map<string, number[]>> {
if (params.requests.length === 0) {
return new Map();
}
const groups = splitBatchRequests(params.requests, GEMINI_BATCH_MAX_REQUESTS);
const byCustomId = new Map<string, number[]>();
const tasks = groups.map((group, groupIndex) => async () => {
const batchInfo = await submitGeminiBatch({
gemini: params.gemini,
requests: group,
agentId: params.agentId,
});
const batchName = batchInfo.name ?? "";
if (!batchName) {
throw new Error("gemini batch create failed: missing batch name");
}
params.debug?.("memory embeddings: gemini batch created", {
batchName,
state: batchInfo.state,
group: groupIndex + 1,
groups: groups.length,
requests: group.length,
});
if (
!params.wait &&
batchInfo.state &&
!["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)
) {
throw new Error(
`gemini batch ${batchName} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.state && ["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)
? {
outputFileId:
batchInfo.outputConfig?.file ??
batchInfo.outputConfig?.fileId ??
batchInfo.metadata?.output?.responsesFile ??
"",
}
: await waitForGeminiBatch({
gemini: params.gemini,
batchName,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`gemini batch ${batchName} completed without output file`);
}
const content = await fetchGeminiFileContent({
gemini: params.gemini,
fileId: completed.outputFileId,
});
const outputLines = parseGeminiBatchOutput(content);
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
for (const line of outputLines) {
const customId = line.key ?? line.custom_id ?? line.request_id;
if (!customId) {
continue;
}
remaining.delete(customId);
if (line.error?.message) {
errors.push(`${customId}: ${line.error.message}`);
continue;
}
if (line.response?.error?.message) {
errors.push(`${customId}: ${line.response.error.message}`);
continue;
}
const embedding = line.embedding?.values ?? line.response?.embedding?.values ?? [];
if (embedding.length === 0) {
errors.push(`${customId}: empty embedding`);
continue;
}
byCustomId.set(customId, embedding);
}
if (errors.length > 0) {
throw new Error(`gemini batch ${batchName} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(`gemini batch ${batchName} missing ${remaining.size} embedding responses`);
}
});
params.debug?.("memory embeddings: gemini batch submit", {
requests: params.requests.length,
groups: groups.length,
return await runEmbeddingBatchGroups({
requests: params.requests,
maxRequests: GEMINI_BATCH_MAX_REQUESTS,
wait: params.wait,
concurrency: params.concurrency,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
});
concurrency: params.concurrency,
debug: params.debug,
debugLabel: "memory embeddings: gemini batch submit",
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
const batchInfo = await submitGeminiBatch({
gemini: params.gemini,
requests: group,
agentId: params.agentId,
});
const batchName = batchInfo.name ?? "";
if (!batchName) {
throw new Error("gemini batch create failed: missing batch name");
}
await runWithConcurrency(tasks, params.concurrency);
return byCustomId;
params.debug?.("memory embeddings: gemini batch created", {
batchName,
state: batchInfo.state,
group: groupIndex + 1,
groups,
requests: group.length,
});
if (
!params.wait &&
batchInfo.state &&
!["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)
) {
throw new Error(
`gemini batch ${batchName} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.state && ["SUCCEEDED", "COMPLETED", "DONE"].includes(batchInfo.state)
? {
outputFileId:
batchInfo.outputConfig?.file ??
batchInfo.outputConfig?.fileId ??
batchInfo.metadata?.output?.responsesFile ??
"",
}
: await waitForGeminiBatch({
gemini: params.gemini,
batchName,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`gemini batch ${batchName} completed without output file`);
}
const content = await fetchGeminiFileContent({
gemini: params.gemini,
fileId: completed.outputFileId,
});
const outputLines = parseGeminiBatchOutput(content);
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
for (const line of outputLines) {
const customId = line.key ?? line.custom_id ?? line.request_id;
if (!customId) {
continue;
}
remaining.delete(customId);
if (line.error?.message) {
errors.push(`${customId}: ${line.error.message}`);
continue;
}
if (line.response?.error?.message) {
errors.push(`${customId}: ${line.response.error.message}`);
continue;
}
const embedding = line.embedding?.values ?? line.response?.embedding?.values ?? [];
if (embedding.length === 0) {
errors.push(`${customId}: empty embedding`);
continue;
}
byCustomId.set(customId, embedding);
}
if (errors.length > 0) {
throw new Error(`gemini batch ${batchName} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(`gemini batch ${batchName} missing ${remaining.size} embedding responses`);
}
},
});
}

View File

@@ -2,8 +2,9 @@ import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
import { extractBatchErrorMessage, formatUnavailableBatchError } from "./batch-error-utils.js";
import { postJsonWithRetry } from "./batch-http.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js";
import { hashText, runWithConcurrency } from "./internal.js";
import { runEmbeddingBatchGroups } from "./batch-runner.js";
import { uploadBatchJsonlFile } from "./batch-upload.js";
import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js";
export type OpenAiBatchRequest = {
custom_id: string;
@@ -44,34 +45,17 @@ async function submitOpenAiBatch(params: {
agentId: string;
}): Promise<OpenAiBatchStatus> {
const baseUrl = normalizeBatchBaseUrl(params.openAi);
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
const form = new FormData();
form.append("purpose", "batch");
form.append(
"file",
new Blob([jsonl], { type: "application/jsonl" }),
`memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
);
const fileRes = await fetch(`${baseUrl}/files`, {
method: "POST",
headers: buildBatchHeaders(params.openAi, { json: false }),
body: form,
const inputFileId = await uploadBatchJsonlFile({
client: params.openAi,
requests: params.requests,
errorPrefix: "openai batch file upload failed",
});
if (!fileRes.ok) {
const text = await fileRes.text();
throw new Error(`openai batch file upload failed: ${fileRes.status} ${text}`);
}
const filePayload = (await fileRes.json()) as { id?: string };
if (!filePayload.id) {
throw new Error("openai batch file upload failed: missing file id");
}
return await postJsonWithRetry<OpenAiBatchStatus>({
url: `${baseUrl}/batches`,
headers: buildBatchHeaders(params.openAi, { json: true }),
body: {
input_file_id: filePayload.id,
input_file_id: inputFileId,
endpoint: OPENAI_BATCH_ENDPOINT,
completion_window: OPENAI_BATCH_COMPLETION_WINDOW,
metadata: {
@@ -197,84 +181,78 @@ export async function runOpenAiEmbeddingBatches(params: {
concurrency: number;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<Map<string, number[]>> {
if (params.requests.length === 0) {
return new Map();
}
const groups = splitBatchRequests(params.requests, OPENAI_BATCH_MAX_REQUESTS);
const byCustomId = new Map<string, number[]>();
const tasks = groups.map((group, groupIndex) => async () => {
const batchInfo = await submitOpenAiBatch({
openAi: params.openAi,
requests: group,
agentId: params.agentId,
});
if (!batchInfo.id) {
throw new Error("openai batch create failed: missing batch id");
}
params.debug?.("memory embeddings: openai batch created", {
batchId: batchInfo.id,
status: batchInfo.status,
group: groupIndex + 1,
groups: groups.length,
requests: group.length,
});
if (!params.wait && batchInfo.status !== "completed") {
throw new Error(
`openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.status === "completed"
? {
outputFileId: batchInfo.output_file_id ?? "",
errorFileId: batchInfo.error_file_id ?? undefined,
}
: await waitForOpenAiBatch({
openAi: params.openAi,
batchId: batchInfo.id,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`openai batch ${batchInfo.id} completed without output file`);
}
const content = await fetchOpenAiFileContent({
openAi: params.openAi,
fileId: completed.outputFileId,
});
const outputLines = parseOpenAiBatchOutput(content);
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
for (const line of outputLines) {
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
if (errors.length > 0) {
throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`);
}
});
params.debug?.("memory embeddings: openai batch submit", {
requests: params.requests.length,
groups: groups.length,
return await runEmbeddingBatchGroups({
requests: params.requests,
maxRequests: OPENAI_BATCH_MAX_REQUESTS,
wait: params.wait,
concurrency: params.concurrency,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
});
concurrency: params.concurrency,
debug: params.debug,
debugLabel: "memory embeddings: openai batch submit",
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
const batchInfo = await submitOpenAiBatch({
openAi: params.openAi,
requests: group,
agentId: params.agentId,
});
if (!batchInfo.id) {
throw new Error("openai batch create failed: missing batch id");
}
await runWithConcurrency(tasks, params.concurrency);
return byCustomId;
params.debug?.("memory embeddings: openai batch created", {
batchId: batchInfo.id,
status: batchInfo.status,
group: groupIndex + 1,
groups,
requests: group.length,
});
if (!params.wait && batchInfo.status !== "completed") {
throw new Error(
`openai batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.status === "completed"
? {
outputFileId: batchInfo.output_file_id ?? "",
errorFileId: batchInfo.error_file_id ?? undefined,
}
: await waitForOpenAiBatch({
openAi: params.openAi,
batchId: batchInfo.id,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`openai batch ${batchInfo.id} completed without output file`);
}
const content = await fetchOpenAiFileContent({
openAi: params.openAi,
fileId: completed.outputFileId,
});
const outputLines = parseOpenAiBatchOutput(content);
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
for (const line of outputLines) {
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
if (errors.length > 0) {
throw new Error(`openai batch ${batchInfo.id} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(
`openai batch ${batchInfo.id} missing ${remaining.size} embedding responses`,
);
}
},
});
}

View File

@@ -0,0 +1,40 @@
import { splitBatchRequests } from "./batch-utils.js";
import { runWithConcurrency } from "./internal.js";
/**
 * Shared driver for provider-specific embedding batch jobs.
 *
 * Splits `requests` into groups of at most `maxRequests`, emits one debug
 * line describing the submission, then invokes `runGroup` for every group
 * with at most `concurrency` groups in flight. Each `runGroup` callback is
 * expected to write its resolved embeddings into the shared `byCustomId`
 * map (keyed by request custom id), which is returned once every group has
 * completed. An empty request list short-circuits to an empty map.
 */
export async function runEmbeddingBatchGroups<TRequest>(params: {
  requests: TRequest[];
  maxRequests: number;
  wait: boolean;
  pollIntervalMs: number;
  timeoutMs: number;
  concurrency: number;
  debugLabel: string;
  debug?: (message: string, data?: Record<string, unknown>) => void;
  runGroup: (args: {
    group: TRequest[];
    groupIndex: number;
    groups: number;
    byCustomId: Map<string, number[]>;
  }) => Promise<void>;
}): Promise<Map<string, number[]>> {
  const { requests, maxRequests, concurrency } = params;
  if (requests.length === 0) {
    return new Map();
  }
  const results = new Map<string, number[]>();
  const requestGroups = splitBatchRequests(requests, maxRequests);
  params.debug?.(params.debugLabel, {
    requests: requests.length,
    groups: requestGroups.length,
    wait: params.wait,
    concurrency,
    pollIntervalMs: params.pollIntervalMs,
    timeoutMs: params.timeoutMs,
  });
  // Each task forwards the shared results map so callbacks can accumulate
  // embeddings; runWithConcurrency caps how many run simultaneously.
  const tasks = requestGroups.map(
    (group, groupIndex) => () =>
      params.runGroup({
        group,
        groupIndex,
        groups: requestGroups.length,
        byCustomId: results,
      }),
  );
  await runWithConcurrency(tasks, concurrency);
  return results;
}

View File

@@ -0,0 +1,37 @@
import {
buildBatchHeaders,
normalizeBatchBaseUrl,
type BatchHttpClientConfig,
} from "./batch-utils.js";
import { hashText } from "./internal.js";
/**
 * Uploads a JSONL payload of batch requests to the provider's Files API.
 *
 * Each request is serialized as one JSON line, wrapped in a multipart form
 * with purpose "batch", and POSTed to `<baseUrl>/files`. Returns the
 * provider-assigned file id. Throws `${errorPrefix}: <status> <body>` on an
 * HTTP error and `${errorPrefix}: missing file id` when the response lacks
 * an id.
 */
export async function uploadBatchJsonlFile(params: {
  client: BatchHttpClientConfig;
  requests: unknown[];
  errorPrefix: string;
}): Promise<string> {
  const { client, requests, errorPrefix } = params;
  const jsonLines = requests.map((request) => JSON.stringify(request)).join("\n");
  // Timestamp hash keeps concurrent uploads from colliding on file name.
  const fileName = `memory-embeddings.${hashText(String(Date.now()))}.jsonl`;
  const form = new FormData();
  form.append("purpose", "batch");
  form.append("file", new Blob([jsonLines], { type: "application/jsonl" }), fileName);
  const response = await fetch(`${normalizeBatchBaseUrl(client)}/files`, {
    method: "POST",
    headers: buildBatchHeaders(client, { json: false }),
    body: form,
  });
  if (!response.ok) {
    throw new Error(`${errorPrefix}: ${response.status} ${await response.text()}`);
  }
  const payload = (await response.json()) as { id?: string };
  if (!payload.id) {
    throw new Error(`${errorPrefix}: missing file id`);
  }
  return payload.id;
}

View File

@@ -4,8 +4,9 @@ import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
import { extractBatchErrorMessage, formatUnavailableBatchError } from "./batch-error-utils.js";
import { postJsonWithRetry } from "./batch-http.js";
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js";
import { hashText, runWithConcurrency } from "./internal.js";
import { runEmbeddingBatchGroups } from "./batch-runner.js";
import { uploadBatchJsonlFile } from "./batch-upload.js";
import { buildBatchHeaders, normalizeBatchBaseUrl } from "./batch-utils.js";
/**
* Voyage Batch API Input Line format.
@@ -47,36 +48,18 @@ async function submitVoyageBatch(params: {
agentId: string;
}): Promise<VoyageBatchStatus> {
const baseUrl = normalizeBatchBaseUrl(params.client);
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
const form = new FormData();
form.append("purpose", "batch");
form.append(
"file",
new Blob([jsonl], { type: "application/jsonl" }),
`memory-embeddings.${hashText(String(Date.now()))}.jsonl`,
);
// 1. Upload file using Voyage Files API
const fileRes = await fetch(`${baseUrl}/files`, {
method: "POST",
headers: buildBatchHeaders(params.client, { json: false }),
body: form,
const inputFileId = await uploadBatchJsonlFile({
client: params.client,
requests: params.requests,
errorPrefix: "voyage batch file upload failed",
});
if (!fileRes.ok) {
const text = await fileRes.text();
throw new Error(`voyage batch file upload failed: ${fileRes.status} ${text}`);
}
const filePayload = (await fileRes.json()) as { id?: string };
if (!filePayload.id) {
throw new Error("voyage batch file upload failed: missing file id");
}
// 2. Create batch job using Voyage Batches API
return await postJsonWithRetry<VoyageBatchStatus>({
url: `${baseUrl}/batches`,
headers: buildBatchHeaders(params.client, { json: true }),
body: {
input_file_id: filePayload.id,
input_file_id: inputFileId,
endpoint: VOYAGE_BATCH_ENDPOINT,
completion_window: VOYAGE_BATCH_COMPLETION_WINDOW,
request_params: {
@@ -192,99 +175,95 @@ export async function runVoyageEmbeddingBatches(params: {
concurrency: number;
debug?: (message: string, data?: Record<string, unknown>) => void;
}): Promise<Map<string, number[]>> {
if (params.requests.length === 0) {
return new Map();
}
const groups = splitBatchRequests(params.requests, VOYAGE_BATCH_MAX_REQUESTS);
const byCustomId = new Map<string, number[]>();
const tasks = groups.map((group, groupIndex) => async () => {
const batchInfo = await submitVoyageBatch({
client: params.client,
requests: group,
agentId: params.agentId,
});
if (!batchInfo.id) {
throw new Error("voyage batch create failed: missing batch id");
}
params.debug?.("memory embeddings: voyage batch created", {
batchId: batchInfo.id,
status: batchInfo.status,
group: groupIndex + 1,
groups: groups.length,
requests: group.length,
});
if (!params.wait && batchInfo.status !== "completed") {
throw new Error(
`voyage batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.status === "completed"
? {
outputFileId: batchInfo.output_file_id ?? "",
errorFileId: batchInfo.error_file_id ?? undefined,
}
: await waitForVoyageBatch({
client: params.client,
batchId: batchInfo.id,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`voyage batch ${batchInfo.id} completed without output file`);
}
const baseUrl = normalizeBatchBaseUrl(params.client);
const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, {
headers: buildBatchHeaders(params.client, { json: true }),
});
if (!contentRes.ok) {
const text = await contentRes.text();
throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`);
}
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
if (contentRes.body) {
const reader = createInterface({
input: Readable.fromWeb(contentRes.body as unknown as import("stream/web").ReadableStream),
terminal: false,
});
for await (const rawLine of reader) {
if (!rawLine.trim()) {
continue;
}
const line = JSON.parse(rawLine) as VoyageBatchOutputLine;
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
}
if (errors.length > 0) {
throw new Error(`voyage batch ${batchInfo.id} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(`voyage batch ${batchInfo.id} missing ${remaining.size} embedding responses`);
}
});
params.debug?.("memory embeddings: voyage batch submit", {
requests: params.requests.length,
groups: groups.length,
return await runEmbeddingBatchGroups({
requests: params.requests,
maxRequests: VOYAGE_BATCH_MAX_REQUESTS,
wait: params.wait,
concurrency: params.concurrency,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
});
concurrency: params.concurrency,
debug: params.debug,
debugLabel: "memory embeddings: voyage batch submit",
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
const batchInfo = await submitVoyageBatch({
client: params.client,
requests: group,
agentId: params.agentId,
});
if (!batchInfo.id) {
throw new Error("voyage batch create failed: missing batch id");
}
await runWithConcurrency(tasks, params.concurrency);
return byCustomId;
params.debug?.("memory embeddings: voyage batch created", {
batchId: batchInfo.id,
status: batchInfo.status,
group: groupIndex + 1,
groups,
requests: group.length,
});
if (!params.wait && batchInfo.status !== "completed") {
throw new Error(
`voyage batch ${batchInfo.id} submitted; enable remote.batch.wait to await completion`,
);
}
const completed =
batchInfo.status === "completed"
? {
outputFileId: batchInfo.output_file_id ?? "",
errorFileId: batchInfo.error_file_id ?? undefined,
}
: await waitForVoyageBatch({
client: params.client,
batchId: batchInfo.id,
wait: params.wait,
pollIntervalMs: params.pollIntervalMs,
timeoutMs: params.timeoutMs,
debug: params.debug,
initial: batchInfo,
});
if (!completed.outputFileId) {
throw new Error(`voyage batch ${batchInfo.id} completed without output file`);
}
const baseUrl = normalizeBatchBaseUrl(params.client);
const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, {
headers: buildBatchHeaders(params.client, { json: true }),
});
if (!contentRes.ok) {
const text = await contentRes.text();
throw new Error(`voyage batch file content failed: ${contentRes.status} ${text}`);
}
const errors: string[] = [];
const remaining = new Set(group.map((request) => request.custom_id));
if (contentRes.body) {
const reader = createInterface({
input: Readable.fromWeb(
contentRes.body as unknown as import("stream/web").ReadableStream,
),
terminal: false,
});
for await (const rawLine of reader) {
if (!rawLine.trim()) {
continue;
}
const line = JSON.parse(rawLine) as VoyageBatchOutputLine;
applyEmbeddingBatchOutputLine({ line, remaining, errors, byCustomId });
}
}
if (errors.length > 0) {
throw new Error(`voyage batch ${batchInfo.id} failed: ${errors.join("; ")}`);
}
if (remaining.size > 0) {
throw new Error(
`voyage batch ${batchInfo.id} missing ${remaining.size} embedding responses`,
);
}
},
});
}

View File

@@ -0,0 +1,13 @@
import { isTruthyEnvValue } from "../infra/env.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
// Debug output is opt-in via OPENCLAW_DEBUG_MEMORY_EMBEDDINGS; the flag is
// read once at module load, so changing the env var later has no effect.
const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS);
const log = createSubsystemLogger("memory/embeddings");

/**
 * Emits a raw debug line for the memory/embeddings subsystem.
 *
 * No-op unless the debug env flag was set when this module loaded. When
 * `meta` is provided it is appended to the message as one JSON blob.
 */
export function debugEmbeddingsLog(message: string, meta?: Record<string, unknown>): void {
  if (debugEmbeddings) {
    log.raw(meta ? `${message} ${JSON.stringify(meta)}` : message);
  }
}

View File

@@ -1,8 +1,7 @@
import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js";
import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js";
import { isTruthyEnvValue } from "../infra/env.js";
import { parseGeminiAuth } from "../infra/gemini-auth.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { debugEmbeddingsLog } from "./embeddings-debug.js";
export type GeminiEmbeddingClient = {
baseUrl: string;
@@ -16,17 +15,6 @@ export const DEFAULT_GEMINI_EMBEDDING_MODEL = "gemini-embedding-001";
const GEMINI_MAX_INPUT_TOKENS: Record<string, number> = {
"text-embedding-004": 2048,
};
const debugEmbeddings = isTruthyEnvValue(process.env.OPENCLAW_DEBUG_MEMORY_EMBEDDINGS);
const log = createSubsystemLogger("memory/embeddings");
const debugLog = (message: string, meta?: Record<string, unknown>) => {
if (!debugEmbeddings) {
return;
}
const suffix = meta ? ` ${JSON.stringify(meta)}` : "";
log.raw(`${message}${suffix}`);
};
function resolveRemoteApiKey(remoteApiKey?: string): string | undefined {
const trimmed = remoteApiKey?.trim();
if (!trimmed) {
@@ -158,7 +146,7 @@ export async function resolveGeminiEmbeddingClient(
};
const model = normalizeGeminiModel(options.model);
const modelPath = buildGeminiModelPath(model);
debugLog("memory embeddings: gemini client", {
debugEmbeddingsLog("memory embeddings: gemini client", {
rawBaseUrl,
baseUrl,
model,

View File

@@ -1,5 +1,6 @@
import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js";
import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js";
import { resolveRemoteEmbeddingBearerClient } from "./embeddings-remote-client.js";
import { fetchRemoteEmbeddingVectors } from "./embeddings-remote-fetch.js";
export type OpenAiEmbeddingClient = {
baseUrl: string;
@@ -36,20 +37,12 @@ export async function createOpenAiEmbeddingProvider(
if (input.length === 0) {
return [];
}
const res = await fetch(url, {
method: "POST",
return await fetchRemoteEmbeddingVectors({
url,
headers: client.headers,
body: JSON.stringify({ model: client.model, input }),
body: { model: client.model, input },
errorPrefix: "openai embeddings failed",
});
if (!res.ok) {
const text = await res.text();
throw new Error(`openai embeddings failed: ${res.status} ${text}`);
}
const payload = (await res.json()) as {
data?: Array<{ embedding?: number[] }>;
};
const data = payload.data ?? [];
return data.map((entry) => entry.embedding ?? []);
};
return {
@@ -70,29 +63,11 @@ export async function createOpenAiEmbeddingProvider(
export async function resolveOpenAiEmbeddingClient(
options: EmbeddingProviderOptions,
): Promise<OpenAiEmbeddingClient> {
const remote = options.remote;
const remoteApiKey = remote?.apiKey?.trim();
const remoteBaseUrl = remote?.baseUrl?.trim();
const apiKey = remoteApiKey
? remoteApiKey
: requireApiKey(
await resolveApiKeyForProvider({
provider: "openai",
cfg: options.config,
agentDir: options.agentDir,
}),
"openai",
);
const providerConfig = options.config.models?.providers?.openai;
const baseUrl = remoteBaseUrl || providerConfig?.baseUrl?.trim() || DEFAULT_OPENAI_BASE_URL;
const headerOverrides = Object.assign({}, providerConfig?.headers, remote?.headers);
const headers: Record<string, string> = {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
...headerOverrides,
};
const { baseUrl, headers } = await resolveRemoteEmbeddingBearerClient({
provider: "openai",
options,
defaultBaseUrl: DEFAULT_OPENAI_BASE_URL,
});
const model = normalizeOpenAiModel(options.model);
return { baseUrl, headers, model };
}

View File

@@ -0,0 +1,33 @@
import type { EmbeddingProviderOptions } from "./embeddings.js";
import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js";
type RemoteEmbeddingProviderId = "openai" | "voyage";

/**
 * Resolves the base URL and bearer-auth headers shared by the OpenAI- and
 * Voyage-style embedding clients.
 *
 * Precedence: an explicit `remote.apiKey` / `remote.baseUrl` from options
 * wins; otherwise the key is resolved through the provider auth flow and
 * the base URL falls back to the provider config entry, then
 * `defaultBaseUrl`. Header overrides from the provider config are applied
 * first, then `remote.headers`, on top of the Content-Type and
 * Authorization defaults.
 */
export async function resolveRemoteEmbeddingBearerClient(params: {
  provider: RemoteEmbeddingProviderId;
  options: EmbeddingProviderOptions;
  defaultBaseUrl: string;
}): Promise<{ baseUrl: string; headers: Record<string, string> }> {
  const { provider, options, defaultBaseUrl } = params;
  const remote = options.remote;
  const providerConfig = options.config.models?.providers?.[provider];
  // An explicitly-supplied (non-empty) remote key short-circuits auth lookup.
  let apiKey = remote?.apiKey?.trim();
  if (!apiKey) {
    apiKey = requireApiKey(
      await resolveApiKeyForProvider({
        provider,
        cfg: options.config,
        agentDir: options.agentDir,
      }),
      provider,
    );
  }
  const baseUrl = remote?.baseUrl?.trim() || providerConfig?.baseUrl?.trim() || defaultBaseUrl;
  const overrides = Object.assign({}, providerConfig?.headers, remote?.headers);
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
    Authorization: `Bearer ${apiKey}`,
    ...overrides,
  };
  return { baseUrl, headers };
}

View File

@@ -0,0 +1,21 @@
/**
 * POSTs a JSON embedding request and extracts the embedding vectors.
 *
 * Expects an OpenAI-style response shape
 * `{ data: [{ embedding: number[] }, …] }`; entries without an embedding
 * yield an empty vector, and a missing `data` array yields an empty result.
 * Throws `${errorPrefix}: <status> <body>` when the response is not ok.
 */
export async function fetchRemoteEmbeddingVectors(params: {
  url: string;
  headers: Record<string, string>;
  body: unknown;
  errorPrefix: string;
}): Promise<number[][]> {
  const response = await fetch(params.url, {
    method: "POST",
    headers: params.headers,
    body: JSON.stringify(params.body),
  });
  if (!response.ok) {
    throw new Error(`${params.errorPrefix}: ${response.status} ${await response.text()}`);
  }
  const payload = (await response.json()) as {
    data?: Array<{ embedding?: number[] }>;
  };
  return (payload.data ?? []).map((entry) => entry.embedding ?? []);
}

View File

@@ -1,5 +1,6 @@
import type { EmbeddingProvider, EmbeddingProviderOptions } from "./embeddings.js";
import { requireApiKey, resolveApiKeyForProvider } from "../agents/model-auth.js";
import { resolveRemoteEmbeddingBearerClient } from "./embeddings-remote-client.js";
import { fetchRemoteEmbeddingVectors } from "./embeddings-remote-fetch.js";
export type VoyageEmbeddingClient = {
baseUrl: string;
@@ -44,20 +45,12 @@ export async function createVoyageEmbeddingProvider(
body.input_type = input_type;
}
const res = await fetch(url, {
method: "POST",
return await fetchRemoteEmbeddingVectors({
url,
headers: client.headers,
body: JSON.stringify(body),
body,
errorPrefix: "voyage embeddings failed",
});
if (!res.ok) {
const text = await res.text();
throw new Error(`voyage embeddings failed: ${res.status} ${text}`);
}
const payload = (await res.json()) as {
data?: Array<{ embedding?: number[] }>;
};
const data = payload.data ?? [];
return data.map((entry) => entry.embedding ?? []);
};
return {
@@ -78,29 +71,11 @@ export async function createVoyageEmbeddingProvider(
export async function resolveVoyageEmbeddingClient(
options: EmbeddingProviderOptions,
): Promise<VoyageEmbeddingClient> {
const remote = options.remote;
const remoteApiKey = remote?.apiKey?.trim();
const remoteBaseUrl = remote?.baseUrl?.trim();
const apiKey = remoteApiKey
? remoteApiKey
: requireApiKey(
await resolveApiKeyForProvider({
provider: "voyage",
cfg: options.config,
agentDir: options.agentDir,
}),
"voyage",
);
const providerConfig = options.config.models?.providers?.voyage;
const baseUrl = remoteBaseUrl || providerConfig?.baseUrl?.trim() || DEFAULT_VOYAGE_BASE_URL;
const headerOverrides = Object.assign({}, providerConfig?.headers, remote?.headers);
const headers: Record<string, string> = {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
...headerOverrides,
};
const { baseUrl, headers } = await resolveRemoteEmbeddingBearerClient({
provider: "voyage",
options,
defaultBaseUrl: DEFAULT_VOYAGE_BASE_URL,
});
const model = normalizeVoyageModel(options.model);
return { baseUrl, headers, model };
}