Files
openclaw/src/cron/service/timer.ts
Marcus Widing ea95e88dd6 fix(cron): prevent duplicate delivery for isolated jobs with announce mode
When an isolated cron job delivers its output via deliverOutboundPayloads
or the subagent announce flow, the finish handler in executeJobCore
unconditionally posts a summary to the main agent session and wakes it
via requestHeartbeatNow. The main agent then generates a second response
that is also delivered to the target channel, resulting in duplicate
messages with different content.

Add a `delivered` flag to RunCronAgentTurnResult that is set to true
when the isolated run successfully delivers its output. In executeJobCore,
skip the enqueueSystemEvent + requestHeartbeatNow call when the flag is
set, preventing the main agent from waking up and double-posting.

Fixes #15692
2026-02-13 23:49:10 +01:00

600 lines
18 KiB
TypeScript

import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type { CronJob } from "../types.js";
import type { CronEvent, CronServiceState } from "./state.js";
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import { sweepCronRunSessions } from "../session-reaper.js";
import {
computeJobNextRunAtMs,
nextWakeAtMs,
recomputeNextRuns,
recomputeNextRunsForMaintenance,
resolveJobPayloadTextForMain,
} from "./jobs.js";
import { locked } from "./locked.js";
import { ensureLoaded, persist } from "./store.js";
const MAX_TIMER_DELAY_MS = 60_000;
/**
* Maximum wall-clock time for a single job execution. Acts as a safety net
* on top of the per-provider / per-agent timeouts to prevent one stuck job
* from wedging the entire cron lane.
*/
const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
/**
* Exponential backoff delays (in ms) indexed by consecutive error count.
* After the last entry the delay stays constant.
*/
const ERROR_BACKOFF_SCHEDULE_MS = [
30_000, // 1st error → 30 s
60_000, // 2nd error → 1 min
5 * 60_000, // 3rd error → 5 min
15 * 60_000, // 4th error → 15 min
60 * 60_000, // 5th+ error → 60 min
];
function errorBackoffMs(consecutiveErrors: number): number {
const idx = Math.min(consecutiveErrors - 1, ERROR_BACKOFF_SCHEDULE_MS.length - 1);
return ERROR_BACKOFF_SCHEDULE_MS[Math.max(0, idx)];
}
/**
* Apply the result of a job execution to the job's state.
* Handles consecutive error tracking, exponential backoff, one-shot disable,
* and nextRunAtMs computation. Returns `true` if the job should be deleted.
*/
function applyJobResult(
state: CronServiceState,
job: CronJob,
result: {
status: "ok" | "error" | "skipped";
error?: string;
startedAt: number;
endedAt: number;
},
): boolean {
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = result.startedAt;
job.state.lastStatus = result.status;
job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt);
job.state.lastError = result.error;
job.updatedAtMs = result.endedAt;
// Track consecutive errors for backoff / auto-disable.
if (result.status === "error") {
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
} else {
job.state.consecutiveErrors = 0;
}
const shouldDelete =
job.schedule.kind === "at" && job.deleteAfterRun === true && result.status === "ok";
if (!shouldDelete) {
if (job.schedule.kind === "at") {
// One-shot jobs are always disabled after ANY terminal status
// (ok, error, or skipped). This prevents tight-loop rescheduling
// when computeJobNextRunAtMs returns the past atMs value (#11452).
job.enabled = false;
job.state.nextRunAtMs = undefined;
if (result.status === "error") {
state.deps.log.warn(
{
jobId: job.id,
jobName: job.name,
consecutiveErrors: job.state.consecutiveErrors,
error: result.error,
},
"cron: disabling one-shot job after error",
);
}
} else if (result.status === "error" && job.enabled) {
// Apply exponential backoff for errored jobs to prevent retry storms.
const backoff = errorBackoffMs(job.state.consecutiveErrors ?? 1);
const normalNext = computeJobNextRunAtMs(job, result.endedAt);
const backoffNext = result.endedAt + backoff;
// Use whichever is later: the natural next run or the backoff delay.
job.state.nextRunAtMs =
normalNext !== undefined ? Math.max(normalNext, backoffNext) : backoffNext;
state.deps.log.info(
{
jobId: job.id,
consecutiveErrors: job.state.consecutiveErrors,
backoffMs: backoff,
nextRunAtMs: job.state.nextRunAtMs,
},
"cron: applying error backoff",
);
} else if (job.enabled) {
job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt);
} else {
job.state.nextRunAtMs = undefined;
}
}
return shouldDelete;
}
export function armTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
if (!state.deps.cronEnabled) {
state.deps.log.debug({}, "cron: armTimer skipped - scheduler disabled");
return;
}
const nextAt = nextWakeAtMs(state);
if (!nextAt) {
const jobCount = state.store?.jobs.length ?? 0;
const enabledCount = state.store?.jobs.filter((j) => j.enabled).length ?? 0;
const withNextRun =
state.store?.jobs.filter((j) => j.enabled && typeof j.state.nextRunAtMs === "number")
.length ?? 0;
state.deps.log.debug(
{ jobCount, enabledCount, withNextRun },
"cron: armTimer skipped - no jobs with nextRunAtMs",
);
return;
}
const now = state.deps.nowMs();
const delay = Math.max(nextAt - now, 0);
// Wake at least once a minute to avoid schedule drift and recover quickly
// when the process was paused or wall-clock time jumps.
const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
state.timer = setTimeout(async () => {
try {
await onTimer(state);
} catch (err) {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
}
}, clampedDelay);
state.deps.log.debug(
{ nextAt, delayMs: clampedDelay, clamped: delay > MAX_TIMER_DELAY_MS },
"cron: timer armed",
);
}
export async function onTimer(state: CronServiceState) {
if (state.running) {
// Re-arm the timer so the scheduler keeps ticking even when a job is
// still executing. Without this, a long-running job (e.g. an agentTurn
// exceeding MAX_TIMER_DELAY_MS) causes the clamped 60 s timer to fire
// while `running` is true. The early return then leaves no timer set,
// silently killing the scheduler until the next gateway restart.
//
// We use MAX_TIMER_DELAY_MS as a fixed re-check interval to avoid a
// zero-delay hot-loop when past-due jobs are waiting for the current
// execution to finish.
// See: https://github.com/openclaw/openclaw/issues/12025
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = setTimeout(async () => {
try {
await onTimer(state);
} catch (err) {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
}
}, MAX_TIMER_DELAY_MS);
return;
}
state.running = true;
try {
const dueJobs = await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const due = findDueJobs(state);
if (due.length === 0) {
// Use maintenance-only recompute to avoid advancing past-due nextRunAtMs
// values without execution. This prevents jobs from being silently skipped
// when the timer wakes up but findDueJobs returns empty (see #13992).
const changed = recomputeNextRunsForMaintenance(state);
if (changed) {
await persist(state);
}
return [];
}
const now = state.deps.nowMs();
for (const job of due) {
job.state.runningAtMs = now;
job.state.lastError = undefined;
}
await persist(state);
return due.map((j) => ({
id: j.id,
job: j,
}));
});
const results: Array<{
jobId: string;
status: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
startedAt: number;
endedAt: number;
}> = [];
for (const { id, job } of dueJobs) {
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
const jobTimeoutMs =
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
? job.payload.timeoutSeconds * 1_000
: DEFAULT_JOB_TIMEOUT_MS;
try {
let timeoutId: NodeJS.Timeout;
const result = await Promise.race([
executeJobCore(state, job),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("cron: job execution timed out")),
jobTimeoutMs,
);
}),
]).finally(() => clearTimeout(timeoutId!));
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
} catch (err) {
state.deps.log.warn(
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs },
`cron: job failed: ${String(err)}`,
);
results.push({
jobId: id,
status: "error",
error: String(err),
startedAt,
endedAt: state.deps.nowMs(),
});
}
}
if (results.length > 0) {
await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
for (const result of results) {
const job = state.store?.jobs.find((j) => j.id === result.jobId);
if (!job) {
continue;
}
const shouldDelete = applyJobResult(state, job, {
status: result.status,
error: result.error,
startedAt: result.startedAt,
endedAt: result.endedAt,
});
emit(state, {
jobId: job.id,
action: "finished",
status: result.status,
error: result.error,
summary: result.summary,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
runAtMs: result.startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
emit(state, { jobId: job.id, action: "removed" });
}
}
recomputeNextRuns(state);
await persist(state);
});
}
// Piggyback session reaper on timer tick (self-throttled to every 5 min).
const storePaths = new Set<string>();
if (state.deps.resolveSessionStorePath) {
const defaultAgentId = state.deps.defaultAgentId ?? DEFAULT_AGENT_ID;
if (state.store?.jobs?.length) {
for (const job of state.store.jobs) {
const agentId =
typeof job.agentId === "string" && job.agentId.trim() ? job.agentId : defaultAgentId;
storePaths.add(state.deps.resolveSessionStorePath(agentId));
}
} else {
storePaths.add(state.deps.resolveSessionStorePath(defaultAgentId));
}
} else if (state.deps.sessionStorePath) {
storePaths.add(state.deps.sessionStorePath);
}
if (storePaths.size > 0) {
const nowMs = state.deps.nowMs();
for (const storePath of storePaths) {
try {
await sweepCronRunSessions({
cronConfig: state.deps.cronConfig,
sessionStorePath: storePath,
nowMs,
log: state.deps.log,
});
} catch (err) {
state.deps.log.warn({ err: String(err), storePath }, "cron: session reaper sweep failed");
}
}
}
} finally {
state.running = false;
armTimer(state);
}
}
function findDueJobs(state: CronServiceState): CronJob[] {
if (!state.store) {
return [];
}
const now = state.deps.nowMs();
return state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
}
export async function runMissedJobs(state: CronServiceState) {
if (!state.store) {
return;
}
const now = state.deps.nowMs();
const missed = state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
if (j.schedule.kind === "at" && j.state.lastStatus) {
// Any terminal status (ok, error, skipped) means the job already
// ran at least once. Don't re-fire it on restart — applyJobResult
// disables one-shot jobs, but guard here defensively (#13845).
return false;
}
return typeof next === "number" && now >= next;
});
if (missed.length > 0) {
state.deps.log.info(
{ count: missed.length, jobIds: missed.map((j) => j.id) },
"cron: running missed jobs after restart",
);
for (const job of missed) {
await executeJob(state, job, now, { forced: false });
}
}
}
export async function runDueJobs(state: CronServiceState) {
if (!state.store) {
return;
}
const now = state.deps.nowMs();
const due = state.store.jobs.filter((j) => {
if (!j.state) {
j.state = {};
}
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
for (const job of due) {
await executeJob(state, job, now, { forced: false });
}
}
async function executeJobCore(
state: CronServiceState,
job: CronJob,
): Promise<{
status: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
}> {
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
return {
status: "skipped",
error:
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
};
}
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const maxWaitMs = 2 * 60_000;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await state.deps.runHeartbeatOnce({ reason, agentId: job.agentId });
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
state.deps.requestHeartbeatNow({ reason });
return { status: "ok", summary: text };
}
await delay(250);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
} else if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
} else {
return { status: "error", error: heartbeatResult.reason, summary: text };
}
} else {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
return { status: "ok", summary: text };
}
}
if (job.payload.kind !== "agentTurn") {
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
}
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
// Post a short summary back to the main session — but only when the
// isolated run did NOT already deliver its output to the target channel.
// When `res.delivered` is true the announce flow (or direct outbound
// delivery) already sent the result, so posting the summary to main
// would wake the main agent and cause a duplicate message.
// See: https://github.com/openclaw/openclaw/issues/15692
const summaryText = res.summary?.trim();
const deliveryPlan = resolveCronDeliveryPlan(job);
if (summaryText && deliveryPlan.requested && !res.delivered) {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
}
}
return {
status: res.status,
error: res.error,
summary: res.summary,
sessionId: res.sessionId,
sessionKey: res.sessionKey,
};
}
/**
* Execute a job. This version is used by the `run` command and other
* places that need the full execution with state updates.
*/
export async function executeJob(
state: CronServiceState,
job: CronJob,
_nowMs: number,
_opts: { forced: boolean },
) {
if (!job.state) {
job.state = {};
}
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
job.state.lastError = undefined;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
let coreResult: {
status: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
};
try {
coreResult = await executeJobCore(state, job);
} catch (err) {
coreResult = { status: "error", error: String(err) };
}
const endedAt = state.deps.nowMs();
const shouldDelete = applyJobResult(state, job, {
status: coreResult.status,
error: coreResult.error,
startedAt,
endedAt,
});
emit(state, {
jobId: job.id,
action: "finished",
status: coreResult.status,
error: coreResult.error,
summary: coreResult.summary,
sessionId: coreResult.sessionId,
sessionKey: coreResult.sessionKey,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
emit(state, { jobId: job.id, action: "removed" });
}
}
export function wake(
state: CronServiceState,
opts: { mode: "now" | "next-heartbeat"; text: string },
) {
const text = opts.text.trim();
if (!text) {
return { ok: false } as const;
}
state.deps.enqueueSystemEvent(text);
if (opts.mode === "now") {
state.deps.requestHeartbeatNow({ reason: "wake" });
}
return { ok: true } as const;
}
export function stopTimer(state: CronServiceState) {
if (state.timer) {
clearTimeout(state.timer);
}
state.timer = null;
}
export function emit(state: CronServiceState, evt: CronEvent) {
try {
state.deps.onEvent?.(evt);
} catch {
/* ignore */
}
}