diff --git a/src/cron/service.read-ops-nonblocking.test.ts b/src/cron/service.read-ops-nonblocking.test.ts
index 048246cc5cb..3a649d7ce90 100644
--- a/src/cron/service.read-ops-nonblocking.test.ts
+++ b/src/cron/service.read-ops-nonblocking.test.ts
@@ -16,6 +16,9 @@ async function makeStorePath() {
   return {
     storePath: path.join(dir, "cron", "jobs.json"),
     cleanup: async () => {
-      await fs.rm(dir, { recursive: true, force: true });
+      // On macOS, teardown can race with trailing async fs writes and leave
+      // transient ENOTEMPTY errors. fs.rm retries those itself when given
+      // maxRetries (recursive mode only), backing off retryDelay ms more per attempt.
+      await fs.rm(dir, { recursive: true, force: true, maxRetries: 10, retryDelay: 10 });
     },
   };
@@ -23,24 +26,35 @@ async function makeStorePath() {
 
 describe("CronService read ops while job is running", () => {
   it("keeps list and status responsive during a long isolated run", async () => {
+    vi.useFakeTimers();
+    vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
     const store = await makeStorePath();
     const enqueueSystemEvent = vi.fn();
     const requestHeartbeatNow = vi.fn();
+    let resolveFinished: (() => void) | undefined;
+    const finished = new Promise<void>((resolve) => {
+      resolveFinished = resolve;
+    });
 
     let resolveRun:
       | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void)
       | undefined;
 
-    const runIsolatedAgentJob = vi.fn(
-      async () =>
-        await new Promise<{
-          status: "ok" | "error" | "skipped";
-          summary?: string;
-          error?: string;
-        }>((resolve) => {
-          resolveRun = resolve;
-        }),
-    );
+    let resolveRunStarted: (() => void) | undefined;
+    const runStarted = new Promise<void>((resolve) => {
+      resolveRunStarted = resolve;
+    });
+
+    const runIsolatedAgentJob = vi.fn(async () => {
+      resolveRunStarted?.();
+      return await new Promise<{
+        status: "ok" | "error" | "skipped";
+        summary?: string;
+        error?: string;
+      }>((resolve) => {
+        resolveRun = resolve;
+      });
+    });
 
     const cron = new CronService({
       storePath: store.storePath,
@@ -49,70 +63,68 @@ describe("CronService read ops while job is running", () => {
       enqueueSystemEvent,
       requestHeartbeatNow,
       runIsolatedAgentJob,
+      onEvent: (evt) => {
+        if (evt.action === "finished" && evt.status === "ok") {
+          resolveFinished?.();
+        }
+      },
     });
 
-    const timeout = async <T>(promise: Promise<T>, ms: number): Promise<T> => {
-      let t: NodeJS.Timeout;
-      const timeoutPromise = new Promise<never>((_, reject) => {
-        t = setTimeout(() => reject(new Error("timeout")), ms);
-      });
-      return await Promise.race([promise.finally(() => clearTimeout(t!)), timeoutPromise]);
-    };
-
     try {
       await cron.start();
 
-      // Schedule the job in the past so the cron timer fires immediately.
+      // Schedule the job a second in the future; then jump time to trigger the tick.
       await cron.add({
         name: "slow isolated",
         enabled: true,
         deleteAfterRun: false,
-        schedule: { kind: "at", at: new Date(Date.now() - 1).toISOString() },
+        schedule: {
+          kind: "at",
+          at: new Date("2025-12-13T00:00:01.000Z").toISOString(),
+        },
         sessionTarget: "isolated",
         wakeMode: "next-heartbeat",
         payload: { kind: "agentTurn", message: "long task" },
         delivery: { mode: "none" },
       });
 
-      // Let the scheduler tick and start the job.
-      await timeout(
-        (async () => {
-          for (;;) {
-            if (runIsolatedAgentJob.mock.calls.length > 0) {
-              return;
-            }
-            await new Promise((resolve) => setTimeout(resolve, 0));
-          }
-        })(),
-        2000,
-      );
+      // Move the fake clock to the scheduled time, fire pending timers, and wait for the job start.
+      vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
+      await vi.runOnlyPendingTimersAsync();
+      await runStarted;
 
       expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
 
-      await expect(timeout(cron.list({ includeDisabled: true }), 1000)).resolves.toBeTypeOf(
-        "object",
-      );
-      await expect(timeout(cron.status(), 1000)).resolves.toBeTypeOf("object");
+      await expect(cron.list({ includeDisabled: true })).resolves.toBeTypeOf("object");
+      await expect(cron.status()).resolves.toBeTypeOf("object");
 
       const running = await cron.list({ includeDisabled: true });
       expect(running[0]?.state.runningAtMs).toBeTypeOf("number");
 
       resolveRun?.({ status: "ok", summary: "done" });
 
-      await timeout(
-        (async () => {
-          for (;;) {
-            const finished = await cron.list({ includeDisabled: true });
-            if (finished[0]?.state.lastStatus === "ok") {
-              return;
-            }
-            await new Promise((resolve) => setTimeout(resolve, 0));
-          }
-        })(),
-        2000,
-      );
+      // Wait until the scheduler writes the result back to the store.
+      await finished;
+      // Ensure any trailing store writes have finished before cleanup.
+      await cron.status();
+
+      const completed = await cron.list({ includeDisabled: true });
+      expect(completed[0]?.state.lastStatus).toBe("ok");
+
+      // Ensure the scheduler loop has fully settled before deleting the store directory.
+      const internal = cron as unknown as { state?: { running?: boolean } };
+      for (let i = 0; i < 100; i += 1) {
+        if (!internal.state?.running) {
+          break;
+        }
+        // eslint-disable-next-line no-await-in-loop
+        await Promise.resolve();
+      }
+      expect(internal.state?.running).toBe(false);
     } finally {
       cron.stop();
+      vi.clearAllTimers();
+      vi.useRealTimers();
       await store.cleanup();
     }
   });
diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts
index 08cdfad626d..674f191a875 100644
--- a/src/cron/service/timer.ts
+++ b/src/cron/service/timer.ts
@@ -144,12 +144,13 @@ export function armTimer(state: CronServiceState) {
   // Wake at least once a minute to avoid schedule drift and recover quickly
   // when the process was paused or wall-clock time jumps.
   const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
-  state.timer = setTimeout(async () => {
-    try {
-      await onTimer(state);
-    } catch (err) {
+  // Intentionally avoid an `async` timer callback:
+  // Vitest's fake-timer helpers can await async callbacks, which would block
+  // tests that simulate long-running jobs. Runtime behavior is unchanged.
+  state.timer = setTimeout(() => {
+    void onTimer(state).catch((err) => {
       state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
-    }
+    });
   }, clampedDelay);
   state.deps.log.debug(
     { nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS },
@@ -172,12 +173,10 @@ export async function onTimer(state: CronServiceState) {
     if (state.timer) {
       clearTimeout(state.timer);
     }
-    state.timer = setTimeout(async () => {
-      try {
-        await onTimer(state);
-      } catch (err) {
+    state.timer = setTimeout(() => {
+      void onTimer(state).catch((err) => {
         state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
-      }
+      });
     }, MAX_TIMER_DELAY_MS);
     return;
   }