From 09d5f508b1ddf279f153157da0c75a2316a934b6 Mon Sep 17 00:00:00 2001 From: Simone Macario <2116609+simonemacario@users.noreply.github.com> Date: Sun, 22 Feb 2026 02:47:29 +0800 Subject: [PATCH] fix(cron): persist delivered flag in job state to surface delivery failures (openclaw#19174) thanks @simonemacario Verified: - pnpm build - pnpm check - pnpm test:macmini Co-authored-by: simonemacario <2116609+simonemacario@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> --- CHANGELOG.md | 1 + src/cron/run-log.ts | 4 + .../service.persists-delivered-status.test.ts | 210 ++++++++++++++++++ src/cron/service/state.ts | 1 + src/cron/service/timer.ts | 11 +- src/cron/types.ts | 2 + src/gateway/protocol/schema/cron.ts | 1 + src/gateway/server-cron.ts | 1 + 8 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 src/cron/service.persists-delivered-status.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index a9ed3b02beb..b8fa5ddc439 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ Docs: https://docs.openclaw.ai - Gateway/Pairing: tolerate legacy paired devices missing `roles`/`scopes` metadata in websocket upgrade checks and backfill metadata on reconnect. (#21447, fixes #21236) Thanks @joshavant. - Gateway/Pairing/CLI: align read-scope compatibility in pairing/device-token checks and add local `openclaw devices` fallback recovery for loopback `pairing required` deadlocks, with explicit fallback notice to unblock approval bootstrap flows. (#21616) Thanks @shakkernerd. - Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman. +- Cron/Isolated delivery: persist `lastDelivered` in cron job state and run logs for isolated-session runs so delivery failures are visible even when execution status is `ok`. (#19154) Thanks @simonemacario. - Agents/Compaction: restore embedded compaction safeguard/context-pruning extension loading in production by wiring bundled extension factories into the resource loader instead of runtime file-path resolution. (#22349) Thanks @Glucksberg. - Agents/Subagents: restore announce-chain delivery to agent injection, defer nested announce output until descendant follow-up content is ready, and prevent descendant deferrals from consuming announce retry budget so deep chains do not drop final completions. (#22223) Thanks @tyler6204. - Agents/System Prompt: label allowlisted senders as authorized senders to avoid implying ownership. Thanks @thewilloftheshadow. diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index bcb27c9e157..0a2c74959fe 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -9,6 +9,7 @@ export type CronRunLogEntry = { status?: CronRunStatus; error?: string; summary?: string; + delivered?: boolean; sessionId?: string; sessionKey?: string; runAtMs?: number; @@ -127,6 +128,9 @@ export async function readCronRunLogEntries( } : undefined, }; + if (typeof obj.delivered === "boolean") { + entry.delivered = obj.delivered; + } if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) { entry.sessionId = obj.sessionId; } diff --git a/src/cron/service.persists-delivered-status.test.ts b/src/cron/service.persists-delivered-status.test.ts new file mode 100644 index 00000000000..ea9712aca59 --- /dev/null +++ b/src/cron/service.persists-delivered-status.test.ts @@ -0,0 +1,210 @@ +import { describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; +import { + createStartedCronServiceWithFinishedBarrier, + createCronStoreHarness, + createNoopLogger, + installCronTestHooks, +} from "./service.test-harness.js"; + +const noopLogger = createNoopLogger(); +const { makeStorePath } = createCronStoreHarness(); +installCronTestHooks({ logger: noopLogger }); + +describe("CronService persists delivered status", () => { + it("persists lastDelivered=true when isolated job reports delivered", async () => { + const store = await makeStorePath(); + const finished = { + resolvers: new Map void>(), + waitForOk(jobId: string) { + return new Promise((resolve) => { + this.resolvers.set(jobId, resolve); + }); + }, + }; + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ + status: "ok" as const, + summary: "done", + delivered: true, + })), + onEvent: (evt) => { + if (evt.action === "finished" && evt.status === "ok") { + finished.resolvers.get(evt.jobId)?.(); + finished.resolvers.delete(evt.jobId); + } + }, + }); + + await cron.start(); + const job = await cron.add({ + name: "delivered-true", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { mode: "none" }, + }); + + vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); + await vi.runOnlyPendingTimersAsync(); + await finished.waitForOk(job.id); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastDelivered).toBe(true); + + cron.stop(); + }); + + it("persists lastDelivered=undefined when isolated job does not deliver", async () => { + const store = await makeStorePath(); + const finished = { + resolvers: new Map void>(), + waitForOk(jobId: string) { + return new Promise((resolve) => { + this.resolvers.set(jobId, resolve); + }); + }, + }; + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ + status: "ok" as const, + summary: "done", + })), + onEvent: (evt) => { + if (evt.action === "finished" && evt.status === "ok") { + finished.resolvers.get(evt.jobId)?.(); + finished.resolvers.delete(evt.jobId); + } + }, + }); + + await cron.start(); + const job = await cron.add({ + name: "no-delivery", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { mode: "none" }, + }); + + vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); + await vi.runOnlyPendingTimersAsync(); + await finished.waitForOk(job.id); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastDelivered).toBeUndefined(); + + cron.stop(); + }); + + it("does not set lastDelivered for main session jobs", async () => { + const store = await makeStorePath(); + const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({ + storePath: store.storePath, + logger: noopLogger, + }); + + await cron.start(); + const job = await cron.add({ + name: "main-session", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "tick" }, + }); + + vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); + await vi.runOnlyPendingTimersAsync(); + await finished.waitForOk(job.id); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastDelivered).toBeUndefined(); + expect(enqueueSystemEvent).toHaveBeenCalled(); + + cron.stop(); + }); + + it("emits delivered in the finished event", async () => { + const store = await makeStorePath(); + let capturedEvent: { jobId: string; delivered?: boolean } | undefined; + const finished = { + resolvers: new Map void>(), + waitForOk(jobId: string) { + return new Promise((resolve) => { + this.resolvers.set(jobId, resolve); + }); + }, + }; + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ + status: "ok" as const, + summary: "done", + delivered: true, + })), + onEvent: (evt) => { + if (evt.action === "finished") { + capturedEvent = { jobId: evt.jobId, delivered: evt.delivered }; + if (evt.status === "ok") { + finished.resolvers.get(evt.jobId)?.(); + finished.resolvers.delete(evt.jobId); + } + } + }, + }); + + await cron.start(); + const job = await cron.add({ + name: "event-test", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "test" }, + delivery: { mode: "none" }, + }); + + vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5)); + await vi.runOnlyPendingTimersAsync(); + await finished.waitForOk(job.id); + + expect(capturedEvent).toBeDefined(); + expect(capturedEvent?.delivered).toBe(true); + + // Flush pending store writes before stopping so the temp file is released + // (prevents ENOTEMPTY on Windows when afterAll removes the fixture dir). + await cron.list({ includeDisabled: true }); + cron.stop(); + }); +}); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 050ab9c3b0f..c331fa1290b 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -18,6 +18,7 @@ export type CronEvent = { status?: CronRunStatus; error?: string; summary?: string; + delivered?: boolean; sessionId?: string; sessionKey?: string; nextRunAtMs?: number; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index a51813bbc6c..96b6ccad2e1 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -34,6 +34,7 @@ const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes type TimedCronRunOutcome = CronRunOutcome & CronRunTelemetry & { jobId: string; + delivered?: boolean; startedAt: number; endedAt: number; }; @@ -73,6 +74,7 @@ function applyJobResult( result: { status: CronRunStatus; error?: string; + delivered?: boolean; startedAt: number; endedAt: number; }, @@ -82,6 +84,7 @@ function applyJobResult( job.state.lastStatus = result.status; job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt); job.state.lastError = result.error; + job.state.lastDelivered = result.delivered; job.updatedAtMs = result.endedAt; // Track consecutive errors for backoff / auto-disable. @@ -336,6 +339,7 @@ export async function onTimer(state: CronServiceState) { const shouldDelete = applyJobResult(state, job, { status: result.status, error: result.error, + delivered: result.delivered, startedAt: result.startedAt, endedAt: result.endedAt, }); @@ -486,7 +490,7 @@ export async function runDueJobs(state: CronServiceState) { async function executeJobCore( state: CronServiceState, job: CronJob, -): Promise { +): Promise { if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); if (!text) { @@ -591,6 +595,7 @@ async function executeJobCore( status: res.status, error: res.error, summary: res.summary, + delivered: res.delivered, sessionId: res.sessionId, sessionKey: res.sessionKey, model: res.model, @@ -619,6 +624,7 @@ export async function executeJob( let coreResult: { status: CronRunStatus; + delivered?: boolean; } & CronRunOutcome & CronRunTelemetry; try { @@ -631,6 +637,7 @@ export async function executeJob( const shouldDelete = applyJobResult(state, job, { status: coreResult.status, error: coreResult.error, + delivered: coreResult.delivered, startedAt, endedAt, }); @@ -648,6 +655,7 @@ function emitJobFinished( job: CronJob, result: { status: CronRunStatus; + delivered?: boolean; } & CronRunOutcome & CronRunTelemetry, runAtMs: number, @@ -658,6 +666,7 @@ function emitJobFinished( status: result.status, error: result.error, summary: result.summary, + delivered: result.delivered, sessionId: result.sessionId, sessionKey: result.sessionKey, runAtMs, diff --git a/src/cron/types.ts b/src/cron/types.ts index 435a1ddaf3c..36a5c28fa83 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -93,6 +93,8 @@ export type CronJobState = { consecutiveErrors?: number; /** Number of consecutive schedule computation errors. Auto-disables job after threshold. */ scheduleErrorCount?: number; + /** Whether the last run's output was delivered to the target channel. */ + lastDelivered?: boolean; }; export type CronJob = { diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 99672b05211..c2e0d06203c 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -157,6 +157,7 @@ export const CronJobStateSchema = Type.Object( lastError: Type.Optional(Type.String()), lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })), consecutiveErrors: Type.Optional(Type.Integer({ minimum: 0 })), + lastDelivered: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ); diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index a4febc90ff6..b681377b13c 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -295,6 +295,7 @@ export function buildGatewayCronService(params: { status: evt.status, error: evt.error, summary: evt.summary, + delivered: evt.delivered, sessionId: evt.sessionId, sessionKey: evt.sessionKey, runAtMs: evt.runAtMs,