mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 11:44:15 +00:00
cron: unify stale-run recovery and preserve manual-run every anchors (#35363)
* cron: unify stale-run recovery and preserve manual every anchors * cron: address unresolved review threads on recovery paths * cron: remove duplicate timestamp helper after rebase
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
coerceFiniteScheduleNumber,
|
||||
clearCronScheduleCacheForTest,
|
||||
computeNextRunAtMs,
|
||||
computePreviousRunAtMs,
|
||||
@@ -76,6 +77,26 @@ describe("cron schedule", () => {
|
||||
expect(next).toBe(now + 30_000);
|
||||
});
|
||||
|
||||
it("handles string-typed everyMs and anchorMs from legacy persisted data", () => {
|
||||
const anchor = Date.parse("2025-12-13T00:00:00.000Z");
|
||||
const now = anchor + 10_000;
|
||||
const next = computeNextRunAtMs(
|
||||
{
|
||||
kind: "every",
|
||||
everyMs: "30000" as unknown as number,
|
||||
anchorMs: `${anchor}` as unknown as number,
|
||||
},
|
||||
now,
|
||||
);
|
||||
expect(next).toBe(anchor + 30_000);
|
||||
});
|
||||
|
||||
it("returns undefined for non-numeric string everyMs", () => {
|
||||
const now = Date.now();
|
||||
const next = computeNextRunAtMs({ kind: "every", everyMs: "abc" as unknown as number }, now);
|
||||
expect(next).toBeUndefined();
|
||||
});
|
||||
|
||||
it("advances when now matches anchor for every schedule", () => {
|
||||
const anchor = Date.parse("2025-12-13T00:00:00.000Z");
|
||||
const next = computeNextRunAtMs({ kind: "every", everyMs: 30_000, anchorMs: anchor }, anchor);
|
||||
@@ -175,3 +196,23 @@ describe("cron schedule", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("coerceFiniteScheduleNumber", () => {
|
||||
it("returns finite numbers directly", () => {
|
||||
expect(coerceFiniteScheduleNumber(60_000)).toBe(60_000);
|
||||
});
|
||||
|
||||
it("parses numeric strings", () => {
|
||||
expect(coerceFiniteScheduleNumber("60000")).toBe(60_000);
|
||||
expect(coerceFiniteScheduleNumber(" 60000 ")).toBe(60_000);
|
||||
});
|
||||
|
||||
it("returns undefined for invalid inputs", () => {
|
||||
expect(coerceFiniteScheduleNumber("")).toBeUndefined();
|
||||
expect(coerceFiniteScheduleNumber("abc")).toBeUndefined();
|
||||
expect(coerceFiniteScheduleNumber(NaN)).toBeUndefined();
|
||||
expect(coerceFiniteScheduleNumber(Infinity)).toBeUndefined();
|
||||
expect(coerceFiniteScheduleNumber(null)).toBeUndefined();
|
||||
expect(coerceFiniteScheduleNumber(undefined)).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -30,6 +30,21 @@ function resolveCachedCron(expr: string, timezone: string): Cron {
|
||||
return next;
|
||||
}
|
||||
|
||||
export function coerceFiniteScheduleNumber(value: unknown): number | undefined {
|
||||
if (typeof value === "number") {
|
||||
return Number.isFinite(value) ? value : undefined;
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const trimmed = value.trim();
|
||||
if (!trimmed) {
|
||||
return undefined;
|
||||
}
|
||||
const parsed = Number(trimmed);
|
||||
return Number.isFinite(parsed) ? parsed : undefined;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
|
||||
if (schedule.kind === "at") {
|
||||
// Handle both canonical `at` (string) and legacy `atMs` (number) fields.
|
||||
@@ -51,8 +66,13 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe
|
||||
}
|
||||
|
||||
if (schedule.kind === "every") {
|
||||
const everyMs = Math.max(1, Math.floor(schedule.everyMs));
|
||||
const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs));
|
||||
const everyMsRaw = coerceFiniteScheduleNumber(schedule.everyMs);
|
||||
if (everyMsRaw === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
const everyMs = Math.max(1, Math.floor(everyMsRaw));
|
||||
const anchorRaw = coerceFiniteScheduleNumber(schedule.anchorMs);
|
||||
const anchor = Math.max(0, Math.floor(anchorRaw ?? nowMs));
|
||||
if (nowMs < anchor) {
|
||||
return anchor;
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ function createCronSystemEventJob(now: number, overrides: Partial<CronJob> = {})
|
||||
}
|
||||
|
||||
describe("issue #13992 regression - cron jobs skip execution", () => {
|
||||
it("should NOT recompute nextRunAtMs for past-due jobs during maintenance", () => {
|
||||
it("should NOT recompute nextRunAtMs for past-due jobs by default", () => {
|
||||
const now = Date.now();
|
||||
const pastDue = now - 60_000; // 1 minute ago
|
||||
|
||||
@@ -40,6 +40,61 @@ describe("issue #13992 regression - cron jobs skip execution", () => {
|
||||
expect(job.state.nextRunAtMs).toBe(pastDue);
|
||||
});
|
||||
|
||||
it("should recompute past-due nextRunAtMs with recomputeExpired when slot already executed", () => {
|
||||
// NOTE: in onTimer this recovery branch is used only when due scan found no
|
||||
// runnable jobs; this unit test validates the maintenance helper contract.
|
||||
const now = Date.now();
|
||||
const pastDue = now - 60_000;
|
||||
|
||||
const job: CronJob = {
|
||||
id: "test-job",
|
||||
name: "test job",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "test" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
createdAtMs: now - 3600_000,
|
||||
updatedAtMs: now - 3600_000,
|
||||
state: {
|
||||
nextRunAtMs: pastDue,
|
||||
lastRunAtMs: pastDue + 1000,
|
||||
},
|
||||
};
|
||||
|
||||
const state = createMockCronStateForJobs({ jobs: [job], nowMs: now });
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
|
||||
|
||||
expect(typeof job.state.nextRunAtMs).toBe("number");
|
||||
expect((job.state.nextRunAtMs ?? 0) > now).toBe(true);
|
||||
});
|
||||
|
||||
it("should NOT recompute past-due nextRunAtMs for running jobs even with recomputeExpired", () => {
|
||||
const now = Date.now();
|
||||
const pastDue = now - 60_000;
|
||||
|
||||
const job: CronJob = {
|
||||
id: "test-job",
|
||||
name: "test job",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "test" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
createdAtMs: now - 3600_000,
|
||||
updatedAtMs: now - 3600_000,
|
||||
state: {
|
||||
nextRunAtMs: pastDue,
|
||||
runningAtMs: now - 500,
|
||||
},
|
||||
};
|
||||
|
||||
const state = createMockCronStateForJobs({ jobs: [job], nowMs: now });
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
|
||||
|
||||
expect(job.state.nextRunAtMs).toBe(pastDue);
|
||||
});
|
||||
|
||||
it("should compute missing nextRunAtMs during maintenance", () => {
|
||||
const now = Date.now();
|
||||
|
||||
@@ -138,4 +193,78 @@ describe("issue #13992 regression - cron jobs skip execution", () => {
|
||||
expect(malformedJob.state.scheduleErrorCount).toBe(1);
|
||||
expect(malformedJob.state.lastError).toMatch(/^schedule error:/);
|
||||
});
|
||||
|
||||
it("recomputes expired slots already executed but keeps never-executed stale slots", () => {
|
||||
const now = Date.now();
|
||||
const pastDue = now - 60_000;
|
||||
const alreadyExecuted: CronJob = {
|
||||
id: "already-executed",
|
||||
name: "already executed",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "done" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
createdAtMs: now - 86400_000,
|
||||
updatedAtMs: now - 86400_000,
|
||||
state: {
|
||||
nextRunAtMs: pastDue,
|
||||
lastRunAtMs: pastDue + 1000,
|
||||
},
|
||||
};
|
||||
|
||||
const neverExecuted: CronJob = {
|
||||
id: "never-executed",
|
||||
name: "never executed",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "pending" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
createdAtMs: now - 86400_000 * 2,
|
||||
updatedAtMs: now - 86400_000 * 2,
|
||||
state: {
|
||||
nextRunAtMs: pastDue,
|
||||
lastRunAtMs: pastDue - 86400_000,
|
||||
},
|
||||
};
|
||||
|
||||
const state = createMockCronStateForJobs({
|
||||
jobs: [alreadyExecuted, neverExecuted],
|
||||
nowMs: now,
|
||||
});
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
|
||||
|
||||
expect((alreadyExecuted.state.nextRunAtMs ?? 0) > now).toBe(true);
|
||||
expect(neverExecuted.state.nextRunAtMs).toBe(pastDue);
|
||||
});
|
||||
|
||||
it("does not advance overdue never-executed jobs when stale running marker is cleared", () => {
|
||||
const now = Date.now();
|
||||
const pastDue = now - 60_000;
|
||||
const staleRunningAt = now - 3 * 60 * 60_000;
|
||||
|
||||
const job: CronJob = {
|
||||
id: "stale-running-overdue",
|
||||
name: "stale running overdue",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 8 * * *", tz: "UTC" },
|
||||
payload: { kind: "systemEvent", text: "test" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
createdAtMs: now - 86400_000,
|
||||
updatedAtMs: now - 86400_000,
|
||||
state: {
|
||||
nextRunAtMs: pastDue,
|
||||
runningAtMs: staleRunningAt,
|
||||
lastRunAtMs: pastDue - 3600_000,
|
||||
},
|
||||
};
|
||||
|
||||
const state = createMockCronStateForJobs({ jobs: [job], nowMs: now });
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true, nowMs: now });
|
||||
|
||||
expect(job.state.runningAtMs).toBeUndefined();
|
||||
expect(job.state.nextRunAtMs).toBe(pastDue);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ describe("issue #17852 - daily cron jobs should not skip days", () => {
|
||||
};
|
||||
}
|
||||
|
||||
it("recomputeNextRunsForMaintenance should NOT advance past-due nextRunAtMs", () => {
|
||||
it("recomputeNextRunsForMaintenance should NOT advance past-due nextRunAtMs by default", () => {
|
||||
// Simulate: job scheduled for 3:00 AM, timer processing happens at 3:00:01
|
||||
// The job was NOT executed in this tick (e.g., it became due between
|
||||
// findDueJobs and the post-execution block).
|
||||
@@ -53,6 +53,20 @@ describe("issue #17852 - daily cron jobs should not skip days", () => {
|
||||
expect(job.state.nextRunAtMs).toBe(threeAM);
|
||||
});
|
||||
|
||||
it("recomputeNextRunsForMaintenance can advance expired nextRunAtMs on recovery path when slot already executed", () => {
|
||||
const threeAM = Date.parse("2026-02-16T03:00:00.000Z");
|
||||
const now = threeAM + 1_000; // 3:00:01
|
||||
|
||||
const job = createDailyThreeAmJob(threeAM);
|
||||
job.state.lastRunAtMs = threeAM + 1;
|
||||
|
||||
const state = createMockCronStateForJobs({ jobs: [job], nowMs: now });
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
|
||||
|
||||
const tomorrowThreeAM = threeAM + DAY_MS;
|
||||
expect(job.state.nextRunAtMs).toBe(tomorrowThreeAM);
|
||||
});
|
||||
|
||||
it("full recomputeNextRuns WOULD silently advance past-due nextRunAtMs (the bug)", () => {
|
||||
// This test documents the buggy behavior that caused #17852.
|
||||
// The full recomputeNextRuns sees a past-due nextRunAtMs and advances it
|
||||
|
||||
@@ -423,10 +423,11 @@ describe("Cron issue regressions", () => {
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("does not advance unrelated due jobs after manual cron.run", async () => {
|
||||
it("manual cron.run preserves unrelated due jobs but advances already-executed stale slots", async () => {
|
||||
const store = makeStorePath();
|
||||
const nowMs = Date.now();
|
||||
const dueNextRunAtMs = nowMs - 1_000;
|
||||
const staleExecutedNextRunAtMs = nowMs - 2_000;
|
||||
|
||||
await writeCronJobs(store.storePath, [
|
||||
createIsolatedRegressionJob({
|
||||
@@ -445,6 +446,17 @@ describe("Cron issue regressions", () => {
|
||||
payload: { kind: "agentTurn", message: "unrelated due" },
|
||||
state: { nextRunAtMs: dueNextRunAtMs },
|
||||
}),
|
||||
createIsolatedRegressionJob({
|
||||
id: "unrelated-stale-executed",
|
||||
name: "unrelated stale executed",
|
||||
scheduledAt: nowMs,
|
||||
schedule: { kind: "cron", expr: "*/5 * * * *", tz: "UTC" },
|
||||
payload: { kind: "agentTurn", message: "unrelated stale executed" },
|
||||
state: {
|
||||
nextRunAtMs: staleExecutedNextRunAtMs,
|
||||
lastRunAtMs: staleExecutedNextRunAtMs + 1,
|
||||
},
|
||||
}),
|
||||
]);
|
||||
|
||||
const cron = await startCronForStore({
|
||||
@@ -458,8 +470,11 @@ describe("Cron issue regressions", () => {
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const unrelated = jobs.find((entry) => entry.id === "unrelated-due");
|
||||
const staleExecuted = jobs.find((entry) => entry.id === "unrelated-stale-executed");
|
||||
expect(unrelated).toBeDefined();
|
||||
expect(unrelated?.state.nextRunAtMs).toBe(dueNextRunAtMs);
|
||||
expect(staleExecuted).toBeDefined();
|
||||
expect((staleExecuted?.state.nextRunAtMs ?? 0) > nowMs).toBe(true);
|
||||
|
||||
cron.stop();
|
||||
});
|
||||
@@ -1499,4 +1514,41 @@ describe("Cron issue regressions", () => {
|
||||
expect(job.state.nextRunAtMs).toBe(endedAt + 30_000);
|
||||
expect(job.enabled).toBe(true);
|
||||
});
|
||||
|
||||
it("force run preserves 'every' anchor while recording manual lastRunAtMs", () => {
|
||||
const nowMs = Date.now();
|
||||
const everyMs = 24 * 60 * 60 * 1_000;
|
||||
const lastScheduledRunMs = nowMs - 6 * 60 * 60 * 1_000;
|
||||
const expectedNextMs = lastScheduledRunMs + everyMs;
|
||||
|
||||
const job: CronJob = {
|
||||
id: "daily-job",
|
||||
name: "Daily job",
|
||||
enabled: true,
|
||||
createdAtMs: lastScheduledRunMs - everyMs,
|
||||
updatedAtMs: lastScheduledRunMs,
|
||||
schedule: { kind: "every", everyMs, anchorMs: lastScheduledRunMs - everyMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "daily check-in" },
|
||||
state: {
|
||||
lastRunAtMs: lastScheduledRunMs,
|
||||
nextRunAtMs: expectedNextMs,
|
||||
},
|
||||
};
|
||||
const state = createRunningCronServiceState({
|
||||
storePath: "/tmp/cron-force-run-anchor-test.json",
|
||||
log: noopLogger as never,
|
||||
nowMs: () => nowMs,
|
||||
jobs: [job],
|
||||
});
|
||||
|
||||
const startedAt = nowMs;
|
||||
const endedAt = nowMs + 2_000;
|
||||
|
||||
applyJobResult(state, job, { status: "ok", startedAt, endedAt }, { preserveSchedule: true });
|
||||
|
||||
expect(job.state.lastRunAtMs).toBe(startedAt);
|
||||
expect(job.state.nextRunAtMs).toBe(expectedNextMs);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import crypto from "node:crypto";
|
||||
import { normalizeAgentId } from "../../routing/session-key.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { computeNextRunAtMs, computePreviousRunAtMs } from "../schedule.js";
|
||||
import {
|
||||
coerceFiniteScheduleNumber,
|
||||
computeNextRunAtMs,
|
||||
computePreviousRunAtMs,
|
||||
} from "../schedule.js";
|
||||
import {
|
||||
normalizeCronStaggerMs,
|
||||
resolveCronStaggerMs,
|
||||
@@ -31,6 +35,10 @@ const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
||||
const STAGGER_OFFSET_CACHE_MAX = 4096;
|
||||
const staggerOffsetCache = new Map<string, number>();
|
||||
|
||||
function isFiniteTimestamp(value: unknown): value is number {
|
||||
return typeof value === "number" && Number.isFinite(value);
|
||||
}
|
||||
|
||||
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
|
||||
if (staggerMs <= 1) {
|
||||
return 0;
|
||||
@@ -108,17 +116,13 @@ function computeStaggeredCronPreviousRunAtMs(job: CronJob, nowMs: number) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function isFiniteTimestamp(value: unknown): value is number {
|
||||
return typeof value === "number" && Number.isFinite(value);
|
||||
}
|
||||
|
||||
function resolveEveryAnchorMs(params: {
|
||||
schedule: { everyMs: number; anchorMs?: number };
|
||||
fallbackAnchorMs: number;
|
||||
}) {
|
||||
const raw = params.schedule.anchorMs;
|
||||
if (isFiniteTimestamp(raw)) {
|
||||
return Math.max(0, Math.floor(raw));
|
||||
const coerced = coerceFiniteScheduleNumber(params.schedule.anchorMs);
|
||||
if (coerced !== undefined) {
|
||||
return Math.max(0, Math.floor(coerced));
|
||||
}
|
||||
if (isFiniteTimestamp(params.fallbackAnchorMs)) {
|
||||
return Math.max(0, Math.floor(params.fallbackAnchorMs));
|
||||
@@ -229,7 +233,11 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
||||
return undefined;
|
||||
}
|
||||
if (job.schedule.kind === "every") {
|
||||
const everyMs = Math.max(1, Math.floor(job.schedule.everyMs));
|
||||
const everyMsRaw = coerceFiniteScheduleNumber(job.schedule.everyMs);
|
||||
if (everyMsRaw === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
const everyMs = Math.max(1, Math.floor(everyMsRaw));
|
||||
const lastRunAtMs = job.state.lastRunAtMs;
|
||||
if (typeof lastRunAtMs === "number" && Number.isFinite(lastRunAtMs)) {
|
||||
const nextFromLastRun = Math.floor(lastRunAtMs) + everyMs;
|
||||
@@ -374,21 +382,21 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob;
|
||||
function walkSchedulableJobs(
|
||||
state: CronServiceState,
|
||||
fn: (params: { job: CronJob; nowMs: number }) => boolean,
|
||||
nowMs = state.deps.nowMs(),
|
||||
): boolean {
|
||||
if (!state.store) {
|
||||
return false;
|
||||
}
|
||||
let changed = false;
|
||||
const now = state.deps.nowMs();
|
||||
for (const job of state.store.jobs) {
|
||||
const tick = normalizeJobTickState({ state, job, nowMs: now });
|
||||
const tick = normalizeJobTickState({ state, job, nowMs });
|
||||
if (tick.changed) {
|
||||
changed = true;
|
||||
}
|
||||
if (tick.skip) {
|
||||
continue;
|
||||
}
|
||||
if (fn({ job, nowMs: now })) {
|
||||
if (fn({ job, nowMs })) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
@@ -440,19 +448,39 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
|
||||
* to prevent silently advancing past-due nextRunAtMs values without execution
|
||||
* (see #13992).
|
||||
*/
|
||||
export function recomputeNextRunsForMaintenance(state: CronServiceState): boolean {
|
||||
return walkSchedulableJobs(state, ({ job, nowMs: now }) => {
|
||||
let changed = false;
|
||||
// Only compute missing nextRunAtMs, do NOT recompute existing ones.
|
||||
// If a job was past-due but not found by findDueJobs, recomputing would
|
||||
// cause it to be silently skipped.
|
||||
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
|
||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||
changed = true;
|
||||
export function recomputeNextRunsForMaintenance(
|
||||
state: CronServiceState,
|
||||
opts?: { recomputeExpired?: boolean; nowMs?: number },
|
||||
): boolean {
|
||||
const recomputeExpired = opts?.recomputeExpired ?? false;
|
||||
return walkSchedulableJobs(
|
||||
state,
|
||||
({ job, nowMs: now }) => {
|
||||
let changed = false;
|
||||
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
|
||||
// Missing or invalid nextRunAtMs is always repaired.
|
||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||
changed = true;
|
||||
}
|
||||
} else if (
|
||||
recomputeExpired &&
|
||||
now >= job.state.nextRunAtMs &&
|
||||
typeof job.state.runningAtMs !== "number"
|
||||
) {
|
||||
// Only advance when the expired slot was already executed.
|
||||
// If not, preserve the past-due value so the job can still run.
|
||||
const lastRun = job.state.lastRunAtMs;
|
||||
const alreadyExecutedSlot = isFiniteTimestamp(lastRun) && lastRun >= job.state.nextRunAtMs;
|
||||
if (alreadyExecutedSlot) {
|
||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
});
|
||||
return changed;
|
||||
},
|
||||
opts?.nowMs,
|
||||
);
|
||||
}
|
||||
|
||||
export function nextWakeAtMs(state: CronServiceState) {
|
||||
|
||||
@@ -398,13 +398,18 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
|
||||
return;
|
||||
}
|
||||
|
||||
const shouldDelete = applyJobResult(state, job, {
|
||||
status: coreResult.status,
|
||||
error: coreResult.error,
|
||||
delivered: coreResult.delivered,
|
||||
startedAt,
|
||||
endedAt,
|
||||
});
|
||||
const shouldDelete = applyJobResult(
|
||||
state,
|
||||
job,
|
||||
{
|
||||
status: coreResult.status,
|
||||
error: coreResult.error,
|
||||
delivered: coreResult.delivered,
|
||||
startedAt,
|
||||
endedAt,
|
||||
},
|
||||
{ preserveSchedule: mode === "force" },
|
||||
);
|
||||
|
||||
emit(state, {
|
||||
jobId: job.id,
|
||||
@@ -450,7 +455,7 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
|
||||
snapshot: postRunSnapshot,
|
||||
removed: postRunRemoved,
|
||||
});
|
||||
recomputeNextRunsForMaintenance(state);
|
||||
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
} from "../legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
||||
import { coerceFiniteScheduleNumber } from "../schedule.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
@@ -411,15 +412,18 @@ export async function ensureLoaded(
|
||||
}
|
||||
|
||||
const everyMsRaw = sched.everyMs;
|
||||
const everyMs =
|
||||
typeof everyMsRaw === "number" && Number.isFinite(everyMsRaw)
|
||||
? Math.floor(everyMsRaw)
|
||||
: null;
|
||||
const everyMsCoerced = coerceFiniteScheduleNumber(everyMsRaw);
|
||||
const everyMs = everyMsCoerced !== undefined ? Math.floor(everyMsCoerced) : null;
|
||||
if (everyMs !== null && everyMsRaw !== everyMs) {
|
||||
sched.everyMs = everyMs;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
|
||||
const anchorRaw = sched.anchorMs;
|
||||
const anchorCoerced = coerceFiniteScheduleNumber(anchorRaw);
|
||||
const normalizedAnchor =
|
||||
typeof anchorRaw === "number" && Number.isFinite(anchorRaw)
|
||||
? Math.max(0, Math.floor(anchorRaw))
|
||||
anchorCoerced !== undefined
|
||||
? Math.max(0, Math.floor(anchorCoerced))
|
||||
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
|
||||
? Math.max(0, Math.floor(raw.createdAtMs))
|
||||
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
|
||||
|
||||
@@ -287,7 +287,21 @@ export function applyJobResult(
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
},
|
||||
opts?: {
|
||||
// Preserve recurring "every" anchors for manual force runs.
|
||||
preserveSchedule?: boolean;
|
||||
},
|
||||
): boolean {
|
||||
const prevLastRunAtMs = job.state.lastRunAtMs;
|
||||
const computeNextWithPreservedLastRun = (nowMs: number) => {
|
||||
const saved = job.state.lastRunAtMs;
|
||||
job.state.lastRunAtMs = prevLastRunAtMs;
|
||||
try {
|
||||
return computeJobNextRunAtMs(job, nowMs);
|
||||
} finally {
|
||||
job.state.lastRunAtMs = saved;
|
||||
}
|
||||
};
|
||||
job.state.runningAtMs = undefined;
|
||||
job.state.lastRunAtMs = result.startedAt;
|
||||
job.state.lastRunStatus = result.status;
|
||||
@@ -385,7 +399,10 @@ export function applyJobResult(
|
||||
const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
|
||||
let normalNext: number | undefined;
|
||||
try {
|
||||
normalNext = computeJobNextRunAtMs(job, result.endedAt);
|
||||
normalNext =
|
||||
opts?.preserveSchedule && job.schedule.kind === "every"
|
||||
? computeNextWithPreservedLastRun(result.endedAt)
|
||||
: computeJobNextRunAtMs(job, result.endedAt);
|
||||
} catch (err) {
|
||||
// If the schedule expression/timezone throws (croner edge cases),
|
||||
// record the schedule error (auto-disables after repeated failures)
|
||||
@@ -408,7 +425,10 @@ export function applyJobResult(
|
||||
} else if (job.enabled) {
|
||||
let naturalNext: number | undefined;
|
||||
try {
|
||||
naturalNext = computeJobNextRunAtMs(job, result.endedAt);
|
||||
naturalNext =
|
||||
opts?.preserveSchedule && job.schedule.kind === "every"
|
||||
? computeNextWithPreservedLastRun(result.endedAt)
|
||||
: computeJobNextRunAtMs(job, result.endedAt);
|
||||
} catch (err) {
|
||||
// If the schedule expression/timezone throws (croner edge cases),
|
||||
// record the schedule error (auto-disables after repeated failures)
|
||||
@@ -552,13 +572,17 @@ export async function onTimer(state: CronServiceState) {
|
||||
try {
|
||||
const dueJobs = await locked(state, async () => {
|
||||
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
|
||||
const due = findDueJobs(state);
|
||||
const dueCheckNow = state.deps.nowMs();
|
||||
const due = collectRunnableJobs(state, dueCheckNow);
|
||||
|
||||
if (due.length === 0) {
|
||||
// Use maintenance-only recompute to avoid advancing past-due nextRunAtMs
|
||||
// values without execution. This prevents jobs from being silently skipped
|
||||
// when the timer wakes up but findDueJobs returns empty (see #13992).
|
||||
const changed = recomputeNextRunsForMaintenance(state);
|
||||
const changed = recomputeNextRunsForMaintenance(state, {
|
||||
recomputeExpired: true,
|
||||
nowMs: dueCheckNow,
|
||||
});
|
||||
if (changed) {
|
||||
await persist(state);
|
||||
}
|
||||
@@ -688,14 +712,6 @@ export async function onTimer(state: CronServiceState) {
|
||||
}
|
||||
}
|
||||
|
||||
function findDueJobs(state: CronServiceState): CronJob[] {
|
||||
if (!state.store) {
|
||||
return [];
|
||||
}
|
||||
const now = state.deps.nowMs();
|
||||
return collectRunnableJobs(state, now);
|
||||
}
|
||||
|
||||
function isRunnableJob(params: {
|
||||
job: CronJob;
|
||||
nowMs: number;
|
||||
|
||||
Reference in New Issue
Block a user