mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 00:11:23 +00:00
fix(cron): persist delivered flag in job state to surface delivery failures (openclaw#19174) thanks @simonemacario
Verified: - pnpm build - pnpm check - pnpm test:macmini Co-authored-by: simonemacario <2116609+simonemacario@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -9,6 +9,7 @@ export type CronRunLogEntry = {
|
||||
status?: CronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
delivered?: boolean;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
runAtMs?: number;
|
||||
@@ -127,6 +128,9 @@ export async function readCronRunLogEntries(
|
||||
}
|
||||
: undefined,
|
||||
};
|
||||
if (typeof obj.delivered === "boolean") {
|
||||
entry.delivered = obj.delivered;
|
||||
}
|
||||
if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) {
|
||||
entry.sessionId = obj.sessionId;
|
||||
}
|
||||
|
||||
210
src/cron/service.persists-delivered-status.test.ts
Normal file
210
src/cron/service.persists-delivered-status.test.ts
Normal file
@@ -0,0 +1,210 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { CronService } from "./service.js";
|
||||
import {
|
||||
createStartedCronServiceWithFinishedBarrier,
|
||||
createCronStoreHarness,
|
||||
createNoopLogger,
|
||||
installCronTestHooks,
|
||||
} from "./service.test-harness.js";
|
||||
|
||||
const noopLogger = createNoopLogger();
|
||||
const { makeStorePath } = createCronStoreHarness();
|
||||
installCronTestHooks({ logger: noopLogger });
|
||||
|
||||
describe("CronService persists delivered status", () => {
|
||||
it("persists lastDelivered=true when isolated job reports delivered", async () => {
|
||||
const store = await makeStorePath();
|
||||
const finished = {
|
||||
resolvers: new Map<string, () => void>(),
|
||||
waitForOk(jobId: string) {
|
||||
return new Promise<void>((resolve) => {
|
||||
this.resolvers.set(jobId, resolve);
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({
|
||||
status: "ok" as const,
|
||||
summary: "done",
|
||||
delivered: true,
|
||||
})),
|
||||
onEvent: (evt) => {
|
||||
if (evt.action === "finished" && evt.status === "ok") {
|
||||
finished.resolvers.get(evt.jobId)?.();
|
||||
finished.resolvers.delete(evt.jobId);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "delivered-true",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "test" },
|
||||
delivery: { mode: "none" },
|
||||
});
|
||||
|
||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
expect(updated?.state.lastDelivered).toBe(true);
|
||||
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("persists lastDelivered=undefined when isolated job does not deliver", async () => {
|
||||
const store = await makeStorePath();
|
||||
const finished = {
|
||||
resolvers: new Map<string, () => void>(),
|
||||
waitForOk(jobId: string) {
|
||||
return new Promise<void>((resolve) => {
|
||||
this.resolvers.set(jobId, resolve);
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({
|
||||
status: "ok" as const,
|
||||
summary: "done",
|
||||
})),
|
||||
onEvent: (evt) => {
|
||||
if (evt.action === "finished" && evt.status === "ok") {
|
||||
finished.resolvers.get(evt.jobId)?.();
|
||||
finished.resolvers.delete(evt.jobId);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "no-delivery",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "test" },
|
||||
delivery: { mode: "none" },
|
||||
});
|
||||
|
||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("does not set lastDelivered for main session jobs", async () => {
|
||||
const store = await makeStorePath();
|
||||
const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({
|
||||
storePath: store.storePath,
|
||||
logger: noopLogger,
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "main-session",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "tick" },
|
||||
});
|
||||
|
||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const updated = jobs.find((j) => j.id === job.id);
|
||||
|
||||
expect(updated?.state.lastStatus).toBe("ok");
|
||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||
expect(enqueueSystemEvent).toHaveBeenCalled();
|
||||
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("emits delivered in the finished event", async () => {
|
||||
const store = await makeStorePath();
|
||||
let capturedEvent: { jobId: string; delivered?: boolean } | undefined;
|
||||
const finished = {
|
||||
resolvers: new Map<string, () => void>(),
|
||||
waitForOk(jobId: string) {
|
||||
return new Promise<void>((resolve) => {
|
||||
this.resolvers.set(jobId, resolve);
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
const cron = new CronService({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async () => ({
|
||||
status: "ok" as const,
|
||||
summary: "done",
|
||||
delivered: true,
|
||||
})),
|
||||
onEvent: (evt) => {
|
||||
if (evt.action === "finished") {
|
||||
capturedEvent = { jobId: evt.jobId, delivered: evt.delivered };
|
||||
if (evt.status === "ok") {
|
||||
finished.resolvers.get(evt.jobId)?.();
|
||||
finished.resolvers.delete(evt.jobId);
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
const job = await cron.add({
|
||||
name: "event-test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "test" },
|
||||
delivery: { mode: "none" },
|
||||
});
|
||||
|
||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
await finished.waitForOk(job.id);
|
||||
|
||||
expect(capturedEvent).toBeDefined();
|
||||
expect(capturedEvent?.delivered).toBe(true);
|
||||
|
||||
// Flush pending store writes before stopping so the temp file is released
|
||||
// (prevents ENOTEMPTY on Windows when afterAll removes the fixture dir).
|
||||
await cron.list({ includeDisabled: true });
|
||||
cron.stop();
|
||||
});
|
||||
});
|
||||
@@ -18,6 +18,7 @@ export type CronEvent = {
|
||||
status?: CronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
delivered?: boolean;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
nextRunAtMs?: number;
|
||||
|
||||
@@ -34,6 +34,7 @@ const DEFAULT_JOB_TIMEOUT_MS = 10 * 60_000; // 10 minutes
|
||||
type TimedCronRunOutcome = CronRunOutcome &
|
||||
CronRunTelemetry & {
|
||||
jobId: string;
|
||||
delivered?: boolean;
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
};
|
||||
@@ -73,6 +74,7 @@ function applyJobResult(
|
||||
result: {
|
||||
status: CronRunStatus;
|
||||
error?: string;
|
||||
delivered?: boolean;
|
||||
startedAt: number;
|
||||
endedAt: number;
|
||||
},
|
||||
@@ -82,6 +84,7 @@ function applyJobResult(
|
||||
job.state.lastStatus = result.status;
|
||||
job.state.lastDurationMs = Math.max(0, result.endedAt - result.startedAt);
|
||||
job.state.lastError = result.error;
|
||||
job.state.lastDelivered = result.delivered;
|
||||
job.updatedAtMs = result.endedAt;
|
||||
|
||||
// Track consecutive errors for backoff / auto-disable.
|
||||
@@ -336,6 +339,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
const shouldDelete = applyJobResult(state, job, {
|
||||
status: result.status,
|
||||
error: result.error,
|
||||
delivered: result.delivered,
|
||||
startedAt: result.startedAt,
|
||||
endedAt: result.endedAt,
|
||||
});
|
||||
@@ -486,7 +490,7 @@ export async function runDueJobs(state: CronServiceState) {
|
||||
async function executeJobCore(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
): Promise<CronRunOutcome & CronRunTelemetry> {
|
||||
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
||||
if (job.sessionTarget === "main") {
|
||||
const text = resolveJobPayloadTextForMain(job);
|
||||
if (!text) {
|
||||
@@ -591,6 +595,7 @@ async function executeJobCore(
|
||||
status: res.status,
|
||||
error: res.error,
|
||||
summary: res.summary,
|
||||
delivered: res.delivered,
|
||||
sessionId: res.sessionId,
|
||||
sessionKey: res.sessionKey,
|
||||
model: res.model,
|
||||
@@ -619,6 +624,7 @@ export async function executeJob(
|
||||
|
||||
let coreResult: {
|
||||
status: CronRunStatus;
|
||||
delivered?: boolean;
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry;
|
||||
try {
|
||||
@@ -631,6 +637,7 @@ export async function executeJob(
|
||||
const shouldDelete = applyJobResult(state, job, {
|
||||
status: coreResult.status,
|
||||
error: coreResult.error,
|
||||
delivered: coreResult.delivered,
|
||||
startedAt,
|
||||
endedAt,
|
||||
});
|
||||
@@ -648,6 +655,7 @@ function emitJobFinished(
|
||||
job: CronJob,
|
||||
result: {
|
||||
status: CronRunStatus;
|
||||
delivered?: boolean;
|
||||
} & CronRunOutcome &
|
||||
CronRunTelemetry,
|
||||
runAtMs: number,
|
||||
@@ -658,6 +666,7 @@ function emitJobFinished(
|
||||
status: result.status,
|
||||
error: result.error,
|
||||
summary: result.summary,
|
||||
delivered: result.delivered,
|
||||
sessionId: result.sessionId,
|
||||
sessionKey: result.sessionKey,
|
||||
runAtMs,
|
||||
|
||||
@@ -93,6 +93,8 @@ export type CronJobState = {
|
||||
consecutiveErrors?: number;
|
||||
/** Number of consecutive schedule computation errors. Auto-disables job after threshold. */
|
||||
scheduleErrorCount?: number;
|
||||
/** Whether the last run's output was delivered to the target channel. */
|
||||
lastDelivered?: boolean;
|
||||
};
|
||||
|
||||
export type CronJob = {
|
||||
|
||||
Reference in New Issue
Block a user