mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 21:17:27 +00:00
fix(cron): prevent list/status from silently skipping recurring jobs (openclaw#16201) thanks @zerone0x
Verified: - pnpm install --frozen-lockfile - pnpm build - pnpm check - pnpm test:macmini Co-authored-by: zerone0x <39543393+zerone0x@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
229
src/cron/service.issue-16156-list-skips-cron.test.ts
Normal file
229
src/cron/service.issue-16156-list-skips-cron.test.ts
Normal file
@@ -0,0 +1,229 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CronEvent } from "./service.js";
|
||||
import { CronService } from "./service.js";
|
||||
|
||||
const noopLogger = {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
|
||||
async function makeStorePath() {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-16156-"));
|
||||
return {
|
||||
storePath: path.join(dir, "cron", "jobs.json"),
|
||||
cleanup: async () => {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function createFinishedBarrier() {
|
||||
const resolvers = new Map<string, (evt: CronEvent) => void>();
|
||||
return {
|
||||
waitForOk: (jobId: string) =>
|
||||
new Promise<CronEvent>((resolve) => {
|
||||
resolvers.set(jobId, resolve);
|
||||
}),
|
||||
onEvent: (evt: CronEvent) => {
|
||||
if (evt.action !== "finished" || evt.status !== "ok") {
|
||||
return;
|
||||
}
|
||||
const resolve = resolvers.get(evt.jobId);
|
||||
if (!resolve) {
|
||||
return;
|
||||
}
|
||||
resolvers.delete(evt.jobId);
|
||||
resolve(evt);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("#16156: cron.list() must not silently advance past-due recurring jobs", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
|
||||
noopLogger.debug.mockClear();
|
||||
noopLogger.info.mockClear();
|
||||
noopLogger.warn.mockClear();
|
||||
noopLogger.error.mockClear();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("does not skip a cron job when list() is called while the job is past-due", async () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const finished = createFinishedBarrier();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
onEvent: finished.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
// Create a cron job that fires every minute.
|
||||
const job = await cron.add({
|
||||
name: "every-minute",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "* * * * *" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cron-tick" },
|
||||
});
|
||||
|
||||
const firstDueAt = job.state.nextRunAtMs!;
|
||||
expect(firstDueAt).toBe(Date.parse("2025-12-13T00:01:00.000Z"));
|
||||
|
||||
// Advance time so the job is past-due but the timer hasn't fired yet.
|
||||
vi.setSystemTime(new Date(firstDueAt + 5));
|
||||
|
||||
// Simulate the user running `cron list` while the job is past-due.
|
||||
// Before the fix, this would call recomputeNextRuns() which silently
|
||||
// advances nextRunAtMs to the next occurrence (00:02:00) without
|
||||
// executing the job.
|
||||
const listedBefore = await cron.list({ includeDisabled: true });
|
||||
const jobBeforeTimer = listedBefore.find((j) => j.id === job.id);
|
||||
|
||||
// The job should still show the past-due nextRunAtMs, NOT the advanced one.
|
||||
expect(jobBeforeTimer?.state.nextRunAtMs).toBe(firstDueAt);
|
||||
|
||||
// Now let the timer fire. The job should be found as due and execute.
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
// Job must have actually executed.
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", {
|
||||
agentId: undefined,
|
||||
});
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
// nextRunAtMs must advance to a future minute boundary after execution.
|
||||
expect(updated?.state.nextRunAtMs).toBeGreaterThan(firstDueAt);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("does not skip a cron job when status() is called while the job is past-due", async () => {
|
||||
const store = await makeStorePath();
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const finished = createFinishedBarrier();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
onEvent: finished.onEvent,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
const job = await cron.add({
|
||||
name: "five-min-cron",
|
||||
enabled: true,
|
||||
schedule: { kind: "cron", expr: "*/5 * * * *" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick-5" },
|
||||
});
|
||||
|
||||
const firstDueAt = job.state.nextRunAtMs!;
|
||||
|
||||
// Advance time past due.
|
||||
vi.setSystemTime(new Date(firstDueAt + 10));
|
||||
|
||||
// Call status() while job is past-due.
|
||||
await cron.status();
|
||||
|
||||
// Timer fires.
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(enqueueSystemEvent).toHaveBeenCalledWith("tick-5", {
|
||||
agentId: undefined,
|
||||
});
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
|
||||
it("still fills missing nextRunAtMs via list() for enabled jobs", async () => {
|
||||
const store = await makeStorePath();
|
||||
const nowMs = Date.parse("2025-12-13T00:00:00.000Z");
|
||||
|
||||
// Write a store file with a cron job that has no nextRunAtMs.
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
store.storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
id: "missing-next",
|
||||
name: "missing next",
|
||||
enabled: true,
|
||||
createdAtMs: nowMs,
|
||||
updatedAtMs: nowMs,
|
||||
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "fill-me" },
|
||||
state: {},
|
||||
},
|
||||
],
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
// list() should fill in the missing nextRunAtMs via maintenance recompute.
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const job = jobs.find((j) => j.id === "missing-next");
|
||||
|
||||
expect(job?.state.nextRunAtMs).toBeTypeOf("number");
|
||||
expect(job?.state.nextRunAtMs).toBeGreaterThan(nowMs);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
});
|
||||
});
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
isJobDue,
|
||||
nextWakeAtMs,
|
||||
recomputeNextRuns,
|
||||
recomputeNextRunsForMaintenance,
|
||||
} from "./jobs.js";
|
||||
import { locked } from "./locked.js";
|
||||
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
|
||||
@@ -53,7 +54,9 @@ export async function status(state: CronServiceState) {
|
||||
return await locked(state, async () => {
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
if (state.store) {
|
||||
const changed = recomputeNextRuns(state);
|
||||
// Use the maintenance-only version so that read-only operations never
|
||||
// advance a past-due nextRunAtMs without executing the job (#16156).
|
||||
const changed = recomputeNextRunsForMaintenance(state);
|
||||
if (changed) {
|
||||
await persist(state);
|
||||
}
|
||||
@@ -71,7 +74,9 @@ export async function list(state: CronServiceState, opts?: { includeDisabled?: b
|
||||
return await locked(state, async () => {
|
||||
await ensureLoaded(state, { skipRecompute: true });
|
||||
if (state.store) {
|
||||
const changed = recomputeNextRuns(state);
|
||||
// Use the maintenance-only version so that read-only operations never
|
||||
// advance a past-due nextRunAtMs without executing the job (#16156).
|
||||
const changed = recomputeNextRunsForMaintenance(state);
|
||||
if (changed) {
|
||||
await persist(state);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user