mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-12 19:01:11 +00:00
fix: schedule nextWakeAtMs for isolated sessionTarget cron jobs (#19541)
* fix(cron): repair isolated next wake scheduling * cron: harden isolated next-wake timestamp guards --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -1035,6 +1035,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore.
|
- Feishu: detect bot mentions in post messages with embedded docs when `message.mentions` is empty. (#18074) Thanks @popomore.
|
||||||
- Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060)
|
- Agents/Sessions: align session lock watchdog hold windows with run and compaction timeout budgets (plus grace), preventing valid long-running turns from being force-unlocked mid-run while still recovering hung lock owners. (#18060)
|
||||||
- Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas.
|
- Cron: preserve default model fallbacks for cron agent runs when only `model.primary` is overridden, so failover still follows configured fallbacks unless explicitly cleared with `fallbacks: []`. (#18210) Thanks @mahsumaktas.
|
||||||
|
- Cron/Isolation: treat non-finite `nextRunAtMs` as missing and repair isolated `every` anchor fallback so legacy jobs without valid timestamps self-heal and scheduler wake timing remains valid. (#19469) Thanks @guirguispierre.
|
||||||
- Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204.
|
- Cron: route text-only announce output through the main session announce flow via runSubagentAnnounceFlow so cron text-only output remains visible to the initiating session. Thanks @tyler6204.
|
||||||
- Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204.
|
- Cron: treat `timeoutSeconds: 0` as no-timeout (not clamped to 1), ensuring long-running cron runs are not prematurely terminated. Thanks @tyler6204.
|
||||||
- Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204.
|
- Cron announce injection now targets the session determined by delivery config (`to` + channel) instead of defaulting to the current session. Thanks @tyler6204.
|
||||||
|
|||||||
@@ -241,6 +241,55 @@ describe("Cron issue regressions", () => {
|
|||||||
cron.stop();
|
cron.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("repairs isolated every jobs missing createdAtMs and sets nextWakeAtMs", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
await fs.writeFile(
|
||||||
|
store.storePath,
|
||||||
|
JSON.stringify({
|
||||||
|
version: 1,
|
||||||
|
jobs: [
|
||||||
|
{
|
||||||
|
id: "legacy-isolated",
|
||||||
|
agentId: "feature-dev_planner",
|
||||||
|
sessionKey: "agent:main:main",
|
||||||
|
name: "legacy isolated",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "every", everyMs: 300_000 },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "agentTurn", message: "poll workflow queue" },
|
||||||
|
state: {},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
cronEnabled: true,
|
||||||
|
storePath: store.storePath,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }),
|
||||||
|
});
|
||||||
|
await cron.start();
|
||||||
|
|
||||||
|
const status = await cron.status();
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
const isolated = jobs.find((job) => job.id === "legacy-isolated");
|
||||||
|
expect(Number.isFinite(isolated?.state.nextRunAtMs)).toBe(true);
|
||||||
|
expect(Number.isFinite(status.nextWakeAtMs)).toBe(true);
|
||||||
|
|
||||||
|
const persisted = JSON.parse(await fs.readFile(store.storePath, "utf8")) as {
|
||||||
|
jobs: Array<{ id: string; state?: { nextRunAtMs?: number | null } }>;
|
||||||
|
};
|
||||||
|
const persistedIsolated = persisted.jobs.find((job) => job.id === "legacy-isolated");
|
||||||
|
expect(typeof persistedIsolated?.state?.nextRunAtMs).toBe("number");
|
||||||
|
expect(Number.isFinite(persistedIsolated?.state?.nextRunAtMs)).toBe(true);
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
});
|
||||||
|
|
||||||
it("repairs missing nextRunAtMs on non-schedule updates without touching other jobs", async () => {
|
it("repairs missing nextRunAtMs on non-schedule updates without touching other jobs", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const cron = await startCronForStore({ storePath: store.storePath });
|
const cron = await startCronForStore({ storePath: store.storePath });
|
||||||
|
|||||||
@@ -63,15 +63,22 @@ function computeStaggeredCronNextRunAtMs(job: CronJob, nowMs: number) {
|
|||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function isFiniteTimestamp(value: unknown): value is number {
|
||||||
|
return typeof value === "number" && Number.isFinite(value);
|
||||||
|
}
|
||||||
|
|
||||||
function resolveEveryAnchorMs(params: {
|
function resolveEveryAnchorMs(params: {
|
||||||
schedule: { everyMs: number; anchorMs?: number };
|
schedule: { everyMs: number; anchorMs?: number };
|
||||||
fallbackAnchorMs: number;
|
fallbackAnchorMs: number;
|
||||||
}) {
|
}) {
|
||||||
const raw = params.schedule.anchorMs;
|
const raw = params.schedule.anchorMs;
|
||||||
if (typeof raw === "number" && Number.isFinite(raw)) {
|
if (isFiniteTimestamp(raw)) {
|
||||||
return Math.max(0, Math.floor(raw));
|
return Math.max(0, Math.floor(raw));
|
||||||
}
|
}
|
||||||
return Math.max(0, Math.floor(params.fallbackAnchorMs));
|
if (isFiniteTimestamp(params.fallbackAnchorMs)) {
|
||||||
|
return Math.max(0, Math.floor(params.fallbackAnchorMs));
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "payload">) {
|
export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "payload">) {
|
||||||
@@ -144,11 +151,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
|||||||
return nextFromLastRun;
|
return nextFromLastRun;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
const fallbackAnchorMs = isFiniteTimestamp(job.createdAtMs) ? job.createdAtMs : nowMs;
|
||||||
const anchorMs = resolveEveryAnchorMs({
|
const anchorMs = resolveEveryAnchorMs({
|
||||||
schedule: job.schedule,
|
schedule: job.schedule,
|
||||||
fallbackAnchorMs: job.createdAtMs,
|
fallbackAnchorMs,
|
||||||
});
|
});
|
||||||
return computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs);
|
const next = computeNextRunAtMs({ ...job.schedule, everyMs, anchorMs }, nowMs);
|
||||||
|
return isFiniteTimestamp(next) ? next : undefined;
|
||||||
}
|
}
|
||||||
if (job.schedule.kind === "at") {
|
if (job.schedule.kind === "at") {
|
||||||
// One-shot jobs stay due until they successfully finish.
|
// One-shot jobs stay due until they successfully finish.
|
||||||
@@ -167,14 +176,14 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
|
|||||||
: typeof schedule.at === "string"
|
: typeof schedule.at === "string"
|
||||||
? parseAbsoluteTimeMs(schedule.at)
|
? parseAbsoluteTimeMs(schedule.at)
|
||||||
: null;
|
: null;
|
||||||
return atMs !== null ? atMs : undefined;
|
return atMs !== null && Number.isFinite(atMs) ? atMs : undefined;
|
||||||
}
|
}
|
||||||
const next = computeStaggeredCronNextRunAtMs(job, nowMs);
|
const next = computeStaggeredCronNextRunAtMs(job, nowMs);
|
||||||
if (next === undefined && job.schedule.kind === "cron") {
|
if (next === undefined && job.schedule.kind === "cron") {
|
||||||
const nextSecondMs = Math.floor(nowMs / 1000) * 1000 + 1000;
|
const nextSecondMs = Math.floor(nowMs / 1000) * 1000 + 1000;
|
||||||
return computeStaggeredCronNextRunAtMs(job, nextSecondMs);
|
return computeStaggeredCronNextRunAtMs(job, nextSecondMs);
|
||||||
}
|
}
|
||||||
return next;
|
return isFiniteTimestamp(next) ? next : undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Maximum consecutive schedule errors before auto-disabling a job. */
|
/** Maximum consecutive schedule errors before auto-disabling a job. */
|
||||||
@@ -233,6 +242,11 @@ function normalizeJobTickState(params: { state: CronServiceState; job: CronJob;
|
|||||||
return { changed, skip: true };
|
return { changed, skip: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!isFiniteTimestamp(job.state.nextRunAtMs) && job.state.nextRunAtMs !== undefined) {
|
||||||
|
job.state.nextRunAtMs = undefined;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
|
||||||
const runningAt = job.state.runningAtMs;
|
const runningAt = job.state.runningAtMs;
|
||||||
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
|
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
|
||||||
state.deps.log.warn(
|
state.deps.log.warn(
|
||||||
@@ -298,7 +312,7 @@ export function recomputeNextRuns(state: CronServiceState): boolean {
|
|||||||
// Preserving a still-future nextRunAtMs avoids accidentally advancing
|
// Preserving a still-future nextRunAtMs avoids accidentally advancing
|
||||||
// a job that hasn't fired yet (e.g. during restart recovery).
|
// a job that hasn't fired yet (e.g. during restart recovery).
|
||||||
const nextRun = job.state.nextRunAtMs;
|
const nextRun = job.state.nextRunAtMs;
|
||||||
const isDueOrMissing = nextRun === undefined || now >= nextRun;
|
const isDueOrMissing = !isFiniteTimestamp(nextRun) || now >= nextRun;
|
||||||
if (isDueOrMissing) {
|
if (isDueOrMissing) {
|
||||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||||
changed = true;
|
changed = true;
|
||||||
@@ -321,7 +335,7 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea
|
|||||||
// Only compute missing nextRunAtMs, do NOT recompute existing ones.
|
// Only compute missing nextRunAtMs, do NOT recompute existing ones.
|
||||||
// If a job was past-due but not found by findDueJobs, recomputing would
|
// If a job was past-due but not found by findDueJobs, recomputing would
|
||||||
// cause it to be silently skipped.
|
// cause it to be silently skipped.
|
||||||
if (job.state.nextRunAtMs === undefined) {
|
if (!isFiniteTimestamp(job.state.nextRunAtMs)) {
|
||||||
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
if (recomputeJobNextRunAtMs({ state, job, nowMs: now })) {
|
||||||
changed = true;
|
changed = true;
|
||||||
}
|
}
|
||||||
@@ -332,14 +346,18 @@ export function recomputeNextRunsForMaintenance(state: CronServiceState): boolea
|
|||||||
|
|
||||||
export function nextWakeAtMs(state: CronServiceState) {
|
export function nextWakeAtMs(state: CronServiceState) {
|
||||||
const jobs = state.store?.jobs ?? [];
|
const jobs = state.store?.jobs ?? [];
|
||||||
const enabled = jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number");
|
const enabled = jobs.filter((j) => j.enabled && isFiniteTimestamp(j.state.nextRunAtMs));
|
||||||
if (enabled.length === 0) {
|
if (enabled.length === 0) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
return enabled.reduce(
|
const first = enabled[0]?.state.nextRunAtMs;
|
||||||
(min, j) => Math.min(min, j.state.nextRunAtMs as number),
|
if (!isFiniteTimestamp(first)) {
|
||||||
enabled[0].state.nextRunAtMs as number,
|
return undefined;
|
||||||
);
|
}
|
||||||
|
return enabled.reduce((min, j) => {
|
||||||
|
const next = j.state.nextRunAtMs;
|
||||||
|
return isFiniteTimestamp(next) ? Math.min(min, next) : min;
|
||||||
|
}, first);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
|
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
|
||||||
|
|||||||
@@ -250,8 +250,12 @@ export function armTimer(state: CronServiceState) {
|
|||||||
const jobCount = state.store?.jobs.length ?? 0;
|
const jobCount = state.store?.jobs.length ?? 0;
|
||||||
const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0;
|
const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0;
|
||||||
const withNextRun =
|
const withNextRun =
|
||||||
state.store?.jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number")
|
state.store?.jobs.filter(
|
||||||
.length ?? 0;
|
(j) =>
|
||||||
|
j.enabled &&
|
||||||
|
typeof j.state.nextRunAtMs === "number" &&
|
||||||
|
Number.isFinite(j.state.nextRunAtMs),
|
||||||
|
).length ?? 0;
|
||||||
state.deps.log.debug(
|
state.deps.log.debug(
|
||||||
{ jobCount, enabledCount, withNextRun },
|
{ jobCount, enabledCount, withNextRun },
|
||||||
"cron: armTimer skipped - no jobs with nextRunAtMs",
|
"cron: armTimer skipped - no jobs with nextRunAtMs",
|
||||||
@@ -476,7 +480,7 @@ function isRunnableJob(params: {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
const next = job.state.nextRunAtMs;
|
const next = job.state.nextRunAtMs;
|
||||||
return typeof next === "number" && nowMs >= next;
|
return typeof next === "number" && Number.isFinite(next) && nowMs >= next;
|
||||||
}
|
}
|
||||||
|
|
||||||
function collectRunnableJobs(
|
function collectRunnableJobs(
|
||||||
|
|||||||
Reference in New Issue
Block a user