refactor(cron): share job tick state normalization

This commit is contained in:
Peter Steinberger
2026-02-14 21:44:30 +00:00
parent 6aab89939f
commit 6b400eca5c

View File

@@ -90,6 +90,43 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
/** Maximum consecutive schedule errors before auto-disabling a job. */ /** Maximum consecutive schedule errors before auto-disabling a job. */
const MAX_SCHEDULE_ERRORS = 3; const MAX_SCHEDULE_ERRORS = 3;
function normalizeJobTickState(params: { state: CronServiceState; job: CronJob; nowMs: number }): {
changed: boolean;
skip: boolean;
} {
const { state, job, nowMs } = params;
let changed = false;
if (!job.state) {
job.state = {};
changed = true;
}
if (!job.enabled) {
if (job.state.nextRunAtMs !== undefined) {
job.state.nextRunAtMs = undefined;
changed = true;
}
if (job.state.runningAtMs !== undefined) {
job.state.runningAtMs = undefined;
changed = true;
}
return { changed, skip: true };
}
const runningAt = job.state.runningAtMs;
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: runningAt },
"cron: clearing stuck running marker",
);
job.state.runningAtMs = undefined;
changed = true;
}
return { changed, skip: false };
}
export function recomputeNextRuns(state: CronServiceState): boolean { export function recomputeNextRuns(state: CronServiceState): boolean {
if (!state.store) { if (!state.store) {
return false; return false;
@@ -97,30 +134,13 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
let changed = false; let changed = false;
const now = state.deps.nowMs(); const now = state.deps.nowMs();
for (const job of state.store.jobs) { for (const job of state.store.jobs) {
if (!job.state) { const tick = normalizeJobTickState({ state, job, nowMs: now });
job.state = {}; if (tick.changed) {
changed = true; changed = true;
} }
if (!job.enabled) { if (tick.skip) {
if (job.state.nextRunAtMs !== undefined) {
job.state.nextRunAtMs = undefined;
changed = true;
}
if (job.state.runningAtMs !== undefined) {
job.state.runningAtMs = undefined;
changed = true;
}
continue; continue;
} }
const runningAt = job.state.runningAtMs;
if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: runningAt },
"cron: clearing stuck running marker",
);
job.state.runningAtMs = undefined;
changed = true;
}
// Only recompute if nextRunAtMs is missing or already past-due. // Only recompute if nextRunAtMs is missing or already past-due.
// Preserving a still-future nextRunAtMs avoids accidentally advancing // Preserving a still-future nextRunAtMs avoids accidentally advancing
// a job that hasn't fired yet (e.g. during restart recovery). // a job that hasn't fired yet (e.g. during restart recovery).
@@ -177,30 +197,13 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea
let changed = false; let changed = false;
const now = state.deps.nowMs(); const now = state.deps.nowMs();
for (const job of state.store.jobs) { for (const job of state.store.jobs) {
if (!job.state) { const tick = normalizeJobTickState({ state, job, nowMs: now });
job.state = {}; if (tick.changed) {
changed = true; changed = true;
} }
if (!job.enabled) { if (tick.skip) {
if (job.state.nextRunAtMs !== undefined) {
job.state.nextRunAtMs = undefined;
changed = true;
}
if (job.state.runningAtMs !== undefined) {
job.state.runningAtMs = undefined;
changed = true;
}
continue; continue;
} }
const runningAt = job.state.runningAtMs;
if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: runningAt },
"cron: clearing stuck running marker",
);
job.state.runningAtMs = undefined;
changed = true;
}
// Only compute missing nextRunAtMs, do NOT recompute existing ones. // Only compute missing nextRunAtMs, do NOT recompute existing ones.
// If a job was past-due but not found by findDueJobs, recomputing would // If a job was past-due but not found by findDueJobs, recomputing would
// cause it to be silently skipped. // cause it to be silently skipped.