perf: avoid async cron timer callbacks

This commit is contained in:
Peter Steinberger
2026-02-16 02:31:21 +00:00
parent 8515ae6eea
commit 17e5a5015c
2 changed files with 80 additions and 58 deletions

View File

@@ -16,6 +16,21 @@ async function makeStorePath() {
return { return {
storePath: path.join(dir, "cron", "jobs.json"), storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => { cleanup: async () => {
// On macOS, teardown can race with trailing async fs writes and leave
// transient ENOTEMPTY errors. Retry briefly for stability.
for (let i = 0; i < 10; i += 1) {
try {
await fs.rm(dir, { recursive: true, force: true });
return;
} catch (err) {
const code = (err as NodeJS.ErrnoException).code;
if (code !== "ENOTEMPTY") {
throw err;
}
// eslint-disable-next-line no-await-in-loop
await new Promise<void>((resolve) => setTimeout(resolve, 10));
}
}
await fs.rm(dir, { recursive: true, force: true }); await fs.rm(dir, { recursive: true, force: true });
}, },
}; };
@@ -23,24 +38,35 @@ async function makeStorePath() {
describe("CronService read ops while job is running", () => { describe("CronService read ops while job is running", () => {
it("keeps list and status responsive during a long isolated run", async () => { it("keeps list and status responsive during a long isolated run", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
const store = await makeStorePath(); const store = await makeStorePath();
const enqueueSystemEvent = vi.fn(); const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn(); const requestHeartbeatNow = vi.fn();
let resolveFinished: (() => void) | undefined;
const finished = new Promise<void>((resolve) => {
resolveFinished = resolve;
});
let resolveRun: let resolveRun:
| ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void) | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void)
| undefined; | undefined;
const runIsolatedAgentJob = vi.fn( let resolveRunStarted: (() => void) | undefined;
async () => const runStarted = new Promise<void>((resolve) => {
await new Promise<{ resolveRunStarted = resolve;
status: "ok" | "error" | "skipped"; });
summary?: string;
error?: string; const runIsolatedAgentJob = vi.fn(async () => {
}>((resolve) => { resolveRunStarted?.();
resolveRun = resolve; return await new Promise<{
}), status: "ok" | "error" | "skipped";
); summary?: string;
error?: string;
}>((resolve) => {
resolveRun = resolve;
});
});
const cron = new CronService({ const cron = new CronService({
storePath: store.storePath, storePath: store.storePath,
@@ -49,70 +75,67 @@ describe("CronService read ops while job is running", () => {
enqueueSystemEvent, enqueueSystemEvent,
requestHeartbeatNow, requestHeartbeatNow,
runIsolatedAgentJob, runIsolatedAgentJob,
onEvent: (evt) => {
if (evt.action === "finished" && evt.status === "ok") {
resolveFinished?.();
}
},
}); });
const timeout = async <T>(promise: Promise<T>, ms: number): Promise<T> => {
let t: NodeJS.Timeout;
const timeoutPromise = new Promise<never>((_, reject) => {
t = setTimeout(() => reject(new Error("timeout")), ms);
});
return await Promise.race([promise.finally(() => clearTimeout(t!)), timeoutPromise]);
};
try { try {
await cron.start(); await cron.start();
// Schedule the job in the past so the cron timer fires immediately. // Schedule the job a second in the future; then jump time to trigger the tick.
await cron.add({ await cron.add({
name: "slow isolated", name: "slow isolated",
enabled: true, enabled: true,
deleteAfterRun: false, deleteAfterRun: false,
schedule: { kind: "at", at: new Date(Date.now() - 1).toISOString() }, schedule: {
kind: "at",
at: new Date("2025-12-13T00:00:01.000Z").toISOString(),
},
sessionTarget: "isolated", sessionTarget: "isolated",
wakeMode: "next-heartbeat", wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "long task" }, payload: { kind: "agentTurn", message: "long task" },
delivery: { mode: "none" }, delivery: { mode: "none" },
}); });
// Let the scheduler tick and start the job. vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await timeout( await vi.runOnlyPendingTimersAsync();
(async () => {
for (;;) {
if (runIsolatedAgentJob.mock.calls.length > 0) {
return;
}
await new Promise<void>((resolve) => setTimeout(resolve, 0));
}
})(),
2000,
);
await runStarted;
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
await expect(timeout(cron.list({ includeDisabled: true }), 1000)).resolves.toBeTypeOf( await expect(cron.list({ includeDisabled: true })).resolves.toBeTypeOf("object");
"object", await expect(cron.status()).resolves.toBeTypeOf("object");
);
await expect(timeout(cron.status(), 1000)).resolves.toBeTypeOf("object");
const running = await cron.list({ includeDisabled: true }); const running = await cron.list({ includeDisabled: true });
expect(running[0]?.state.runningAtMs).toBeTypeOf("number"); expect(running[0]?.state.runningAtMs).toBeTypeOf("number");
resolveRun?.({ status: "ok", summary: "done" }); resolveRun?.({ status: "ok", summary: "done" });
await timeout( // Wait until the scheduler writes the result back to the store.
(async () => { await finished;
for (;;) { // Ensure any trailing store writes have finished before cleanup.
const finished = await cron.list({ includeDisabled: true }); await cron.status();
if (finished[0]?.state.lastStatus === "ok") {
return; const completed = await cron.list({ includeDisabled: true });
} expect(completed[0]?.state.lastStatus).toBe("ok");
await new Promise<void>((resolve) => setTimeout(resolve, 0));
} // Ensure the scheduler loop has fully settled before deleting the store directory.
})(), const internal = cron as unknown as { state?: { running?: boolean } };
2000, for (let i = 0; i < 100; i += 1) {
); if (!internal.state?.running) {
break;
}
// eslint-disable-next-line no-await-in-loop
await Promise.resolve();
}
expect(internal.state?.running).toBe(false);
} finally { } finally {
cron.stop(); cron.stop();
vi.clearAllTimers();
vi.useRealTimers();
await store.cleanup(); await store.cleanup();
} }
}); });

View File

@@ -144,12 +144,13 @@ export function armTimer(state: CronServiceState) {
// Wake at least once a minute to avoid schedule drift and recover quickly // Wake at least once a minute to avoid schedule drift and recover quickly
// when the process was paused or wall-clock time jumps. // when the process was paused or wall-clock time jumps.
const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
state.timer = setTimeout(async () => { // Intentionally avoid an `async` timer callback:
try { // Vitest's fake-timer helpers can await async callbacks, which would block
await onTimer(state); // tests that simulate long-running jobs. Runtime behavior is unchanged.
} catch (err) { state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
} });
}, clampedDelay); }, clampedDelay);
state.deps.log.debug( state.deps.log.debug(
{ nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS }, { nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS },
@@ -172,12 +173,10 @@ export async function onTimer(state: CronServiceState) {
if (state.timer) { if (state.timer) {
clearTimeout(state.timer); clearTimeout(state.timer);
} }
state.timer = setTimeout(async () => { state.timer = setTimeout(() => {
try { void onTimer(state).catch((err) => {
await onTimer(state);
} catch (err) {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
} });
}, MAX_TIMER_DELAY_MS); }, MAX_TIMER_DELAY_MS);
return; return;
} }