mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 19:04:31 +00:00
refactor(memory): dedupe batch helpers
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
|
import type { OpenAiEmbeddingClient } from "./embeddings-openai.js";
|
||||||
import { postJsonWithRetry } from "./batch-http.js";
|
import { postJsonWithRetry } from "./batch-http.js";
|
||||||
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
|
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
|
||||||
|
import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js";
|
||||||
import { hashText, runWithConcurrency } from "./internal.js";
|
import { hashText, runWithConcurrency } from "./internal.js";
|
||||||
|
|
||||||
export type OpenAiBatchRequest = {
|
export type OpenAiBatchRequest = {
|
||||||
@@ -36,43 +37,12 @@ export const OPENAI_BATCH_ENDPOINT = "/v1/embeddings";
|
|||||||
const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
|
const OPENAI_BATCH_COMPLETION_WINDOW = "24h";
|
||||||
const OPENAI_BATCH_MAX_REQUESTS = 50000;
|
const OPENAI_BATCH_MAX_REQUESTS = 50000;
|
||||||
|
|
||||||
function getOpenAiBaseUrl(openAi: OpenAiEmbeddingClient): string {
|
|
||||||
return openAi.baseUrl?.replace(/\/$/, "") ?? "";
|
|
||||||
}
|
|
||||||
|
|
||||||
function getOpenAiHeaders(
|
|
||||||
openAi: OpenAiEmbeddingClient,
|
|
||||||
params: { json: boolean },
|
|
||||||
): Record<string, string> {
|
|
||||||
const headers = openAi.headers ? { ...openAi.headers } : {};
|
|
||||||
if (params.json) {
|
|
||||||
if (!headers["Content-Type"] && !headers["content-type"]) {
|
|
||||||
headers["Content-Type"] = "application/json";
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
delete headers["Content-Type"];
|
|
||||||
delete headers["content-type"];
|
|
||||||
}
|
|
||||||
return headers;
|
|
||||||
}
|
|
||||||
|
|
||||||
function splitOpenAiBatchRequests(requests: OpenAiBatchRequest[]): OpenAiBatchRequest[][] {
|
|
||||||
if (requests.length <= OPENAI_BATCH_MAX_REQUESTS) {
|
|
||||||
return [requests];
|
|
||||||
}
|
|
||||||
const groups: OpenAiBatchRequest[][] = [];
|
|
||||||
for (let i = 0; i < requests.length; i += OPENAI_BATCH_MAX_REQUESTS) {
|
|
||||||
groups.push(requests.slice(i, i + OPENAI_BATCH_MAX_REQUESTS));
|
|
||||||
}
|
|
||||||
return groups;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function submitOpenAiBatch(params: {
|
async function submitOpenAiBatch(params: {
|
||||||
openAi: OpenAiEmbeddingClient;
|
openAi: OpenAiEmbeddingClient;
|
||||||
requests: OpenAiBatchRequest[];
|
requests: OpenAiBatchRequest[];
|
||||||
agentId: string;
|
agentId: string;
|
||||||
}): Promise<OpenAiBatchStatus> {
|
}): Promise<OpenAiBatchStatus> {
|
||||||
const baseUrl = getOpenAiBaseUrl(params.openAi);
|
const baseUrl = normalizeBatchBaseUrl(params.openAi);
|
||||||
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
|
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
|
||||||
const form = new FormData();
|
const form = new FormData();
|
||||||
form.append("purpose", "batch");
|
form.append("purpose", "batch");
|
||||||
@@ -84,7 +54,7 @@ async function submitOpenAiBatch(params: {
|
|||||||
|
|
||||||
const fileRes = await fetch(`${baseUrl}/files`, {
|
const fileRes = await fetch(`${baseUrl}/files`, {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: getOpenAiHeaders(params.openAi, { json: false }),
|
headers: buildBatchHeaders(params.openAi, { json: false }),
|
||||||
body: form,
|
body: form,
|
||||||
});
|
});
|
||||||
if (!fileRes.ok) {
|
if (!fileRes.ok) {
|
||||||
@@ -98,7 +68,7 @@ async function submitOpenAiBatch(params: {
|
|||||||
|
|
||||||
return await postJsonWithRetry<OpenAiBatchStatus>({
|
return await postJsonWithRetry<OpenAiBatchStatus>({
|
||||||
url: `${baseUrl}/batches`,
|
url: `${baseUrl}/batches`,
|
||||||
headers: getOpenAiHeaders(params.openAi, { json: true }),
|
headers: buildBatchHeaders(params.openAi, { json: true }),
|
||||||
body: {
|
body: {
|
||||||
input_file_id: filePayload.id,
|
input_file_id: filePayload.id,
|
||||||
endpoint: OPENAI_BATCH_ENDPOINT,
|
endpoint: OPENAI_BATCH_ENDPOINT,
|
||||||
@@ -116,9 +86,9 @@ async function fetchOpenAiBatchStatus(params: {
|
|||||||
openAi: OpenAiEmbeddingClient;
|
openAi: OpenAiEmbeddingClient;
|
||||||
batchId: string;
|
batchId: string;
|
||||||
}): Promise<OpenAiBatchStatus> {
|
}): Promise<OpenAiBatchStatus> {
|
||||||
const baseUrl = getOpenAiBaseUrl(params.openAi);
|
const baseUrl = normalizeBatchBaseUrl(params.openAi);
|
||||||
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
|
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
|
||||||
headers: getOpenAiHeaders(params.openAi, { json: true }),
|
headers: buildBatchHeaders(params.openAi, { json: true }),
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
const text = await res.text();
|
const text = await res.text();
|
||||||
@@ -131,9 +101,9 @@ async function fetchOpenAiFileContent(params: {
|
|||||||
openAi: OpenAiEmbeddingClient;
|
openAi: OpenAiEmbeddingClient;
|
||||||
fileId: string;
|
fileId: string;
|
||||||
}): Promise<string> {
|
}): Promise<string> {
|
||||||
const baseUrl = getOpenAiBaseUrl(params.openAi);
|
const baseUrl = normalizeBatchBaseUrl(params.openAi);
|
||||||
const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
|
const res = await fetch(`${baseUrl}/files/${params.fileId}/content`, {
|
||||||
headers: getOpenAiHeaders(params.openAi, { json: true }),
|
headers: buildBatchHeaders(params.openAi, { json: true }),
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
const text = await res.text();
|
const text = await res.text();
|
||||||
@@ -236,7 +206,7 @@ export async function runOpenAiEmbeddingBatches(params: {
|
|||||||
if (params.requests.length === 0) {
|
if (params.requests.length === 0) {
|
||||||
return new Map();
|
return new Map();
|
||||||
}
|
}
|
||||||
const groups = splitOpenAiBatchRequests(params.requests);
|
const groups = splitBatchRequests(params.requests, OPENAI_BATCH_MAX_REQUESTS);
|
||||||
const byCustomId = new Map<string, number[]>();
|
const byCustomId = new Map<string, number[]>();
|
||||||
|
|
||||||
const tasks = groups.map((group, groupIndex) => async () => {
|
const tasks = groups.map((group, groupIndex) => async () => {
|
||||||
|
|||||||
35
src/memory/batch-utils.ts
Normal file
35
src/memory/batch-utils.ts
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
export type BatchHttpClientConfig = {
|
||||||
|
baseUrl?: string;
|
||||||
|
headers?: Record<string, string>;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function normalizeBatchBaseUrl(client: BatchHttpClientConfig): string {
|
||||||
|
return client.baseUrl?.replace(/\/$/, "") ?? "";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildBatchHeaders(
|
||||||
|
client: Pick<BatchHttpClientConfig, "headers">,
|
||||||
|
params: { json: boolean },
|
||||||
|
): Record<string, string> {
|
||||||
|
const headers = client.headers ? { ...client.headers } : {};
|
||||||
|
if (params.json) {
|
||||||
|
if (!headers["Content-Type"] && !headers["content-type"]) {
|
||||||
|
headers["Content-Type"] = "application/json";
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
delete headers["Content-Type"];
|
||||||
|
delete headers["content-type"];
|
||||||
|
}
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function splitBatchRequests<T>(requests: T[], maxRequests: number): T[][] {
|
||||||
|
if (requests.length <= maxRequests) {
|
||||||
|
return [requests];
|
||||||
|
}
|
||||||
|
const groups: T[][] = [];
|
||||||
|
for (let i = 0; i < requests.length; i += maxRequests) {
|
||||||
|
groups.push(requests.slice(i, i + maxRequests));
|
||||||
|
}
|
||||||
|
return groups;
|
||||||
|
}
|
||||||
@@ -3,6 +3,7 @@ import { Readable } from "node:stream";
|
|||||||
import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
|
import type { VoyageEmbeddingClient } from "./embeddings-voyage.js";
|
||||||
import { postJsonWithRetry } from "./batch-http.js";
|
import { postJsonWithRetry } from "./batch-http.js";
|
||||||
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
|
import { applyEmbeddingBatchOutputLine } from "./batch-output.js";
|
||||||
|
import { buildBatchHeaders, normalizeBatchBaseUrl, splitBatchRequests } from "./batch-utils.js";
|
||||||
import { hashText, runWithConcurrency } from "./internal.js";
|
import { hashText, runWithConcurrency } from "./internal.js";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -39,43 +40,12 @@ export const VOYAGE_BATCH_ENDPOINT = "/v1/embeddings";
|
|||||||
const VOYAGE_BATCH_COMPLETION_WINDOW = "12h";
|
const VOYAGE_BATCH_COMPLETION_WINDOW = "12h";
|
||||||
const VOYAGE_BATCH_MAX_REQUESTS = 50000;
|
const VOYAGE_BATCH_MAX_REQUESTS = 50000;
|
||||||
|
|
||||||
function getVoyageBaseUrl(client: VoyageEmbeddingClient): string {
|
|
||||||
return client.baseUrl?.replace(/\/$/, "") ?? "";
|
|
||||||
}
|
|
||||||
|
|
||||||
function getVoyageHeaders(
|
|
||||||
client: VoyageEmbeddingClient,
|
|
||||||
params: { json: boolean },
|
|
||||||
): Record<string, string> {
|
|
||||||
const headers = client.headers ? { ...client.headers } : {};
|
|
||||||
if (params.json) {
|
|
||||||
if (!headers["Content-Type"] && !headers["content-type"]) {
|
|
||||||
headers["Content-Type"] = "application/json";
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
delete headers["Content-Type"];
|
|
||||||
delete headers["content-type"];
|
|
||||||
}
|
|
||||||
return headers;
|
|
||||||
}
|
|
||||||
|
|
||||||
function splitVoyageBatchRequests(requests: VoyageBatchRequest[]): VoyageBatchRequest[][] {
|
|
||||||
if (requests.length <= VOYAGE_BATCH_MAX_REQUESTS) {
|
|
||||||
return [requests];
|
|
||||||
}
|
|
||||||
const groups: VoyageBatchRequest[][] = [];
|
|
||||||
for (let i = 0; i < requests.length; i += VOYAGE_BATCH_MAX_REQUESTS) {
|
|
||||||
groups.push(requests.slice(i, i + VOYAGE_BATCH_MAX_REQUESTS));
|
|
||||||
}
|
|
||||||
return groups;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function submitVoyageBatch(params: {
|
async function submitVoyageBatch(params: {
|
||||||
client: VoyageEmbeddingClient;
|
client: VoyageEmbeddingClient;
|
||||||
requests: VoyageBatchRequest[];
|
requests: VoyageBatchRequest[];
|
||||||
agentId: string;
|
agentId: string;
|
||||||
}): Promise<VoyageBatchStatus> {
|
}): Promise<VoyageBatchStatus> {
|
||||||
const baseUrl = getVoyageBaseUrl(params.client);
|
const baseUrl = normalizeBatchBaseUrl(params.client);
|
||||||
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
|
const jsonl = params.requests.map((request) => JSON.stringify(request)).join("\n");
|
||||||
const form = new FormData();
|
const form = new FormData();
|
||||||
form.append("purpose", "batch");
|
form.append("purpose", "batch");
|
||||||
@@ -88,7 +58,7 @@ async function submitVoyageBatch(params: {
|
|||||||
// 1. Upload file using Voyage Files API
|
// 1. Upload file using Voyage Files API
|
||||||
const fileRes = await fetch(`${baseUrl}/files`, {
|
const fileRes = await fetch(`${baseUrl}/files`, {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
headers: getVoyageHeaders(params.client, { json: false }),
|
headers: buildBatchHeaders(params.client, { json: false }),
|
||||||
body: form,
|
body: form,
|
||||||
});
|
});
|
||||||
if (!fileRes.ok) {
|
if (!fileRes.ok) {
|
||||||
@@ -103,7 +73,7 @@ async function submitVoyageBatch(params: {
|
|||||||
// 2. Create batch job using Voyage Batches API
|
// 2. Create batch job using Voyage Batches API
|
||||||
return await postJsonWithRetry<VoyageBatchStatus>({
|
return await postJsonWithRetry<VoyageBatchStatus>({
|
||||||
url: `${baseUrl}/batches`,
|
url: `${baseUrl}/batches`,
|
||||||
headers: getVoyageHeaders(params.client, { json: true }),
|
headers: buildBatchHeaders(params.client, { json: true }),
|
||||||
body: {
|
body: {
|
||||||
input_file_id: filePayload.id,
|
input_file_id: filePayload.id,
|
||||||
endpoint: VOYAGE_BATCH_ENDPOINT,
|
endpoint: VOYAGE_BATCH_ENDPOINT,
|
||||||
@@ -125,9 +95,9 @@ async function fetchVoyageBatchStatus(params: {
|
|||||||
client: VoyageEmbeddingClient;
|
client: VoyageEmbeddingClient;
|
||||||
batchId: string;
|
batchId: string;
|
||||||
}): Promise<VoyageBatchStatus> {
|
}): Promise<VoyageBatchStatus> {
|
||||||
const baseUrl = getVoyageBaseUrl(params.client);
|
const baseUrl = normalizeBatchBaseUrl(params.client);
|
||||||
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
|
const res = await fetch(`${baseUrl}/batches/${params.batchId}`, {
|
||||||
headers: getVoyageHeaders(params.client, { json: true }),
|
headers: buildBatchHeaders(params.client, { json: true }),
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
const text = await res.text();
|
const text = await res.text();
|
||||||
@@ -141,9 +111,9 @@ async function readVoyageBatchError(params: {
|
|||||||
errorFileId: string;
|
errorFileId: string;
|
||||||
}): Promise<string | undefined> {
|
}): Promise<string | undefined> {
|
||||||
try {
|
try {
|
||||||
const baseUrl = getVoyageBaseUrl(params.client);
|
const baseUrl = normalizeBatchBaseUrl(params.client);
|
||||||
const res = await fetch(`${baseUrl}/files/${params.errorFileId}/content`, {
|
const res = await fetch(`${baseUrl}/files/${params.errorFileId}/content`, {
|
||||||
headers: getVoyageHeaders(params.client, { json: true }),
|
headers: buildBatchHeaders(params.client, { json: true }),
|
||||||
});
|
});
|
||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
const text = await res.text();
|
const text = await res.text();
|
||||||
@@ -231,7 +201,7 @@ export async function runVoyageEmbeddingBatches(params: {
|
|||||||
if (params.requests.length === 0) {
|
if (params.requests.length === 0) {
|
||||||
return new Map();
|
return new Map();
|
||||||
}
|
}
|
||||||
const groups = splitVoyageBatchRequests(params.requests);
|
const groups = splitBatchRequests(params.requests, VOYAGE_BATCH_MAX_REQUESTS);
|
||||||
const byCustomId = new Map<string, number[]>();
|
const byCustomId = new Map<string, number[]>();
|
||||||
|
|
||||||
const tasks = groups.map((group, groupIndex) => async () => {
|
const tasks = groups.map((group, groupIndex) => async () => {
|
||||||
@@ -277,9 +247,9 @@ export async function runVoyageEmbeddingBatches(params: {
|
|||||||
throw new Error(`voyage batch ${batchInfo.id} completed without output file`);
|
throw new Error(`voyage batch ${batchInfo.id} completed without output file`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const baseUrl = getVoyageBaseUrl(params.client);
|
const baseUrl = normalizeBatchBaseUrl(params.client);
|
||||||
const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, {
|
const contentRes = await fetch(`${baseUrl}/files/${completed.outputFileId}/content`, {
|
||||||
headers: getVoyageHeaders(params.client, { json: true }),
|
headers: buildBatchHeaders(params.client, { json: true }),
|
||||||
});
|
});
|
||||||
if (!contentRes.ok) {
|
if (!contentRes.ok) {
|
||||||
const text = await contentRes.text();
|
const text = await contentRes.text();
|
||||||
|
|||||||
Reference in New Issue
Block a user