fix(cron): add retry policy for one-shot jobs on transient errors (#24355) (openclaw#24435) thanks @hugenshen

Verified:
- pnpm install --frozen-lockfile
- pnpm check
- pnpm test -- --run src/cron/service.issue-regressions.test.ts src/config/config-misc.test.ts

Co-authored-by: hugenshen <16300669+hugenshen@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
NIO
2026-03-01 20:58:03 +08:00
committed by GitHub
parent 62a7683ce6
commit ea3955cd78
10 changed files with 406 additions and 23 deletions

View File

@@ -752,6 +752,224 @@ describe("Cron issue regressions", () => {
}
});
it("#24355: one-shot job retries on transient error, then succeeds", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "oneshot-retry",
name: "reminder",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "remind me" },
state: { nextRunAtMs: scheduledAt },
});
cronJob.deleteAfterRun = false;
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const runIsolatedAgentJob = vi
.fn()
.mockResolvedValueOnce({ status: "error", error: "429 rate limit exceeded" })
.mockResolvedValueOnce({ status: "ok", summary: "done" });
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
await onTimer(state);
let job = state.store?.jobs.find((j) => j.id === "oneshot-retry");
expect(job).toBeDefined();
expect(job!.enabled).toBe(true);
expect(job!.state.lastStatus).toBe("error");
expect(job!.state.nextRunAtMs).toBeDefined();
expect(job!.state.nextRunAtMs).toBeGreaterThan(scheduledAt);
now = (job!.state.nextRunAtMs ?? 0) + 1;
await onTimer(state);
job = state.store?.jobs.find((j) => j.id === "oneshot-retry");
expect(job).toBeDefined();
expect(job!.state.lastStatus).toBe("ok");
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
});
it("#24355: one-shot job disabled after max transient retries", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "oneshot-max-retries",
name: "reminder",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "remind me" },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const runIsolatedAgentJob = vi.fn().mockResolvedValue({
status: "error",
error: "429 rate limit exceeded",
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
for (let i = 0; i < 4; i++) {
await onTimer(state);
const job = state.store?.jobs.find((j) => j.id === "oneshot-max-retries");
expect(job).toBeDefined();
if (i < 3) {
expect(job!.enabled).toBe(true);
now = (job!.state.nextRunAtMs ?? now) + 1;
} else {
expect(job!.enabled).toBe(false);
}
}
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(4);
});
it("#24355: one-shot job respects cron.retry config", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "oneshot-custom-retry",
name: "reminder",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "remind me" },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const runIsolatedAgentJob = vi.fn().mockResolvedValue({
status: "error",
error: "429 rate limit exceeded",
});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
cronConfig: {
retry: { maxAttempts: 2, backoffMs: [1000, 2000] },
},
});
for (let i = 0; i < 4; i++) {
await onTimer(state);
const job = state.store?.jobs.find((j) => j.id === "oneshot-custom-retry");
expect(job).toBeDefined();
if (i < 2) {
expect(job!.enabled).toBe(true);
now = (job!.state.nextRunAtMs ?? now) + 1;
} else {
expect(job!.enabled).toBe(false);
}
}
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(3);
});
it("#24355: one-shot job disabled immediately on permanent error", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "oneshot-permanent-error",
name: "reminder",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "remind me" },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn().mockResolvedValue({
status: "error",
error: "invalid API key",
}),
});
await onTimer(state);
const job = state.store?.jobs.find((j) => j.id === "oneshot-permanent-error");
expect(job).toBeDefined();
expect(job!.enabled).toBe(false);
expect(job!.state.lastStatus).toBe("error");
expect(job!.state.nextRunAtMs).toBeUndefined();
});
it("#24355: deleteAfterRun:true one-shot job is deleted after successful retry", async () => {
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-06T10:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "oneshot-deleteAfterRun-retry",
name: "reminder",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "remind me" },
state: { nextRunAtMs: scheduledAt },
});
cronJob.deleteAfterRun = true;
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const runIsolatedAgentJob = vi
.fn()
.mockResolvedValueOnce({ status: "error", error: "429 rate limit exceeded" })
.mockResolvedValueOnce({ status: "ok", summary: "done" });
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob,
});
// First run: transient error → retry scheduled, job still in store.
await onTimer(state);
let job = state.store?.jobs.find((j) => j.id === "oneshot-deleteAfterRun-retry");
expect(job).toBeDefined();
expect(job!.enabled).toBe(true);
expect(job!.state.lastStatus).toBe("error");
expect(job!.state.nextRunAtMs).toBeGreaterThan(scheduledAt);
// Second run: success → deleteAfterRun removes the job from the store.
now = (job!.state.nextRunAtMs ?? 0) + 1;
await onTimer(state);
const deleted = state.store?.jobs.find((j) => j.id === "oneshot-deleteAfterRun-retry");
expect(deleted).toBeUndefined();
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(2);
});
it("prevents spin loop when cron job completes within the scheduled second (#17821)", async () => {
const store = await makeStorePath();
// Simulate a cron job "0 13 * * *" (daily 13:00 UTC) that fires exactly

View File

@@ -1,3 +1,4 @@
import type { CronConfig, CronRetryOn } from "../../config/types.cron.js";
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
@@ -91,7 +92,7 @@ function isAbortError(err: unknown): boolean {
* Exponential backoff delays (in ms) indexed by consecutive error count.
* After the last entry the delay stays constant.
*/
const ERROR_BACKOFF_SCHEDULE_MS = [
const DEFAULT_BACKOFF_SCHEDULE_MS = [
30_000, // 1st error → 30 s
60_000, // 2nd error → 1 min
5 * 60_000, // 3rd error → 5 min
@@ -99,9 +100,43 @@ const ERROR_BACKOFF_SCHEDULE_MS = [
60 * 60_000, // 5th+ error → 60 min
];
function errorBackoffMs(consecutiveErrors: number): number {
const idx = Math.min(consecutiveErrors - 1, ERROR_BACKOFF_SCHEDULE_MS.length - 1);
return ERROR_BACKOFF_SCHEDULE_MS[Math.max(0, idx)];
function errorBackoffMs(
consecutiveErrors: number,
scheduleMs = DEFAULT_BACKOFF_SCHEDULE_MS,
): number {
const idx = Math.min(consecutiveErrors - 1, scheduleMs.length - 1);
return scheduleMs[Math.max(0, idx)];
}
/** Default max retries for one-shot jobs on transient errors (#24355). */
const DEFAULT_MAX_TRANSIENT_RETRIES = 3;
const TRANSIENT_PATTERNS: Record<string, RegExp> = {
rate_limit: /(rate[_ ]limit|too many requests|429|resource has been exhausted|cloudflare)/i,
network: /(network|econnreset|econnrefused|fetch failed|socket)/i,
timeout: /(timeout|etimedout)/i,
server_error: /\b5\d{2}\b/,
};
function isTransientCronError(error: string | undefined, retryOn?: CronRetryOn[]): boolean {
if (!error || typeof error !== "string") {
return false;
}
const keys = retryOn?.length ? retryOn : (Object.keys(TRANSIENT_PATTERNS) as CronRetryOn[]);
return keys.some((k) => TRANSIENT_PATTERNS[k]?.test(error));
}
function resolveRetryConfig(cronConfig?: CronConfig) {
const retry = cronConfig?.retry;
return {
maxAttempts:
typeof retry?.maxAttempts === "number" ? retry.maxAttempts : DEFAULT_MAX_TRANSIENT_RETRIES,
backoffMs:
Array.isArray(retry?.backoffMs) && retry.backoffMs.length > 0
? retry.backoffMs
: DEFAULT_BACKOFF_SCHEDULE_MS.slice(0, 3),
retryOn: Array.isArray(retry?.retryOn) && retry.retryOn.length > 0 ? retry.retryOn : undefined,
};
}
function resolveDeliveryStatus(params: { job: CronJob; delivered?: boolean }): CronDeliveryStatus {
@@ -155,21 +190,47 @@ export function applyJobResult(
if (!shouldDelete) {
if (job.schedule.kind === "at") {
// One-shot jobs are always disabled after ANY terminal status
// (ok, error, or skipped). This prevents tight-loop rescheduling
// when computeJobNextRunAtMs returns the past atMs value (#11452).
job.enabled = false;
job.state.nextRunAtMs = undefined;
if (result.status === "error") {
state.deps.log.warn(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: job.state.consecutiveErrors,
error: result.error,
},
"cron: disabling one-shot job after error",
);
if (result.status === "ok" || result.status === "skipped") {
// One-shot done or skipped: disable to prevent tight-loop (#11452).
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (result.status === "error") {
const retryConfig = resolveRetryConfig(state.deps.cronConfig);
const transient = isTransientCronError(result.error, retryConfig.retryOn);
// consecutiveErrors is always set to ≥1 by the increment block above.
const consecutive = job.state.consecutiveErrors;
if (transient && consecutive <= retryConfig.maxAttempts) {
// Schedule retry with backoff (#24355).
const backoff = errorBackoffMs(consecutive, retryConfig.backoffMs);
job.state.nextRunAtMs = result.endedAt + backoff;
state.deps.log.info(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: consecutive,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: scheduling one-shot retry after transient error",
);
} else {
// Permanent error or max retries exhausted: disable.
// Note: deleteAfterRun:true only triggers on ok (see shouldDelete above),
// so exhausted-retry jobs are disabled but intentionally kept in the store
// to preserve the error state for inspection.
job.enabled = false;
job.state.nextRunAtMs = undefined;
state.deps.log.warn(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: consecutive,
error: result.error,
reason: transient ? "max retries exhausted" : "permanent error",
},
"cron: disabling one-shot job after error",
);
}
}
} else if (result.status === "error" && job.enabled) {
// Apply exponential backoff for errored jobs to prevent retry storms.
@@ -474,9 +535,20 @@ function isRunnableJob(params: {
return false;
}
if (params.skipAtIfAlreadyRan && job.schedule.kind === "at" && job.state.lastStatus) {
// Any terminal status (ok, error, skipped) means the job already ran at least once.
// Don't re-fire it on restart — applyJobResult disables one-shot jobs, but guard
// here defensively (#13845).
// One-shot with terminal status: skip unless it's a transient-error retry.
// Retries have nextRunAtMs > lastRunAtMs (scheduled after the failed run) (#24355).
// ok/skipped or error-without-retry always skip (#13845).
const lastRun = job.state.lastRunAtMs;
const nextRun = job.state.nextRunAtMs;
if (
job.state.lastStatus === "error" &&
job.enabled &&
typeof nextRun === "number" &&
typeof lastRun === "number" &&
nextRun > lastRun
) {
return nowMs >= nextRun;
}
return false;
}
const next = job.state.nextRunAtMs;