refactor(cron): share timed job-execution helper

This commit is contained in:
Peter Steinberger
2026-02-22 20:11:15 +00:00
parent dff9ead59a
commit 5d90e31807
3 changed files with 77 additions and 103 deletions

View File

@@ -67,6 +67,29 @@ function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"
}) as CronServiceOptions["runIsolatedAgentJob"];
}
function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary };
}) as CronServiceOptions["runIsolatedAgentJob"];
return {
runIsolatedAgentJob,
getObservedAbortSignal: () => observedAbortSignal,
};
}
function createIsolatedRegressionJob(params: {
id: string;
name: string;
@@ -684,7 +707,7 @@ describe("Cron issue regressions", () => {
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
let observedAbortSignal: AbortSignal | undefined;
const abortAwareRunner = createAbortAwareIsolatedRunner();
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
@@ -692,27 +715,17 @@ describe("Cron issue regressions", () => {
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
runIsolatedAgentJob: vi.fn(async (params) => {
const result = await abortAwareRunner.runIsolatedAgentJob(params);
now += 5;
return { status: "ok" as const, summary: "late" };
return result;
}),
});
await onTimer(state);
expect(observedAbortSignal).toBeDefined();
expect(observedAbortSignal?.aborted).toBe(true);
expect(abortAwareRunner.getObservedAbortSignal()).toBeDefined();
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
const job = state.store?.jobs.find((entry) => entry.id === "abort-on-timeout");
expect(job?.state.lastStatus).toBe("error");
expect(job?.state.lastError).toContain("timed out");
@@ -721,24 +734,11 @@ describe("Cron issue regressions", () => {
it("applies timeoutSeconds to manual cron.run isolated executions", async () => {
vi.useRealTimers();
const store = await makeStorePath();
let observedAbortSignal: AbortSignal | undefined;
const abortAwareRunner = createAbortAwareIsolatedRunner();
const cron = await startCronForStore({
storePath: store.storePath,
runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary: "late" };
}),
runIsolatedAgentJob: abortAwareRunner.runIsolatedAgentJob,
});
const job = await cron.add({
@@ -753,8 +753,8 @@ describe("Cron issue regressions", () => {
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(observedAbortSignal).toBeDefined();
expect(observedAbortSignal?.aborted).toBe(true);
expect(abortAwareRunner.getObservedAbortSignal()).toBeDefined();
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
const updated = (await cron.list({ includeDisabled: true })).find(
(entry) => entry.id === job.id,

View File

@@ -13,11 +13,10 @@ import { locked } from "./locked.js";
import type { CronServiceState } from "./state.js";
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
import {
DEFAULT_JOB_TIMEOUT_MS,
applyJobResult,
armTimer,
emit,
executeJobCore,
executeJobCoreWithTimeout,
runMissedJobs,
stopTimer,
wake,
@@ -248,41 +247,9 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
const startedAt = prepared.startedAt;
const jobId = prepared.jobId;
let coreResult: Awaited<ReturnType<typeof executeJobCore>>;
const configuredTimeoutMs =
executionJob.payload.kind === "agentTurn" &&
typeof executionJob.payload.timeoutSeconds === "number"
? Math.floor(executionJob.payload.timeoutSeconds * 1_000)
: undefined;
const jobTimeoutMs =
configuredTimeoutMs !== undefined
? configuredTimeoutMs <= 0
? undefined
: configuredTimeoutMs
: DEFAULT_JOB_TIMEOUT_MS;
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
try {
const runAbortController = typeof jobTimeoutMs === "number" ? new AbortController() : undefined;
coreResult =
typeof jobTimeoutMs === "number"
? await (async () => {
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race([
executeJobCore(state, executionJob, runAbortController?.signal),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
runAbortController?.abort(new Error("cron: job execution timed out"));
reject(new Error("cron: job execution timed out"));
}, jobTimeoutMs);
}),
]);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
}
})()
: await executeJobCore(state, executionJob);
coreResult = await executeJobCoreWithTimeout(state, executionJob);
} catch (err) {
coreResult = { status: "error", error: String(err) };
}

View File

@@ -45,6 +45,45 @@ type TimedCronRunOutcome = CronRunOutcome &
endedAt: number;
};
function resolveCronJobTimeoutMs(job: CronJob): number | undefined {
const configuredTimeoutMs =
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
? Math.floor(job.payload.timeoutSeconds * 1_000)
: undefined;
if (configuredTimeoutMs === undefined) {
return DEFAULT_JOB_TIMEOUT_MS;
}
return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs;
}
export async function executeJobCoreWithTimeout(
state: CronServiceState,
job: CronJob,
): Promise<Awaited<ReturnType<typeof executeJobCore>>> {
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
if (typeof jobTimeoutMs !== "number") {
return await executeJobCore(state, job);
}
const runAbortController = new AbortController();
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race([
executeJobCore(state, job, runAbortController.signal),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
runAbortController.abort(new Error("cron: job execution timed out"));
reject(new Error("cron: job execution timed out"));
}, jobTimeoutMs);
}),
]);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
}
}
function resolveRunConcurrency(state: CronServiceState): number {
const raw = state.deps.cronConfig?.maxConcurrentRuns;
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -309,42 +348,10 @@ export async function onTimer(state: CronServiceState) {
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const configuredTimeoutMs =
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
? Math.floor(job.payload.timeoutSeconds * 1_000)
: undefined;
const jobTimeoutMs =
configuredTimeoutMs !== undefined
? configuredTimeoutMs <= 0
? undefined
: configuredTimeoutMs
: DEFAULT_JOB_TIMEOUT_MS;
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
try {
const runAbortController =
typeof jobTimeoutMs === "number" ? new AbortController() : undefined;
const result =
typeof jobTimeoutMs === "number"
? await (async () => {
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race([
executeJobCore(state, job, runAbortController?.signal),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
runAbortController?.abort(new Error("cron: job execution timed out"));
reject(new Error("cron: job execution timed out"));
}, jobTimeoutMs);
}),
]);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
}
})()
: await executeJobCore(state, job);
const result = await executeJobCoreWithTimeout(state, job);
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
} catch (err) {
state.deps.log.warn(