refactor(core): unify bounded concurrency runner

This commit is contained in:
Peter Steinberger
2026-02-18 16:47:03 +00:00
parent 2b8f1bade0
commit 82cb185881
4 changed files with 143 additions and 51 deletions

View File

@@ -1,33 +1,18 @@
import { logVerbose, shouldLogVerbose } from "../globals.js";
import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
export async function runWithConcurrency<T>(
tasks: Array<() => Promise<T>>,
limit: number,
): Promise<T[]> {
if (tasks.length === 0) {
return [];
}
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
const results: T[] = Array.from({ length: tasks.length });
let next = 0;
const workers = Array.from({ length: resolvedLimit }, async () => {
while (true) {
const index = next;
next += 1;
if (index >= tasks.length) {
return;
const { results } = await runTasksWithConcurrency({
tasks,
limit,
onTaskError(err) {
if (shouldLogVerbose()) {
logVerbose(`Media understanding task failed: ${String(err)}`);
}
try {
results[index] = await tasks[index]();
} catch (err) {
if (shouldLogVerbose()) {
logVerbose(`Media understanding task failed: ${String(err)}`);
}
}
}
},
});
await Promise.allSettled(workers);
return results;
}

View File

@@ -2,6 +2,7 @@ import crypto from "node:crypto";
import fsSync from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js";
export type MemoryFileEntry = {
path: string;
@@ -301,35 +302,12 @@ export async function runWithConcurrency<T>(
tasks: Array<() => Promise<T>>,
limit: number,
): Promise<T[]> {
if (tasks.length === 0) {
return [];
}
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
const results: T[] = Array.from({ length: tasks.length });
let next = 0;
let firstError: unknown = null;
const workers = Array.from({ length: resolvedLimit }, async () => {
while (true) {
if (firstError) {
return;
}
const index = next;
next += 1;
if (index >= tasks.length) {
return;
}
try {
results[index] = await tasks[index]();
} catch (err) {
firstError = err;
return;
}
}
const { results, firstError, hasError } = await runTasksWithConcurrency({
tasks,
limit,
errorMode: "stop",
});
await Promise.allSettled(workers);
if (firstError) {
if (hasError) {
throw firstError;
}
return results;

View File

@@ -0,0 +1,81 @@
import { describe, expect, it, vi } from "vitest";
import { runTasksWithConcurrency } from "./run-with-concurrency.js";
describe("runTasksWithConcurrency", () => {
it("preserves task order with bounded worker count", async () => {
let running = 0;
let peak = 0;
const tasks = [25, 10, 5, 15].map((delayMs, index) => async (): Promise<number> => {
running += 1;
peak = Math.max(peak, running);
await new Promise((resolve) => setTimeout(resolve, delayMs));
running -= 1;
return index + 1;
});
const result = await runTasksWithConcurrency({ tasks, limit: 2 });
expect(result.hasError).toBe(false);
expect(result.firstError).toBeUndefined();
expect(result.results).toEqual([1, 2, 3, 4]);
expect(peak).toBeLessThanOrEqual(2);
});
it("stops scheduling after first failure in stop mode", async () => {
const err = new Error("boom");
const seen: number[] = [];
const tasks = [
async () => {
seen.push(0);
return 10;
},
async () => {
seen.push(1);
throw err;
},
async () => {
seen.push(2);
return 30;
},
];
const result = await runTasksWithConcurrency({
tasks,
limit: 1,
errorMode: "stop",
});
expect(result.hasError).toBe(true);
expect(result.firstError).toBe(err);
expect(result.results[0]).toBe(10);
expect(result.results[2]).toBeUndefined();
expect(seen).toEqual([0, 1]);
});
it("continues after failures and reports the first one", async () => {
const firstErr = new Error("first");
const onTaskError = vi.fn();
const tasks = [
async () => {
throw firstErr;
},
async () => 20,
async () => {
throw new Error("second");
},
async () => 40,
];
const result = await runTasksWithConcurrency({
tasks,
limit: 1,
errorMode: "continue",
onTaskError,
});
expect(result.hasError).toBe(true);
expect(result.firstError).toBe(firstErr);
expect(result.results[1]).toBe(20);
expect(result.results[3]).toBe(40);
expect(onTaskError).toHaveBeenCalledTimes(2);
expect(onTaskError).toHaveBeenNthCalledWith(1, firstErr, 0);
expect(onTaskError).toHaveBeenNthCalledWith(2, expect.any(Error), 2);
});
});

View File

@@ -0,0 +1,48 @@
export type ConcurrencyErrorMode = "continue" | "stop";
export async function runTasksWithConcurrency<T>(params: {
tasks: Array<() => Promise<T>>;
limit: number;
errorMode?: ConcurrencyErrorMode;
onTaskError?: (error: unknown, index: number) => void;
}): Promise<{ results: T[]; firstError: unknown; hasError: boolean }> {
const { tasks, limit, onTaskError } = params;
const errorMode = params.errorMode ?? "continue";
if (tasks.length === 0) {
return { results: [], firstError: undefined, hasError: false };
}
const resolvedLimit = Math.max(1, Math.min(limit, tasks.length));
const results: T[] = Array.from({ length: tasks.length });
let next = 0;
let firstError: unknown = undefined;
let hasError = false;
const workers = Array.from({ length: resolvedLimit }, async () => {
while (true) {
if (errorMode === "stop" && hasError) {
return;
}
const index = next;
next += 1;
if (index >= tasks.length) {
return;
}
try {
results[index] = await tasks[index]();
} catch (error) {
if (!hasError) {
firstError = error;
hasError = true;
}
onTaskError?.(error, index);
if (errorMode === "stop") {
return;
}
}
}
});
await Promise.allSettled(workers);
return { results, firstError, hasError };
}