gateway: add cron finished-run webhook (#14535)

* gateway: add cron finished webhook delivery

* config: allow cron webhook in runtime schema

* cron: require notify flag for webhook posts

* ui/docs: add cron notify toggle and webhook docs

* fix: harden cron webhook auth and fill notify coverage (#14535) (thanks @advaitpaliwal)

---------

Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
This commit is contained in:
Advait Paliwal
2026-02-15 16:14:17 -08:00
committed by GitHub
parent ab000bc411
commit 115cfb4430
25 changed files with 519 additions and 4 deletions

View File

@@ -219,7 +219,8 @@ JOB SCHEMA (for add action):
"payload": { ... }, // Required: what to execute
"delivery": { ... }, // Optional: announce summary (isolated only)
"sessionTarget": "main" | "isolated", // Required
"enabled": true | false // Optional, default true
"enabled": true | false, // Optional, default true
"notify": true | false // Optional webhook opt-in; set true for user-facing reminders
}
SCHEDULE TYPES (schedule.kind):
@@ -246,6 +247,7 @@ DELIVERY (isolated-only, top-level):
CRITICAL CONSTRAINTS:
- sessionTarget="main" REQUIRES payload.kind="systemEvent"
- sessionTarget="isolated" REQUIRES payload.kind="agentTurn"
- For reminders users should be notified about, set notify=true.
Default: prefer isolated agentTurn jobs unless the user explicitly wants a main-session system event.
WAKE MODES (for wake action):
@@ -292,6 +294,7 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
"payload",
"delivery",
"enabled",
"notify",
"description",
"deleteAfterRun",
"agentId",

View File

@@ -0,0 +1,26 @@
import { describe, expect, it } from "vitest";
import { OpenClawSchema } from "./zod-schema.js";
describe("cron webhook schema", () => {
it("accepts cron.webhook and cron.webhookToken", () => {
const res = OpenClawSchema.safeParse({
cron: {
enabled: true,
webhook: "https://example.invalid/cron",
webhookToken: "secret-token",
},
});
expect(res.success).toBe(true);
});
it("rejects non-http(s) cron.webhook URLs", () => {
const res = OpenClawSchema.safeParse({
cron: {
webhook: "ftp://example.invalid/cron",
},
});
expect(res.success).toBe(false);
});
});

View File

@@ -2,6 +2,8 @@ export type CronConfig = {
enabled?: boolean;
store?: string;
maxConcurrentRuns?: number;
webhook?: string;
webhookToken?: string;
/**
* How long to retain completed cron run sessions before automatic pruning.
* Accepts a duration string (e.g. "24h", "7d", "1h30m") or `false` to disable pruning.

View File

@@ -93,6 +93,14 @@ const MemorySchema = z
.strict()
.optional();
const HttpUrlSchema = z
.string()
.url()
.refine((value) => {
const protocol = new URL(value).protocol;
return protocol === "http:" || protocol === "https:";
}, "Expected http:// or https:// URL");
export const OpenClawSchema = z
.object({
$schema: z.string().optional(),
@@ -295,6 +303,8 @@ export const OpenClawSchema = z
enabled: z.boolean().optional(),
store: z.string().optional(),
maxConcurrentRuns: z.number().int().positive().optional(),
webhook: HttpUrlSchema.optional(),
webhookToken: z.string().optional().register(sensitive),
sessionRetention: z.union([z.string(), z.literal(false)]).optional(),
})
.strict()

View File

@@ -0,0 +1,87 @@
import { describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
import {
createCronStoreHarness,
createNoopLogger,
installCronTestHooks,
} from "./service.test-harness.js";
const logger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-get-job-" });
installCronTestHooks({ logger });
function createCronService(storePath: string) {
return new CronService({
storePath,
cronEnabled: true,
log: logger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
}
describe("CronService.getJob", () => {
it("returns added jobs and undefined for missing ids", async () => {
const { storePath } = await makeStorePath();
const cron = createCronService(storePath);
await cron.start();
try {
const added = await cron.add({
name: "lookup-test",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "ping" },
});
expect(cron.getJob(added.id)?.id).toBe(added.id);
expect(cron.getJob("missing-job-id")).toBeUndefined();
} finally {
cron.stop();
}
});
it("preserves notify on create for true, false, and omitted", async () => {
const { storePath } = await makeStorePath();
const cron = createCronService(storePath);
await cron.start();
try {
const notifyTrue = await cron.add({
name: "notify-true",
enabled: true,
notify: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "ping" },
});
const notifyFalse = await cron.add({
name: "notify-false",
enabled: true,
notify: false,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "ping" },
});
const notifyOmitted = await cron.add({
name: "notify-omitted",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "ping" },
});
expect(cron.getJob(notifyTrue.id)?.notify).toBe(true);
expect(cron.getJob(notifyFalse.id)?.notify).toBe(false);
expect(cron.getJob(notifyOmitted.id)?.notify).toBeUndefined();
} finally {
cron.stop();
}
});
});

View File

@@ -100,4 +100,24 @@ describe("applyJobPatch", () => {
bestEffort: undefined,
});
});
it("updates notify via patch", () => {
const now = Date.now();
const job: CronJob = {
id: "job-4",
name: "job-4",
enabled: true,
notify: false,
createdAtMs: now,
updatedAtMs: now,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
state: {},
};
expect(() => applyJobPatch(job, { notify: true })).not.toThrow();
expect(job.notify).toBe(true);
});
});

View File

@@ -1,4 +1,4 @@
import type { CronJobCreate, CronJobPatch } from "./types.js";
import type { CronJob, CronJobCreate, CronJobPatch } from "./types.js";
import * as ops from "./service/ops.js";
import { type CronServiceDeps, createCronServiceState } from "./service/state.js";
@@ -42,6 +42,10 @@ export class CronService {
return await ops.run(this.state, id, mode);
}
getJob(id: string): CronJob | undefined {
return this.state.store?.jobs.find((job) => job.id === id);
}
wake(opts: { mode: "now" | "next-heartbeat"; text: string }) {
return ops.wakeNow(this.state, opts);
}

View File

@@ -256,6 +256,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
name: normalizeRequiredName(input.name),
description: normalizeOptionalText(input.description),
enabled,
notify: typeof input.notify === "boolean" ? input.notify : undefined,
deleteAfterRun,
createdAtMs: now,
updatedAtMs: now,
@@ -284,6 +285,9 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) {
if (typeof patch.enabled === "boolean") {
job.enabled = patch.enabled;
}
if (typeof patch.notify === "boolean") {
job.notify = patch.notify;
}
if (typeof patch.deleteAfterRun === "boolean") {
job.deleteAfterRun = patch.deleteAfterRun;
}

View File

@@ -71,6 +71,7 @@ export type CronJob = {
name: string;
description?: string;
enabled: boolean;
notify?: boolean;
deleteAfterRun?: boolean;
createdAtMs: number;
updatedAtMs: number;

View File

@@ -119,6 +119,7 @@ export const CronJobSchema = Type.Object(
name: NonEmptyString,
description: Type.Optional(Type.String()),
enabled: Type.Boolean(),
notify: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()),
createdAtMs: Type.Integer({ minimum: 0 }),
updatedAtMs: Type.Integer({ minimum: 0 }),
@@ -147,6 +148,7 @@ export const CronAddParamsSchema = Type.Object(
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
description: Type.Optional(Type.String()),
enabled: Type.Optional(Type.Boolean()),
notify: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()),
schedule: CronScheduleSchema,
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
@@ -163,6 +165,7 @@ export const CronJobPatchSchema = Type.Object(
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
description: Type.Optional(Type.String()),
enabled: Type.Optional(Type.Boolean()),
notify: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()),
schedule: Type.Optional(CronScheduleSchema),
sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])),

View File

@@ -20,6 +20,17 @@ export type GatewayCronState = {
cronEnabled: boolean;
};
const CRON_WEBHOOK_TIMEOUT_MS = 10_000;
function redactWebhookUrl(url: string): string {
try {
const parsed = new URL(url);
return `${parsed.origin}${parsed.pathname}`;
} catch {
return "<invalid-webhook-url>";
}
}
export function buildGatewayCronService(params: {
cfg: ReturnType<typeof loadConfig>;
deps: CliDeps;
@@ -93,6 +104,40 @@ export function buildGatewayCronService(params: {
onEvent: (evt) => {
params.broadcast("cron", evt, { dropIfSlow: true });
if (evt.action === "finished") {
const webhookUrl = params.cfg.cron?.webhook?.trim();
const webhookToken = params.cfg.cron?.webhookToken?.trim();
const job = cron.getJob(evt.jobId);
if (webhookUrl && evt.summary && job?.notify === true) {
const headers: Record<string, string> = {
"Content-Type": "application/json",
};
if (webhookToken) {
headers.Authorization = `Bearer ${webhookToken}`;
}
const abortController = new AbortController();
const timeout = setTimeout(() => {
abortController.abort();
}, CRON_WEBHOOK_TIMEOUT_MS);
void fetch(webhookUrl, {
method: "POST",
headers,
body: JSON.stringify(evt),
signal: abortController.signal,
})
.catch((err) => {
cronLogger.warn(
{
err: String(err),
jobId: evt.jobId,
webhookUrl: redactWebhookUrl(webhookUrl),
},
"cron: webhook delivery failed",
);
})
.finally(() => {
clearTimeout(timeout);
});
}
const logPath = resolveCronRunLogPath({
storePath,
jobId: evt.jobId,

View File

@@ -1,9 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { describe, expect, test, vi } from "vitest";
import {
connectOk,
cronIsolatedRun,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
@@ -50,6 +51,20 @@ async function waitForNonEmptyFile(pathname: string, timeoutMs = 2000) {
}
}
async function waitForCondition(check: () => boolean, timeoutMs = 2000) {
const startedAt = process.hrtime.bigint();
for (;;) {
if (check()) {
return;
}
const elapsedMs = Number(process.hrtime.bigint() - startedAt) / 1e6;
if (elapsedMs >= timeoutMs) {
throw new Error("timeout waiting for condition");
}
await yieldToEventLoop();
}
}
describe("gateway server cron", () => {
test("handles cron CRUD, normalization, and patch semantics", { timeout: 120_000 }, async () => {
const prevSkipCron = process.env.OPENCLAW_SKIP_CRON;
@@ -68,6 +83,7 @@ describe("gateway server cron", () => {
const addRes = await rpcReq(ws, "cron.add", {
name: "daily",
enabled: true,
notify: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
@@ -84,6 +100,9 @@ describe("gateway server cron", () => {
expect(Array.isArray(jobs)).toBe(true);
expect((jobs as unknown[]).length).toBe(1);
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe("daily");
expect(
((jobs as Array<{ notify?: unknown }>)[0]?.notify as boolean | undefined) ?? false,
).toBe(true);
const routeAtMs = Date.now() - 1;
const routeRes = await rpcReq(ws, "cron.add", {
@@ -403,4 +422,132 @@ describe("gateway server cron", () => {
}
}
}, 45_000);
test("posts webhooks only when notify is true and summary exists", async () => {
const prevSkipCron = process.env.OPENCLAW_SKIP_CRON;
process.env.OPENCLAW_SKIP_CRON = "0";
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-cron-webhook-"));
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
testState.cronEnabled = false;
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
const configPath = process.env.OPENCLAW_CONFIG_PATH;
expect(typeof configPath).toBe("string");
await fs.mkdir(path.dirname(configPath as string), { recursive: true });
await fs.writeFile(
configPath as string,
JSON.stringify(
{
cron: {
webhook: "https://example.invalid/cron-finished",
webhookToken: "cron-webhook-token",
},
},
null,
2,
),
"utf-8",
);
const fetchMock = vi.fn(async () => new Response("ok", { status: 200 }));
vi.stubGlobal("fetch", fetchMock);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
try {
const notifyRes = await rpcReq(ws, "cron.add", {
name: "notify true",
enabled: true,
notify: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "send webhook" },
});
expect(notifyRes.ok).toBe(true);
const notifyJobIdValue = (notifyRes.payload as { id?: unknown } | null)?.id;
const notifyJobId = typeof notifyJobIdValue === "string" ? notifyJobIdValue : "";
expect(notifyJobId.length > 0).toBe(true);
const notifyRunRes = await rpcReq(ws, "cron.run", { id: notifyJobId, mode: "force" }, 20_000);
expect(notifyRunRes.ok).toBe(true);
await waitForCondition(() => fetchMock.mock.calls.length === 1, 5000);
const [notifyUrl, notifyInit] = fetchMock.mock.calls[0] as [
string,
{
method?: string;
headers?: Record<string, string>;
body?: string;
},
];
expect(notifyUrl).toBe("https://example.invalid/cron-finished");
expect(notifyInit.method).toBe("POST");
expect(notifyInit.headers?.Authorization).toBe("Bearer cron-webhook-token");
expect(notifyInit.headers?.["Content-Type"]).toBe("application/json");
const notifyBody = JSON.parse(notifyInit.body ?? "{}");
expect(notifyBody.action).toBe("finished");
expect(notifyBody.jobId).toBe(notifyJobId);
const silentRes = await rpcReq(ws, "cron.add", {
name: "notify false",
enabled: true,
notify: false,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "do not send" },
});
expect(silentRes.ok).toBe(true);
const silentJobIdValue = (silentRes.payload as { id?: unknown } | null)?.id;
const silentJobId = typeof silentJobIdValue === "string" ? silentJobIdValue : "";
expect(silentJobId.length > 0).toBe(true);
const silentRunRes = await rpcReq(ws, "cron.run", { id: silentJobId, mode: "force" }, 20_000);
expect(silentRunRes.ok).toBe(true);
await yieldToEventLoop();
await yieldToEventLoop();
expect(fetchMock).toHaveBeenCalledTimes(1);
cronIsolatedRun.mockResolvedValueOnce({ status: "ok" });
const noSummaryRes = await rpcReq(ws, "cron.add", {
name: "notify no summary",
enabled: true,
notify: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "test" },
});
expect(noSummaryRes.ok).toBe(true);
const noSummaryJobIdValue = (noSummaryRes.payload as { id?: unknown } | null)?.id;
const noSummaryJobId = typeof noSummaryJobIdValue === "string" ? noSummaryJobIdValue : "";
expect(noSummaryJobId.length > 0).toBe(true);
const noSummaryRunRes = await rpcReq(
ws,
"cron.run",
{ id: noSummaryJobId, mode: "force" },
20_000,
);
expect(noSummaryRunRes.ok).toBe(true);
await yieldToEventLoop();
await yieldToEventLoop();
expect(fetchMock).toHaveBeenCalledTimes(1);
} finally {
ws.close();
await server.close();
await rmTempDir(dir);
vi.unstubAllGlobals();
testState.cronStorePath = undefined;
testState.cronEnabled = undefined;
if (prevSkipCron === undefined) {
delete process.env.OPENCLAW_SKIP_CRON;
} else {
process.env.OPENCLAW_SKIP_CRON = prevSkipCron;
}
}
}, 60_000);
});