mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 16:18:26 +00:00
feat(cron): add default stagger controls for scheduled jobs
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { normalizeCronJobCreate, normalizeCronJobPatch } from "./normalize.js";
|
||||
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
|
||||
|
||||
function expectNormalizedAtSchedule(scheduleInput: Record<string, unknown>) {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
@@ -137,6 +138,40 @@ describe("normalizeCronJobCreate", () => {
|
||||
expectNormalizedAtSchedule({ kind: "at", atMs: "2026-01-12T18:00:00" });
|
||||
});
|
||||
|
||||
it("defaults cron stagger for recurring top-of-hour schedules", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "hourly",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "tick",
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const schedule = normalized.schedule as Record<string, unknown>;
|
||||
expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
});
|
||||
|
||||
it("preserves explicit exact cron schedule", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "exact",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 0 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "tick",
|
||||
},
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const schedule = normalized.schedule as Record<string, unknown>;
|
||||
expect(schedule.staggerMs).toBe(0);
|
||||
});
|
||||
|
||||
it("defaults deleteAfterRun for one-shot schedules", () => {
|
||||
const normalized = normalizeCronJobCreate({
|
||||
name: "default delete",
|
||||
@@ -377,4 +412,13 @@ describe("normalizeCronJobPatch", () => {
|
||||
}) as unknown as Record<string, unknown>;
|
||||
expect(cleared.sessionKey).toBeNull();
|
||||
});
|
||||
|
||||
it("normalizes cron stagger values in patch schedules", () => {
|
||||
const normalized = normalizeCronJobPatch({
|
||||
schedule: { kind: "cron", expr: "0 * * * *", staggerMs: "30000" },
|
||||
}) as unknown as Record<string, unknown>;
|
||||
|
||||
const schedule = normalized.schedule as Record<string, unknown>;
|
||||
expect(schedule.staggerMs).toBe(30_000);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import type { CronJobCreate, CronJobPatch } from "./types.js";
|
||||
import { sanitizeAgentId } from "../routing/session-key.js";
|
||||
import { isRecord } from "../utils.js";
|
||||
import {
|
||||
@@ -8,7 +9,7 @@ import {
|
||||
import { parseAbsoluteTimeMs } from "./parse.js";
|
||||
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||
import { inferLegacyName } from "./service/normalize.js";
|
||||
import type { CronJobCreate, CronJobPatch } from "./types.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
|
||||
|
||||
type UnknownRecord = Record<string, unknown>;
|
||||
|
||||
@@ -61,6 +62,13 @@ function coerceSchedule(schedule: UnknownRecord) {
|
||||
delete next.atMs;
|
||||
}
|
||||
|
||||
const staggerMs = normalizeCronStaggerMs(schedule.staggerMs);
|
||||
if (staggerMs !== undefined) {
|
||||
next.staggerMs = staggerMs;
|
||||
} else if ("staggerMs" in next) {
|
||||
delete next.staggerMs;
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
@@ -420,6 +428,19 @@ export function normalizeCronJobInput(
|
||||
) {
|
||||
next.deleteAfterRun = true;
|
||||
}
|
||||
if ("schedule" in next && isRecord(next.schedule) && next.schedule.kind === "cron") {
|
||||
const schedule = next.schedule as UnknownRecord;
|
||||
const explicit = normalizeCronStaggerMs(schedule.staggerMs);
|
||||
if (explicit !== undefined) {
|
||||
schedule.staggerMs = explicit;
|
||||
} else {
|
||||
const expr = typeof schedule.expr === "string" ? schedule.expr : "";
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(expr);
|
||||
if (defaultStaggerMs !== undefined) {
|
||||
schedule.staggerMs = defaultStaggerMs;
|
||||
}
|
||||
}
|
||||
}
|
||||
const payload = isRecord(next.payload) ? next.payload : null;
|
||||
const payloadKind = payload && typeof payload.kind === "string" ? payload.kind : "";
|
||||
const sessionTarget = typeof next.sessionTarget === "string" ? next.sessionTarget : "";
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CronJob, CronJobState } from "./types.js";
|
||||
import * as schedule from "./schedule.js";
|
||||
import { CronService } from "./service.js";
|
||||
import { computeJobNextRunAtMs } from "./service/jobs.js";
|
||||
import { createCronServiceState, type CronEvent } from "./service/state.js";
|
||||
import { onTimer } from "./service/timer.js";
|
||||
import type { CronJob, CronJobState } from "./types.js";
|
||||
|
||||
const noopLogger = {
|
||||
info: vi.fn(),
|
||||
@@ -16,6 +17,12 @@ const noopLogger = {
|
||||
debug: vi.fn(),
|
||||
trace: vi.fn(),
|
||||
};
|
||||
const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000;
|
||||
|
||||
function topOfHourOffsetMs(jobId: string) {
|
||||
const digest = crypto.createHash("sha256").update(jobId).digest();
|
||||
return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS;
|
||||
}
|
||||
|
||||
let fixtureRoot = "";
|
||||
let fixtureCount = 0;
|
||||
@@ -101,13 +108,14 @@ describe("Cron issue regressions", () => {
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
});
|
||||
expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z"));
|
||||
const offsetMs = topOfHourOffsetMs(created.id);
|
||||
expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs);
|
||||
|
||||
const updated = await cron.update(created.id, {
|
||||
schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" },
|
||||
});
|
||||
|
||||
expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z"));
|
||||
expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z") + offsetMs);
|
||||
|
||||
const forceNow = await cron.add({
|
||||
name: "force-now",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { applyJobPatch } from "./service/jobs.js";
|
||||
import type { CronServiceState } from "./service/state.js";
|
||||
import type { CronJob, CronJobPatch } from "./types.js";
|
||||
import { applyJobPatch, createJob } from "./service/jobs.js";
|
||||
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
|
||||
|
||||
describe("applyJobPatch", () => {
|
||||
it("clears delivery when switching to main session", () => {
|
||||
@@ -179,3 +181,102 @@ describe("applyJobPatch", () => {
|
||||
expect(job.delivery).toEqual({ mode: "webhook", to: "https://example.invalid/trim" });
|
||||
});
|
||||
});
|
||||
|
||||
function createMockState(now: number): CronServiceState {
|
||||
return {
|
||||
deps: {
|
||||
nowMs: () => now,
|
||||
},
|
||||
} as unknown as CronServiceState;
|
||||
}
|
||||
|
||||
describe("cron stagger defaults", () => {
|
||||
it("defaults top-of-hour cron jobs to 5m stagger", () => {
|
||||
const now = Date.parse("2026-02-08T10:00:00.000Z");
|
||||
const state = createMockState(now);
|
||||
|
||||
const job = createJob(state, {
|
||||
name: "hourly",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
});
|
||||
|
||||
expect(job.schedule.kind).toBe("cron");
|
||||
if (job.schedule.kind === "cron") {
|
||||
expect(job.schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps exact schedules when staggerMs is explicitly 0", () => {
|
||||
const now = Date.parse("2026-02-08T10:00:00.000Z");
|
||||
const state = createMockState(now);
|
||||
|
||||
const job = createJob(state, {
|
||||
name: "exact-hourly",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 0 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
});
|
||||
|
||||
expect(job.schedule.kind).toBe("cron");
|
||||
if (job.schedule.kind === "cron") {
|
||||
expect(job.schedule.staggerMs).toBe(0);
|
||||
}
|
||||
});
|
||||
|
||||
it("preserves existing stagger when editing cron expression without stagger", () => {
|
||||
const now = Date.now();
|
||||
const job: CronJob = {
|
||||
id: "job-keep-stagger",
|
||||
name: "job-keep-stagger",
|
||||
enabled: true,
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC", staggerMs: 120_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
};
|
||||
|
||||
applyJobPatch(job, {
|
||||
schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" },
|
||||
});
|
||||
|
||||
expect(job.schedule.kind).toBe("cron");
|
||||
if (job.schedule.kind === "cron") {
|
||||
expect(job.schedule.expr).toBe("0 */2 * * *");
|
||||
expect(job.schedule.staggerMs).toBe(120_000);
|
||||
}
|
||||
});
|
||||
|
||||
it("applies default stagger when switching from every to top-of-hour cron", () => {
|
||||
const now = Date.now();
|
||||
const job: CronJob = {
|
||||
id: "job-switch-cron",
|
||||
name: "job-switch-cron",
|
||||
enabled: true,
|
||||
createdAtMs: now,
|
||||
updatedAtMs: now,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: {},
|
||||
};
|
||||
|
||||
applyJobPatch(job, {
|
||||
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" },
|
||||
});
|
||||
|
||||
expect(job.schedule.kind).toBe("cron");
|
||||
if (job.schedule.kind === "cron") {
|
||||
expect(job.schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
93
src/cron/service.jobs.top-of-hour-stagger.test.ts
Normal file
93
src/cron/service.jobs.top-of-hour-stagger.test.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import crypto from "node:crypto";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { CronJob } from "./types.js";
|
||||
import { computeJobNextRunAtMs } from "./service/jobs.js";
|
||||
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
|
||||
|
||||
function stableOffsetMs(jobId: string, windowMs: number) {
|
||||
const digest = crypto.createHash("sha256").update(jobId).digest();
|
||||
return digest.readUInt32BE(0) % windowMs;
|
||||
}
|
||||
|
||||
function createCronJob(params: {
|
||||
id: string;
|
||||
expr: string;
|
||||
tz?: string;
|
||||
staggerMs?: number;
|
||||
state?: CronJob["state"];
|
||||
}): CronJob {
|
||||
return {
|
||||
id: params.id,
|
||||
name: params.id,
|
||||
enabled: true,
|
||||
createdAtMs: Date.parse("2026-02-06T00:00:00.000Z"),
|
||||
updatedAtMs: Date.parse("2026-02-06T00:00:00.000Z"),
|
||||
schedule: { kind: "cron", expr: params.expr, tz: params.tz, staggerMs: params.staggerMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
state: params.state ?? {},
|
||||
};
|
||||
}
|
||||
|
||||
describe("computeJobNextRunAtMs top-of-hour staggering", () => {
|
||||
it("applies deterministic 0..5m stagger for recurring top-of-hour schedules", () => {
|
||||
const now = Date.parse("2026-02-06T10:05:00.000Z");
|
||||
const job = createCronJob({ id: "hourly-job-a", expr: "0 * * * *", tz: "UTC" });
|
||||
const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
|
||||
const next = computeJobNextRunAtMs(job, now);
|
||||
|
||||
expect(next).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs);
|
||||
expect(offsetMs).toBeGreaterThanOrEqual(0);
|
||||
expect(offsetMs).toBeLessThan(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
});
|
||||
|
||||
it("can still fire in the current hour when the staggered slot is ahead", () => {
|
||||
const now = Date.parse("2026-02-06T10:02:00.000Z");
|
||||
const thisHour = Date.parse("2026-02-06T10:00:00.000Z");
|
||||
const nextHour = Date.parse("2026-02-06T11:00:00.000Z");
|
||||
const job = createCronJob({ id: "hourly-job-b", expr: "0 * * * *", tz: "UTC" });
|
||||
const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
|
||||
const expected = thisHour + offsetMs > now ? thisHour + offsetMs : nextHour + offsetMs;
|
||||
const next = computeJobNextRunAtMs(job, now);
|
||||
|
||||
expect(next).toBe(expected);
|
||||
});
|
||||
|
||||
it("also applies to 6-field top-of-hour cron expressions", () => {
|
||||
const now = Date.parse("2026-02-06T10:05:00.000Z");
|
||||
const job = createCronJob({ id: "hourly-job-seconds", expr: "0 0 * * * *", tz: "UTC" });
|
||||
const offsetMs = stableOffsetMs(job.id, DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
|
||||
const next = computeJobNextRunAtMs(job, now);
|
||||
|
||||
expect(next).toBe(Date.parse("2026-02-06T11:00:00.000Z") + offsetMs);
|
||||
});
|
||||
|
||||
it("supports explicit stagger for non top-of-hour cron expressions", () => {
|
||||
const now = Date.parse("2026-02-06T10:05:00.000Z");
|
||||
const windowMs = 30_000;
|
||||
const job = createCronJob({
|
||||
id: "minute-17-staggered",
|
||||
expr: "17 * * * *",
|
||||
tz: "UTC",
|
||||
staggerMs: windowMs,
|
||||
});
|
||||
const offsetMs = stableOffsetMs(job.id, windowMs);
|
||||
|
||||
const next = computeJobNextRunAtMs(job, now);
|
||||
|
||||
expect(next).toBe(Date.parse("2026-02-06T10:17:00.000Z") + offsetMs);
|
||||
});
|
||||
|
||||
it("keeps schedules exact when staggerMs is set to 0", () => {
|
||||
const now = Date.parse("2026-02-06T10:05:00.000Z");
|
||||
const job = createCronJob({ id: "daily-job", expr: "0 7 * * *", tz: "UTC", staggerMs: 0 });
|
||||
|
||||
const next = computeJobNextRunAtMs(job, now);
|
||||
|
||||
expect(next).toBe(Date.parse("2026-02-07T07:00:00.000Z"));
|
||||
});
|
||||
});
|
||||
@@ -3,6 +3,7 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
|
||||
import { loadCronStore } from "./store.js";
|
||||
|
||||
const noopLogger = {
|
||||
@@ -136,4 +137,68 @@ describe("cron store migration", () => {
|
||||
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("adds default staggerMs to legacy recurring top-of-hour cron schedules", async () => {
|
||||
const store = await makeStorePath();
|
||||
const createdAtMs = 1_700_000_000_000;
|
||||
const legacyJob = {
|
||||
id: "job-cron-legacy",
|
||||
agentId: undefined,
|
||||
name: "Legacy cron",
|
||||
description: null,
|
||||
enabled: true,
|
||||
deleteAfterRun: false,
|
||||
createdAtMs,
|
||||
updatedAtMs: createdAtMs,
|
||||
schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "tick",
|
||||
},
|
||||
state: {},
|
||||
};
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2));
|
||||
|
||||
const migrated = await migrateAndLoadFirstJob(store.storePath);
|
||||
const schedule = migrated.schedule as Record<string, unknown>;
|
||||
expect(schedule.kind).toBe("cron");
|
||||
expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("adds default staggerMs to legacy 6-field top-of-hour cron schedules", async () => {
|
||||
const store = await makeStorePath();
|
||||
const createdAtMs = 1_700_000_000_000;
|
||||
const legacyJob = {
|
||||
id: "job-cron-seconds-legacy",
|
||||
agentId: undefined,
|
||||
name: "Legacy cron seconds",
|
||||
description: null,
|
||||
enabled: true,
|
||||
deleteAfterRun: false,
|
||||
createdAtMs,
|
||||
updatedAtMs: createdAtMs,
|
||||
schedule: { kind: "cron", expr: "0 0 */3 * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: {
|
||||
kind: "systemEvent",
|
||||
text: "tick",
|
||||
},
|
||||
state: {},
|
||||
};
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2));
|
||||
|
||||
const migrated = await migrateAndLoadFirstJob(store.storePath);
|
||||
const schedule = migrated.schedule as Record<string, unknown>;
|
||||
expect(schedule.kind).toBe("cron");
|
||||
expect(schedule.staggerMs).toBe(DEFAULT_TOP_OF_HOUR_STAGGER_MS);
|
||||
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import crypto from "node:crypto";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { computeNextRunAtMs } from "../schedule.js";
|
||||
import type {
|
||||
CronDelivery,
|
||||
CronDeliveryPatch,
|
||||
@@ -10,6 +8,14 @@ import type {
|
||||
CronPayload,
|
||||
CronPayloadPatch,
|
||||
} from "../types.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { computeNextRunAtMs } from "../schedule.js";
|
||||
import {
|
||||
normalizeCronStaggerMs,
|
||||
resolveCronStaggerMs,
|
||||
resolveDefaultCronStaggerMs,
|
||||
} from "../stagger.js";
|
||||
import { normalizeHttpWebhookUrl } from "../webhook-url.js";
|
||||
import {
|
||||
normalizeOptionalAgentId,
|
||||
@@ -18,10 +24,45 @@ import {
|
||||
normalizePayloadToSystemText,
|
||||
normalizeRequiredName,
|
||||
} from "./normalize.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
||||
|
||||
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
|
||||
if (staggerMs <= 1) {
|
||||
return 0;
|
||||
}
|
||||
const digest = crypto.createHash("sha256").update(jobId).digest();
|
||||
return digest.readUInt32BE(0) % staggerMs;
|
||||
}
|
||||
|
||||
function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) {
|
||||
if (job.schedule.kind !== "cron") {
|
||||
return computeNextRunAtMs(job.schedule, nowMs);
|
||||
}
|
||||
|
||||
const staggerMs = resolveCronStaggerMs(job.schedule);
|
||||
const offsetMs = resolveStableCronOffsetMs(job.id, staggerMs);
|
||||
if (offsetMs <= 0) {
|
||||
return computeNextRunAtMs(job.schedule, nowMs);
|
||||
}
|
||||
|
||||
// Shift the schedule cursor backwards by the per-job offset so we can still
|
||||
// target the current schedule window if its staggered slot has not passed yet.
|
||||
let cursorMs = Math.max(0, nowMs - offsetMs);
|
||||
for (let attempt = 0; attempt < 4; attempt += 1) {
|
||||
const baseNext = computeNextRunAtMs(job.schedule, cursorMs);
|
||||
if (baseNext === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
const shifted = baseNext + offsetMs;
|
||||
if (shifted > nowMs) {
|
||||
return shifted;
|
||||
}
|
||||
cursorMs = Math.max(cursorMs + 1, baseNext + 1_000);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveEveryAnchorMs(params: {
|
||||
schedule: { everyMs: number; anchorMs?: number };
|
||||
fallbackAnchorMs: number;
|
||||
@@ -97,18 +138,7 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
||||
: null;
|
||||
return atMs !== null ? atMs : undefined;
|
||||
}
|
||||
const next = computeNextRunAtMs(job.schedule, nowMs);
|
||||
// Guard against the scheduler returning a time within the same second as
|
||||
// nowMs. When a cron job completes within the same wall-clock second it
|
||||
// was scheduled for, some croner versions/timezone combinations may return
|
||||
// the current second (or computeNextRunAtMs may return undefined, which
|
||||
// triggers recomputation). Advancing to the next second and retrying
|
||||
// ensures we always land on the *next* occurrence. (See #17821)
|
||||
if (next === undefined && job.schedule.kind === "cron") {
|
||||
const nextSecondMs = (Math.floor(nowMs / 1000) + 1) * 1000;
|
||||
return computeNextRunAtMs(job.schedule, nextSecondMs);
|
||||
}
|
||||
return next;
|
||||
return computeStaggeredCronNextRunAtMs(job, nowMs);
|
||||
}
|
||||
|
||||
/** Maximum consecutive schedule errors before auto-disabling a job. */
|
||||
@@ -288,7 +318,18 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
|
||||
fallbackAnchorMs: now,
|
||||
}),
|
||||
}
|
||||
: input.schedule;
|
||||
: input.schedule.kind === "cron"
|
||||
? (() => {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(input.schedule.staggerMs);
|
||||
if (explicitStaggerMs !== undefined) {
|
||||
return { ...input.schedule, staggerMs: explicitStaggerMs };
|
||||
}
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(input.schedule.expr);
|
||||
return defaultStaggerMs !== undefined
|
||||
? { ...input.schedule, staggerMs: defaultStaggerMs }
|
||||
: input.schedule;
|
||||
})()
|
||||
: input.schedule;
|
||||
const deleteAfterRun =
|
||||
typeof input.deleteAfterRun === "boolean"
|
||||
? input.deleteAfterRun
|
||||
@@ -335,7 +376,22 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
|
||||
job.deleteAfterRun = patch.deleteAfterRun;
|
||||
}
|
||||
if (patch.schedule) {
|
||||
job.schedule = patch.schedule;
|
||||
if (patch.schedule.kind === "cron") {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(patch.schedule.staggerMs);
|
||||
if (explicitStaggerMs !== undefined) {
|
||||
job.schedule = { ...patch.schedule, staggerMs: explicitStaggerMs };
|
||||
} else if (job.schedule.kind === "cron") {
|
||||
job.schedule = { ...patch.schedule, staggerMs: job.schedule.staggerMs };
|
||||
} else {
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(patch.schedule.expr);
|
||||
job.schedule =
|
||||
defaultStaggerMs !== undefined
|
||||
? { ...patch.schedule, staggerMs: defaultStaggerMs }
|
||||
: patch.schedule;
|
||||
}
|
||||
} else {
|
||||
job.schedule = patch.schedule;
|
||||
}
|
||||
}
|
||||
if (patch.sessionTarget) {
|
||||
job.sessionTarget = patch.sessionTarget;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import fs from "node:fs";
|
||||
import type { CronJob } from "../types.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
import {
|
||||
buildDeliveryFromLegacyPayload,
|
||||
hasLegacyDeliveryHints,
|
||||
@@ -6,11 +8,10 @@ import {
|
||||
} from "../legacy-delivery.js";
|
||||
import { parseAbsoluteTimeMs } from "../parse.js";
|
||||
import { migrateLegacyCronPayload } from "../payload-migration.js";
|
||||
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "../stagger.js";
|
||||
import { loadCronStore, saveCronStore } from "../store.js";
|
||||
import type { CronJob } from "../types.js";
|
||||
import { recomputeNextRuns } from "./jobs.js";
|
||||
import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
|
||||
import type { CronServiceState } from "./state.js";
|
||||
|
||||
function buildDeliveryPatchFromLegacyPayload(payload: Record<string, unknown>) {
|
||||
const deliver = payload.deliver;
|
||||
@@ -380,6 +381,26 @@ export async function ensureLoaded(
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
|
||||
const exprRaw = typeof sched.expr === "string" ? sched.expr.trim() : "";
|
||||
if (typeof sched.expr === "string" && sched.expr !== exprRaw) {
|
||||
sched.expr = exprRaw;
|
||||
mutated = true;
|
||||
}
|
||||
if ((kind === "cron" || sched.kind === "cron") && exprRaw) {
|
||||
const explicitStaggerMs = normalizeCronStaggerMs(sched.staggerMs);
|
||||
const defaultStaggerMs = resolveDefaultCronStaggerMs(exprRaw);
|
||||
const targetStaggerMs = explicitStaggerMs ?? defaultStaggerMs;
|
||||
if (targetStaggerMs === undefined) {
|
||||
if ("staggerMs" in sched) {
|
||||
delete sched.staggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
} else if (sched.staggerMs !== targetStaggerMs) {
|
||||
sched.staggerMs = targetStaggerMs;
|
||||
mutated = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const delivery = raw.delivery;
|
||||
|
||||
36
src/cron/stagger.test.ts
Normal file
36
src/cron/stagger.test.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
DEFAULT_TOP_OF_HOUR_STAGGER_MS,
|
||||
isRecurringTopOfHourCronExpr,
|
||||
normalizeCronStaggerMs,
|
||||
resolveCronStaggerMs,
|
||||
} from "./stagger.js";
|
||||
|
||||
describe("cron stagger helpers", () => {
|
||||
it("detects recurring top-of-hour cron expressions for 5-field and 6-field cron", () => {
|
||||
expect(isRecurringTopOfHourCronExpr("0 * * * *")).toBe(true);
|
||||
expect(isRecurringTopOfHourCronExpr("0 */2 * * *")).toBe(true);
|
||||
expect(isRecurringTopOfHourCronExpr("0 0 */3 * * *")).toBe(true);
|
||||
expect(isRecurringTopOfHourCronExpr("0 7 * * *")).toBe(false);
|
||||
expect(isRecurringTopOfHourCronExpr("15 * * * *")).toBe(false);
|
||||
});
|
||||
|
||||
it("normalizes explicit stagger values", () => {
|
||||
expect(normalizeCronStaggerMs("30000")).toBe(30_000);
|
||||
expect(normalizeCronStaggerMs(42.8)).toBe(42);
|
||||
expect(normalizeCronStaggerMs(-10)).toBe(0);
|
||||
expect(normalizeCronStaggerMs("")).toBeUndefined();
|
||||
expect(normalizeCronStaggerMs("abc")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("resolves effective stagger for cron schedules", () => {
|
||||
expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *" })).toBe(
|
||||
DEFAULT_TOP_OF_HOUR_STAGGER_MS,
|
||||
);
|
||||
expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *", staggerMs: 30_000 })).toBe(
|
||||
30_000,
|
||||
);
|
||||
expect(resolveCronStaggerMs({ kind: "cron", expr: "0 * * * *", staggerMs: 0 })).toBe(0);
|
||||
expect(resolveCronStaggerMs({ kind: "cron", expr: "15 * * * *" })).toBe(0);
|
||||
});
|
||||
});
|
||||
45
src/cron/stagger.ts
Normal file
45
src/cron/stagger.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import type { CronSchedule } from "./types.js";
|
||||
|
||||
export const DEFAULT_TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1000;
|
||||
|
||||
function parseCronFields(expr: string) {
|
||||
return expr.trim().split(/\s+/).filter(Boolean);
|
||||
}
|
||||
|
||||
export function isRecurringTopOfHourCronExpr(expr: string) {
|
||||
const fields = parseCronFields(expr);
|
||||
if (fields.length === 5) {
|
||||
const [minuteField, hourField] = fields;
|
||||
return minuteField === "0" && hourField.includes("*");
|
||||
}
|
||||
if (fields.length === 6) {
|
||||
const [secondField, minuteField, hourField] = fields;
|
||||
return secondField === "0" && minuteField === "0" && hourField.includes("*");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function normalizeCronStaggerMs(raw: unknown): number | undefined {
|
||||
const numeric =
|
||||
typeof raw === "number"
|
||||
? raw
|
||||
: typeof raw === "string" && raw.trim()
|
||||
? Number(raw)
|
||||
: Number.NaN;
|
||||
if (!Number.isFinite(numeric)) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.max(0, Math.floor(numeric));
|
||||
}
|
||||
|
||||
export function resolveDefaultCronStaggerMs(expr: string): number | undefined {
|
||||
return isRecurringTopOfHourCronExpr(expr) ? DEFAULT_TOP_OF_HOUR_STAGGER_MS : undefined;
|
||||
}
|
||||
|
||||
export function resolveCronStaggerMs(schedule: Extract<CronSchedule, { kind: "cron" }>): number {
|
||||
const explicit = normalizeCronStaggerMs(schedule.staggerMs);
|
||||
if (explicit !== undefined) {
|
||||
return explicit;
|
||||
}
|
||||
return resolveDefaultCronStaggerMs(schedule.expr) ?? 0;
|
||||
}
|
||||
@@ -3,7 +3,13 @@ import type { ChannelId } from "../channels/plugins/types.js";
|
||||
export type CronSchedule =
|
||||
| { kind: "at"; at: string }
|
||||
| { kind: "every"; everyMs: number; anchorMs?: number }
|
||||
| { kind: "cron"; expr: string; tz?: string };
|
||||
| {
|
||||
kind: "cron";
|
||||
expr: string;
|
||||
tz?: string;
|
||||
/** Optional deterministic stagger window in milliseconds (0 keeps exact schedule). */
|
||||
staggerMs?: number;
|
||||
};
|
||||
|
||||
export type CronSessionTarget = "main" | "isolated";
|
||||
export type CronWakeMode = "next-heartbeat" | "now";
|
||||
|
||||
Reference in New Issue
Block a user