From 8251f7c235b99554d4ec6fd950238067ad0a3923 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 16 Feb 2026 00:26:03 +0000 Subject: [PATCH] refactor(memory): dedupe batch helpers --- src/memory/batch-openai.ts | 48 +++++++---------------------------- src/memory/batch-utils.ts | 35 +++++++++++++++++++++++++ src/memory/batch-voyage.ts | 52 ++++++++------------------------------ 3 files changed, 55 insertions(+), 80 deletions(-) create mode 100644 src/memory/batch-utils.ts diff --git a/src/memory/batch-openai.ts b/src/memory/batch-openai.ts index 6c35a831b36..cdce55b0a98 100644 --- a/src/memory/batch-openai.ts +++ b/src/memory/batch-openai.ts @@ -1,6 +1,7 @@ import type { OpenAiEmbeddingClient } from "./embeddings-openai.js"; import { postJsonWithRetry } from "./batch-http.js"; import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; +import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js"; import { hashText, runWithConcurrency } from "./internal.js"; export type OpenAiBatchRequest = { @@ -36,43 +37,12 @@ export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings"; const OPENAI_BATCH_COMPLETION_WINDOW = "24h"; const OPENAI_BATCH_MAX_REQUESTS = 50000; -function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string { - return openAi.baseUrl?.replace(/\/$/, "") ?? ""; -} - -function getOpenAiHeaders( - openAi: OpenAiEmbeddingClient, - params: { json: boolean }, -): Record { - const headers = openAi.headers ? { ...openAi.headers } : {}; - if (params.json) { - if (!headers["Content-Type"] && !headers["content-type"]) { - headers["Content-Type"] = "application/json"; - } - } else { - delete headers["Content-Type"]; - delete headers["content-type"]; - } - return headers; -} - -function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] { - if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) { - return [requests]; - } - const groups: OpenAiBatchRequest[][] = []; - for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) { - groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS)); - } - return groups; -} - async function submitOpenAiBatch(params: { openAi: OpenAiEmbeddingClient; requests: OpenAiBatchRequest[]; agentId: string; }): Promise { - const baseUrl = getOpenAiBaseUrl(params.openAi); + const baseUrl = normalizeBatchBaseUrl(params.openAi); const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); const form = new FormData(); form.append("purpose", "batch"); @@ -84,7 +54,7 @@ async function submitOpenAiBatch(params: { const fileRes = await fetch(`${baseUrl}/files`, { method: "POST", - headers: getOpenAiHeaders(params.openAi, { json: false }), + headers: buildBatchHeaders(params.openAi, { json: false }), body: form, }); if (!fileRes.ok) { @@ -98,7 +68,7 @@ async function submitOpenAiBatch(params: { return await postJsonWithRetry({ url: `${baseUrl}/batches`, - headers: getOpenAiHeaders(params.openAi, { json: true }), + headers: buildBatchHeaders(params.openAi, { json: true }), body: { input_file_id: filePayload.id, endpoint: OPENAI_BATCH_ENDPOINT, @@ -116,9 +86,9 @@ async function fetchOpenAiBatchStatus(params: { openAi: OpenAiEmbeddingClient; batchId: string; }): Promise { - const baseUrl = getOpenAiBaseUrl(params.openAi); + const baseUrl = normalizeBatchBaseUrl(params.openAi); const res = await fetch(`${baseUrl}/batches/${params.batchId}`, { - headers: getOpenAiHeaders(params.openAi, { json: true }), + headers: buildBatchHeaders(params.openAi, { json: true }), }); if (!res.ok) { const text = await res.text(); @@ -131,9 +101,9 @@ async function fetchOpenAiFileContent(params: { openAi: OpenAiEmbeddingClient; fileId: string; }): Promise { - const baseUrl = getOpenAiBaseUrl(params.openAi); + const baseUrl = normalizeBatchBaseUrl(params.openAi); const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, { - headers: getOpenAiHeaders(params.openAi, { json: true }), + headers: buildBatchHeaders(params.openAi, { json: true }), }); if (!res.ok) { const text = await res.text(); @@ -236,7 +206,7 @@ export async function runOpenAiEmbeddingBatches(params: { if (params.requests.length === 0) { return new Map(); } - const groups = splitOpenAiBatchRequests(params.requests); + const groups = splitBatchRequests(params.requests, OPENAI_BATCH_MAX_REQUESTS); const byCustomId = new Map(); const tasks = groups.map((group, groupIndex) => async () => { diff --git a/src/memory/batch-utils.ts b/src/memory/batch-utils.ts new file mode 100644 index 00000000000..95aa773e81e --- /dev/null +++ b/src/memory/batch-utils.ts @@ -0,0 +1,35 @@ +export type BatchHttpClientConfig = { + baseUrl?: string; + headers?: Record; +}; + +export function normalizeBatchBaseUrl(client: BatchHttpClientConfig): string { + return client.baseUrl?.replace(/\/$/, "") ?? ""; +} + +export function buildBatchHeaders( + client: Pick, + params: { json: boolean }, +): Record { + const headers = client.headers ? { ...client.headers } : {}; + if (params.json) { + if (!headers["Content-Type"] && !headers["content-type"]) { + headers["Content-Type"] = "application/json"; + } + } else { + delete headers["Content-Type"]; + delete headers["content-type"]; + } + return headers; +} + +export function splitBatchRequests(requests: T[], maxRequests: number): T[][] { + if (requests.length <= maxRequests) { + return [requests]; + } + const groups: T[][] = []; + for (let i = 0; i < requests.length; i += maxRequests) { + groups.push(requests.slice(i, i + maxRequests)); + } + return groups; +} diff --git a/src/memory/batch-voyage.ts b/src/memory/batch-voyage.ts index 72fdf09a2ff..b2c7f9f4d52 100644 --- a/src/memory/batch-voyage.ts +++ b/src/memory/batch-voyage.ts @@ -3,6 +3,7 @@ import { Readable } from "node:stream"; import type { VoyageEmbeddingClient } from "./embeddings-voyage.js"; import { postJsonWithRetry } from "./batch-http.js"; import { applyEmbeddingBatchOutputLine } from "./batch-output.js"; +import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js"; import { hashText, runWithConcurrency } from "./internal.js"; /** @@ -39,43 +40,12 @@ export const VOYAGE_BATCH_ENDPOINT = "/v1/embeddings"; const VOYAGE_BATCH_COMPLETION_WINDOW = "12h"; const VOYAGE_BATCH_MAX_REQUESTS = 50000; -function getVoyageBaseUrl(client: VoyageEmbeddingClient): string { - return client.baseUrl?.replace(/\/$/, "") ?? ""; -} - -function getVoyageHeaders( - client: VoyageEmbeddingClient, - params: { json: boolean }, -): Record { - const headers = client.headers ? { ...client.headers } : {}; - if (params.json) { - if (!headers["Content-Type"] && !headers["content-type"]) { - headers["Content-Type"] = "application/json"; - } - } else { - delete headers["Content-Type"]; - delete headers["content-type"]; - } - return headers; -} - -function splitVoyageBatchRequests(requests: VoyageBatchRequest[]): VoyageBatchRequest[][] { - if (requests.length <= VOYAGE_BATCH_MAX_REQUESTS) { - return [requests]; - } - const groups: VoyageBatchRequest[][] = []; - for (let i = 0; i < requests.length; i += VOYAGE_BATCH_MAX_REQUESTS) { - groups.push(requests.slice(i, i + VOYAGE_BATCH_MAX_REQUESTS)); - } - return groups; -} - async function submitVoyageBatch(params: { client: VoyageEmbeddingClient; requests: VoyageBatchRequest[]; agentId: string; }): Promise { - const baseUrl = getVoyageBaseUrl(params.client); + const baseUrl = normalizeBatchBaseUrl(params.client); const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n"); const form = new FormData(); form.append("purpose", "batch"); @@ -88,7 +58,7 @@ async function submitVoyageBatch(params: { // 1. Upload file using Voyage Files API const fileRes = await fetch(`${baseUrl}/files`, { method: "POST", - headers: getVoyageHeaders(params.client, { json: false }), + headers: buildBatchHeaders(params.client, { json: false }), body: form, }); if (!fileRes.ok) { @@ -103,7 +73,7 @@ async function submitVoyageBatch(params: { // 2. Create batch job using Voyage Batches API return await postJsonWithRetry({ url: `${baseUrl}/batches`, - headers: getVoyageHeaders(params.client, { json: true }), + headers: buildBatchHeaders(params.client, { json: true }), body: { input_file_id: filePayload.id, endpoint: VOYAGE_BATCH_ENDPOINT, @@ -125,9 +95,9 @@ async function fetchVoyageBatchStatus(params: { client: VoyageEmbeddingClient; batchId: string; }): Promise { - const baseUrl = getVoyageBaseUrl(params.client); + const baseUrl = normalizeBatchBaseUrl(params.client); const res = await fetch(`${baseUrl}/batches/${params.batchId}`, { - headers: getVoyageHeaders(params.client, { json: true }), + headers: buildBatchHeaders(params.client, { json: true }), }); if (!res.ok) { const text = await res.text(); @@ -141,9 +111,9 @@ async function readVoyageBatchError(params: { errorFileId: string; }): Promise { try { - const baseUrl = getVoyageBaseUrl(params.client); + const baseUrl = normalizeBatchBaseUrl(params.client); const res = await fetch(`${baseUrl}/files/${params.errorFileId}/content`, { - headers: getVoyageHeaders(params.client, { json: true }), + headers: buildBatchHeaders(params.client, { json: true }), }); if (!res.ok) { const text = await res.text(); @@ -231,7 +201,7 @@ export async function runVoyageEmbeddingBatches(params: { if (params.requests.length === 0) { return new Map(); } - const groups = splitVoyageBatchRequests(params.requests); + const groups = splitBatchRequests(params.requests, VOYAGE_BATCH_MAX_REQUESTS); const byCustomId = new Map(); const tasks = groups.map((group, groupIndex) => async () => { @@ -277,9 +247,9 @@ export async function runVoyageEmbeddingBatches(params: { throw new Error(`voyage batch ${batchInfo.id} completed without output file`); } - const baseUrl = getVoyageBaseUrl(params.client); + const baseUrl = normalizeBatchBaseUrl(params.client); const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, { - headers: getVoyageHeaders(params.client, { json: true }), + headers: buildBatchHeaders(params.client, { json: true }), }); if (!contentRes.ok) { const text = await contentRes.text();