diff --git a/src/cron/schedule.test.ts b/src/cron/schedule.test.ts index 614a980f4cd..1b4a09744b1 100644 --- a/src/cron/schedule.test.ts +++ b/src/cron/schedule.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it } from "vitest"; import { + coerceFiniteScheduleNumber, clearCronScheduleCacheForTest, computeNextRunAtMs, computePreviousRunAtMs, @@ -76,6 +77,26 @@ describe("cron schedule", () => { expect(next).toBe(now + 30_000); }); + it("handles string-typed everyMs and anchorMs from legacy persisted data", () => { + const anchor = Date.parse("2025-12-13T00:00:00.000Z"); + const now = anchor + 10_000; + const next = computeNextRunAtMs( + { + kind: "every", + everyMs: "30000" as unknown as number, + anchorMs: `${anchor}` as unknown as number, + }, + now, + ); + expect(next).toBe(anchor + 30_000); + }); + + it("returns undefined for non-numeric string everyMs", () => { + const now = Date.now(); + const next = computeNextRunAtMs({ kind: "every", everyMs: "abc" as unknown as number }, now); + expect(next).toBeUndefined(); + }); + it("advances when now matches anchor for every schedule", () => { const anchor = Date.parse("2025-12-13T00:00:00.000Z"); const next = computeNextRunAtMs({ kind: "every", everyMs: 30_000, anchorMs: anchor }, anchor); @@ -175,3 +196,23 @@ describe("cron schedule", () => { }); }); }); + +describe("coerceFiniteScheduleNumber", () => { + it("returns finite numbers directly", () => { + expect(coerceFiniteScheduleNumber(60_000)).toBe(60_000); + }); + + it("parses numeric strings", () => { + expect(coerceFiniteScheduleNumber("60000")).toBe(60_000); + expect(coerceFiniteScheduleNumber(" 60000 ")).toBe(60_000); + }); + + it("returns undefined for invalid inputs", () => { + expect(coerceFiniteScheduleNumber("")).toBeUndefined(); + expect(coerceFiniteScheduleNumber("abc")).toBeUndefined(); + expect(coerceFiniteScheduleNumber(NaN)).toBeUndefined(); + expect(coerceFiniteScheduleNumber(Infinity)).toBeUndefined(); + expect(coerceFiniteScheduleNumber(null)).toBeUndefined(); + expect(coerceFiniteScheduleNumber(undefined)).toBeUndefined(); + }); +}); diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts index 4c31c0a1afe..e62e9e2e7ab 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -30,6 +30,21 @@ function resolveCachedCron(expr: string, timezone: string): Cron { return next; } +export function coerceFiniteScheduleNumber(value: unknown): number | undefined { + if (typeof value === "number") { + return Number.isFinite(value) ? value : undefined; + } + if (typeof value === "string") { + const trimmed = value.trim(); + if (!trimmed) { + return undefined; + } + const parsed = Number(trimmed); + return Number.isFinite(parsed) ? parsed : undefined; + } + return undefined; +} + export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined { if (schedule.kind === "at") { // Handle both canonical `at` (string) and legacy `atMs` (number) fields. @@ -51,8 +66,13 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe } if (schedule.kind === "every") { - const everyMs = Math.max(1, Math.floor(schedule.everyMs)); - const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs)); + const everyMsRaw = coerceFiniteScheduleNumber(schedule.everyMs); + if (everyMsRaw === undefined) { + return undefined; + } + const everyMs = Math.max(1, Math.floor(everyMsRaw)); + const anchorRaw = coerceFiniteScheduleNumber(schedule.anchorMs); + const anchor = Math.max(0, Math.floor(anchorRaw ?? nowMs)); if (nowMs < anchor) { return anchor; } diff --git a/src/cron/service.issue-13992-regression.test.ts b/src/cron/service.issue-13992-regression.test.ts index 58db3962f65..f3ee7121a70 100644 --- a/src/cron/service.issue-13992-regression.test.ts +++ b/src/cron/service.issue-13992-regression.test.ts @@ -21,7 +21,7 @@ function createCronSystemEventJob(now: number, overrides: Partial = {}) } describe("issue #13992 regression - cron jobs skip execution", () => { - it("should NOT recompute nextRunAtMs for past-due jobs during maintenance", () => { + it("should NOT recompute nextRunAtMs for past-due jobs by default", () => { const now = Date.now(); const pastDue = now - 60_000; // 1 minute ago @@ -40,6 +40,61 @@ describe("issue #13992 regression - cron jobs skip execution", () => { expect(job.state.nextRunAtMs).toBe(pastDue); }); + it("should recompute past-due nextRunAtMs with recomputeExpired when slot already executed", () => { + // NOTE: in onTimer this recovery branch is used only when due scan found no + // runnable jobs; this unit test validates the maintenance helper contract. + const now = Date.now(); + const pastDue = now - 60_000; + + const job: CronJob = { + id: "test-job", + name: "test job", + enabled: true, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + payload: { kind: "systemEvent", text: "test" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + createdAtMs: now - 3600_000, + updatedAtMs: now - 3600_000, + state: { + nextRunAtMs: pastDue, + lastRunAtMs: pastDue + 1000, + }, + }; + + const state = createMockCronStateForJobs({ jobs: [job], nowMs: now }); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); + + expect(typeof job.state.nextRunAtMs).toBe("number"); + expect((job.state.nextRunAtMs ?? 0) > now).toBe(true); + }); + + it("should NOT recompute past-due nextRunAtMs for running jobs even with recomputeExpired", () => { + const now = Date.now(); + const pastDue = now - 60_000; + + const job: CronJob = { + id: "test-job", + name: "test job", + enabled: true, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + payload: { kind: "systemEvent", text: "test" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + createdAtMs: now - 3600_000, + updatedAtMs: now - 3600_000, + state: { + nextRunAtMs: pastDue, + runningAtMs: now - 500, + }, + }; + + const state = createMockCronStateForJobs({ jobs: [job], nowMs: now }); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); + + expect(job.state.nextRunAtMs).toBe(pastDue); + }); + it("should compute missing nextRunAtMs during maintenance", () => { const now = Date.now(); @@ -138,4 +193,78 @@ describe("issue #13992 regression - cron jobs skip execution", () => { expect(malformedJob.state.scheduleErrorCount).toBe(1); expect(malformedJob.state.lastError).toMatch(/^schedule error:/); }); + + it("recomputes expired slots already executed but keeps never-executed stale slots", () => { + const now = Date.now(); + const pastDue = now - 60_000; + const alreadyExecuted: CronJob = { + id: "already-executed", + name: "already executed", + enabled: true, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + payload: { kind: "systemEvent", text: "done" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + createdAtMs: now - 86400_000, + updatedAtMs: now - 86400_000, + state: { + nextRunAtMs: pastDue, + lastRunAtMs: pastDue + 1000, + }, + }; + + const neverExecuted: CronJob = { + id: "never-executed", + name: "never executed", + enabled: true, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + payload: { kind: "systemEvent", text: "pending" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + createdAtMs: now - 86400_000 * 2, + updatedAtMs: now - 86400_000 * 2, + state: { + nextRunAtMs: pastDue, + lastRunAtMs: pastDue - 86400_000, + }, + }; + + const state = createMockCronStateForJobs({ + jobs: [alreadyExecuted, neverExecuted], + nowMs: now, + }); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); + + expect((alreadyExecuted.state.nextRunAtMs ?? 0) > now).toBe(true); + expect(neverExecuted.state.nextRunAtMs).toBe(pastDue); + }); + + it("does not advance overdue never-executed jobs when stale running marker is cleared", () => { + const now = Date.now(); + const pastDue = now - 60_000; + const staleRunningAt = now - 3 * 60 * 60_000; + + const job: CronJob = { + id: "stale-running-overdue", + name: "stale running overdue", + enabled: true, + schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" }, + payload: { kind: "systemEvent", text: "test" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + createdAtMs: now - 86400_000, + updatedAtMs: now - 86400_000, + state: { + nextRunAtMs: pastDue, + runningAtMs: staleRunningAt, + lastRunAtMs: pastDue - 3600_000, + }, + }; + + const state = createMockCronStateForJobs({ jobs: [job], nowMs: now }); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true, nowMs: now }); + + expect(job.state.runningAtMs).toBeUndefined(); + expect(job.state.nextRunAtMs).toBe(pastDue); + }); }); diff --git a/src/cron/service.issue-17852-daily-skip.test.ts b/src/cron/service.issue-17852-daily-skip.test.ts index 3ec2a75466b..62f7d5316ce 100644 --- a/src/cron/service.issue-17852-daily-skip.test.ts +++ b/src/cron/service.issue-17852-daily-skip.test.ts @@ -36,7 +36,7 @@ describe("issue #17852 - daily cron jobs should not skip days", () => { }; } - it("recomputeNextRunsForMaintenance should NOT advance past-due nextRunAtMs", () => { + it("recomputeNextRunsForMaintenance should NOT advance past-due nextRunAtMs by default", () => { // Simulate: job scheduled for 3:00 AM, timer processing happens at 3:00:01 // The job was NOT executed in this tick (e.g., it became due between // findDueJobs and the post-execution block). @@ -53,6 +53,20 @@ describe("issue #17852 - daily cron jobs should not skip days", () => { expect(job.state.nextRunAtMs).toBe(threeAM); }); + it("recomputeNextRunsForMaintenance can advance expired nextRunAtMs on recovery path when slot already executed", () => { + const threeAM = Date.parse("2026-02-16T03:00:00.000Z"); + const now = threeAM + 1_000; // 3:00:01 + + const job = createDailyThreeAmJob(threeAM); + job.state.lastRunAtMs = threeAM + 1; + + const state = createMockCronStateForJobs({ jobs: [job], nowMs: now }); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); + + const tomorrowThreeAM = threeAM + DAY_MS; + expect(job.state.nextRunAtMs).toBe(tomorrowThreeAM); + }); + it("full recomputeNextRuns WOULD silently advance past-due nextRunAtMs (the bug)", () => { // This test documents the buggy behavior that caused #17852. // The full recomputeNextRuns sees a past-due nextRunAtMs and advances it diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index ed6a927686e..9665d40ec55 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -423,10 +423,11 @@ describe("Cron issue regressions", () => { cron.stop(); }); - it("does not advance unrelated due jobs after manual cron.run", async () => { + it("manual cron.run preserves unrelated due jobs but advances already-executed stale slots", async () => { const store = makeStorePath(); const nowMs = Date.now(); const dueNextRunAtMs = nowMs - 1_000; + const staleExecutedNextRunAtMs = nowMs - 2_000; await writeCronJobs(store.storePath, [ createIsolatedRegressionJob({ @@ -445,6 +446,17 @@ describe("Cron issue regressions", () => { payload: { kind: "agentTurn", message: "unrelated due" }, state: { nextRunAtMs: dueNextRunAtMs }, }), + createIsolatedRegressionJob({ + id: "unrelated-stale-executed", + name: "unrelated stale executed", + scheduledAt: nowMs, + schedule: { kind: "cron", expr: "*/5 * * * *", tz: "UTC" }, + payload: { kind: "agentTurn", message: "unrelated stale executed" }, + state: { + nextRunAtMs: staleExecutedNextRunAtMs, + lastRunAtMs: staleExecutedNextRunAtMs + 1, + }, + }), ]); const cron = await startCronForStore({ @@ -458,8 +470,11 @@ describe("Cron issue regressions", () => { const jobs = await cron.list({ includeDisabled: true }); const unrelated = jobs.find((entry) => entry.id === "unrelated-due"); + const staleExecuted = jobs.find((entry) => entry.id === "unrelated-stale-executed"); expect(unrelated).toBeDefined(); expect(unrelated?.state.nextRunAtMs).toBe(dueNextRunAtMs); + expect(staleExecuted).toBeDefined(); + expect((staleExecuted?.state.nextRunAtMs ?? 0) > nowMs).toBe(true); cron.stop(); }); @@ -1499,4 +1514,41 @@ describe("Cron issue regressions", () => { expect(job.state.nextRunAtMs).toBe(endedAt + 30_000); expect(job.enabled).toBe(true); }); + + it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => { + const nowMs = Date.now(); + const everyMs = 24 * 60 * 60 * 1_000; + const lastScheduledRunMs = nowMs - 6 * 60 * 60 * 1_000; + const expectedNextMs = lastScheduledRunMs + everyMs; + + const job: CronJob = { + id: "daily-job", + name: "Daily job", + enabled: true, + createdAtMs: lastScheduledRunMs - everyMs, + updatedAtMs: lastScheduledRunMs, + schedule: { kind: "every", everyMs, anchorMs: lastScheduledRunMs - everyMs }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "daily check-in" }, + state: { + lastRunAtMs: lastScheduledRunMs, + nextRunAtMs: expectedNextMs, + }, + }; + const state = createRunningCronServiceState({ + storePath: "/tmp/cron-force-run-anchor-test.json", + log: noopLogger as never, + nowMs: () => nowMs, + jobs: [job], + }); + + const startedAt = nowMs; + const endedAt = nowMs + 2_000; + + applyJobResult(state, job, { status: "ok", startedAt, endedAt }, { preserveSchedule: true }); + + expect(job.state.lastRunAtMs).toBe(startedAt); + expect(job.state.nextRunAtMs).toBe(expectedNextMs); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 6ae2e130412..4f3b5682a44 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,7 +1,11 @@ import crypto from "node:crypto"; import { normalizeAgentId } from "../../routing/session-key.js"; import { parseAbsoluteTimeMs } from "../parse.js"; -import { computeNextRunAtMs, computePreviousRunAtMs } from "../schedule.js"; +import { + coerceFiniteScheduleNumber, + computeNextRunAtMs, + computePreviousRunAtMs, +} from "../schedule.js"; import { normalizeCronStaggerMs, resolveCronStaggerMs, @@ -31,6 +35,10 @@ const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const STAGGER_OFFSET_CACHE_MAX = 4096; const staggerOffsetCache = new Map(); +function isFiniteTimestamp(value: unknown): value is number { + return typeof value === "number" && Number.isFinite(value); +} + function resolveStableCronOffsetMs(jobId: string, staggerMs: number) { if (staggerMs <= 1) { return 0; @@ -108,17 +116,13 @@ function computeStaggeredCronPreviousRunAtMs(job: CronJob, nowMs: number) { return undefined; } -function isFiniteTimestamp(value: unknown): value is number { - return typeof value === "number" && Number.isFinite(value); -} - function resolveEveryAnchorMs(params: { schedule: { everyMs: number; anchorMs?: number }; fallbackAnchorMs: number; }) { - const raw = params.schedule.anchorMs; - if (isFiniteTimestamp(raw)) { - return Math.max(0, Math.floor(raw)); + const coerced = coerceFiniteScheduleNumber(params.schedule.anchorMs); + if (coerced !== undefined) { + return Math.max(0, Math.floor(coerced)); } if (isFiniteTimestamp(params.fallbackAnchorMs)) { return Math.max(0, Math.floor(params.fallbackAnchorMs)); @@ -229,7 +233,11 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return undefined; } if (job.schedule.kind === "every") { - const everyMs = Math.max(1, Math.floor(job.schedule.everyMs)); + const everyMsRaw = coerceFiniteScheduleNumber(job.schedule.everyMs); + if (everyMsRaw === undefined) { + return undefined; + } + const everyMs = Math.max(1, Math.floor(everyMsRaw)); const lastRunAtMs = job.state.lastRunAtMs; if (typeof lastRunAtMs === "number" && Number.isFinite(lastRunAtMs)) { const nextFromLastRun = Math.floor(lastRunAtMs) + everyMs; @@ -374,21 +382,21 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob; function walkSchedulableJobs( state: CronServiceState, fn: (params: { job: CronJob; nowMs: number }) => boolean, + nowMs = state.deps.nowMs(), ): boolean { if (!state.store) { return false; } let changed = false; - const now = state.deps.nowMs(); for (const job of state.store.jobs) { - const tick = normalizeJobTickState({ state, job, nowMs: now }); + const tick = normalizeJobTickState({ state, job, nowMs }); if (tick.changed) { changed = true; } if (tick.skip) { continue; } - if (fn({ job, nowMs: now })) { + if (fn({ job, nowMs })) { changed = true; } } @@ -440,19 +448,39 @@ export function recomputeNextRuns(state: CronServiceState): boolean { * to prevent silently advancing past-due nextRunAtMs values without execution * (see #13992). */ -export function recomputeNextRunsForMaintenance(state: CronServiceState): boolean { - return walkSchedulableJobs(state, ({ job, nowMs: now }) => { - let changed = false; - // Only compute missing nextRunAtMs, do NOT recompute existing ones. - // If a job was past-due but not found by findDueJobs, recomputing would - // cause it to be silently skipped. - if (!isFiniteTimestamp(job.state.nextRunAtMs)) { - if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { - changed = true; +export function recomputeNextRunsForMaintenance( + state: CronServiceState, + opts?: { recomputeExpired?: boolean; nowMs?: number }, +): boolean { + const recomputeExpired = opts?.recomputeExpired ?? false; + return walkSchedulableJobs( + state, + ({ job, nowMs: now }) => { + let changed = false; + if (!isFiniteTimestamp(job.state.nextRunAtMs)) { + // Missing or invalid nextRunAtMs is always repaired. + if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { + changed = true; + } + } else if ( + recomputeExpired && + now >= job.state.nextRunAtMs && + typeof job.state.runningAtMs !== "number" + ) { + // Only advance when the expired slot was already executed. + // If not, preserve the past-due value so the job can still run. + const lastRun = job.state.lastRunAtMs; + const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs; + if (alreadyExecutedSlot) { + if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) { + changed = true; + } + } } - } - return changed; - }); + return changed; + }, + opts?.nowMs, + ); } export function nextWakeAtMs(state: CronServiceState) { diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index dd02ca4ab6d..14758c5df34 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -398,13 +398,18 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f return; } - const shouldDelete = applyJobResult(state, job, { - status: coreResult.status, - error: coreResult.error, - delivered: coreResult.delivered, - startedAt, - endedAt, - }); + const shouldDelete = applyJobResult( + state, + job, + { + status: coreResult.status, + error: coreResult.error, + delivered: coreResult.delivered, + startedAt, + endedAt, + }, + { preserveSchedule: mode === "force" }, + ); emit(state, { jobId: job.id, @@ -450,7 +455,7 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f snapshot: postRunSnapshot, removed: postRunRemoved, }); - recomputeNextRunsForMaintenance(state); + recomputeNextRunsForMaintenance(state, { recomputeExpired: true }); await persist(state); armTimer(state); }); diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index dca0bde2efe..0a52197bf81 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -6,6 +6,7 @@ import { } from "../legacy-delivery.js"; import { parseAbsoluteTimeMs } from "../parse.js"; import { migrateLegacyCronPayload } from "../payload-migration.js"; +import { coerceFiniteScheduleNumber } from "../schedule.js"; import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js"; import { loadCronStore, saveCronStore } from "../store.js"; import type { CronJob } from "../types.js"; @@ -411,15 +412,18 @@ export async function ensureLoaded( } const everyMsRaw = sched.everyMs; - const everyMs = - typeof everyMsRaw === "number" && Number.isFinite(everyMsRaw) - ? Math.floor(everyMsRaw) - : null; + const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw); + const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null; + if (everyMs !== null && everyMsRaw !== everyMs) { + sched.everyMs = everyMs; + mutated = true; + } if ((kind === "every" || sched.kind === "every") && everyMs !== null) { const anchorRaw = sched.anchorMs; + const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw); const normalizedAnchor = - typeof anchorRaw === "number" && Number.isFinite(anchorRaw) - ? Math.max(0, Math.floor(anchorRaw)) + anchorCoerced !== undefined + ? Math.max(0, Math.floor(anchorCoerced)) : typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs) ? Math.max(0, Math.floor(raw.createdAtMs)) : typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs) diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index f871edcdd49..8d1d40024ed 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -287,7 +287,21 @@ export function applyJobResult( startedAt: number; endedAt: number; }, + opts?: { + // Preserve recurring "every" anchors for manual force runs. + preserveSchedule?: boolean; + }, ): boolean { + const prevLastRunAtMs = job.state.lastRunAtMs; + const computeNextWithPreservedLastRun = (nowMs: number) => { + const saved = job.state.lastRunAtMs; + job.state.lastRunAtMs = prevLastRunAtMs; + try { + return computeJobNextRunAtMs(job, nowMs); + } finally { + job.state.lastRunAtMs = saved; + } + }; job.state.runningAtMs = undefined; job.state.lastRunAtMs = result.startedAt; job.state.lastRunStatus = result.status; @@ -385,7 +399,10 @@ export function applyJobResult( const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1); let normalNext: number | undefined; try { - normalNext = computeJobNextRunAtMs(job, result.endedAt); + normalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); } catch (err) { // If the schedule expression/timezone throws (croner edge cases), // record the schedule error (auto-disables after repeated failures) @@ -408,7 +425,10 @@ export function applyJobResult( } else if (job.enabled) { let naturalNext: number | undefined; try { - naturalNext = computeJobNextRunAtMs(job, result.endedAt); + naturalNext = + opts?.preserveSchedule && job.schedule.kind === "every" + ? computeNextWithPreservedLastRun(result.endedAt) + : computeJobNextRunAtMs(job, result.endedAt); } catch (err) { // If the schedule expression/timezone throws (croner edge cases), // record the schedule error (auto-disables after repeated failures) @@ -552,13 +572,17 @@ export async function onTimer(state: CronServiceState) { try { const dueJobs = await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - const due = findDueJobs(state); + const dueCheckNow = state.deps.nowMs(); + const due = collectRunnableJobs(state, dueCheckNow); if (due.length === 0) { // Use maintenance-only recompute to avoid advancing past-due nextRunAtMs // values without execution. This prevents jobs from being silently skipped // when the timer wakes up but findDueJobs returns empty (see #13992). - const changed = recomputeNextRunsForMaintenance(state); + const changed = recomputeNextRunsForMaintenance(state, { + recomputeExpired: true, + nowMs: dueCheckNow, + }); if (changed) { await persist(state); } @@ -688,14 +712,6 @@ export async function onTimer(state: CronServiceState) { } } -function findDueJobs(state: CronServiceState): CronJob[] { - if (!state.store) { - return []; - } - const now = state.deps.nowMs(); - return collectRunnableJobs(state, now); -} - function isRunnableJob(params: { job: CronJob; nowMs: number;