mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 05:01:23 +00:00
cron: separate webhook POST delivery from announce (#17901)
* cron: split webhook delivery from announce mode * cron: validate webhook delivery target * cron: remove legacy webhook fallback config * fix: finalize cron webhook delivery prep (#17901) (thanks @advaitpaliwal) --------- Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
This commit is contained in:
@@ -67,24 +67,51 @@ export const CronPayloadPatchSchema = Type.Union([
|
||||
cronAgentTurnPayloadSchema({ message: Type.Optional(NonEmptyString) }),
|
||||
]);
|
||||
|
||||
const CronDeliveryBaseProperties = {
|
||||
const CronDeliverySharedProperties = {
|
||||
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
|
||||
to: Type.Optional(Type.String()),
|
||||
bestEffort: Type.Optional(Type.Boolean()),
|
||||
};
|
||||
|
||||
export const CronDeliverySchema = Type.Object(
|
||||
const CronDeliveryNoopSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Union([Type.Literal("none"), Type.Literal("announce")]),
|
||||
...CronDeliveryBaseProperties,
|
||||
mode: Type.Literal("none"),
|
||||
...CronDeliverySharedProperties,
|
||||
to: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
const CronDeliveryAnnounceSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Literal("announce"),
|
||||
...CronDeliverySharedProperties,
|
||||
to: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
const CronDeliveryWebhookSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Literal("webhook"),
|
||||
...CronDeliverySharedProperties,
|
||||
to: NonEmptyString,
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const CronDeliverySchema = Type.Union([
|
||||
CronDeliveryNoopSchema,
|
||||
CronDeliveryAnnounceSchema,
|
||||
CronDeliveryWebhookSchema,
|
||||
]);
|
||||
|
||||
export const CronDeliveryPatchSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Optional(Type.Union([Type.Literal("none"), Type.Literal("announce")])),
|
||||
...CronDeliveryBaseProperties,
|
||||
mode: Type.Optional(
|
||||
Type.Union([Type.Literal("none"), Type.Literal("announce"), Type.Literal("webhook")]),
|
||||
),
|
||||
...CronDeliverySharedProperties,
|
||||
to: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
@@ -111,7 +138,6 @@ export const CronJobSchema = Type.Object(
|
||||
name: NonEmptyString,
|
||||
description: Type.Optional(Type.String()),
|
||||
enabled: Type.Boolean(),
|
||||
notify: Type.Optional(Type.Boolean()),
|
||||
deleteAfterRun: Type.Optional(Type.Boolean()),
|
||||
createdAtMs: Type.Integer({ minimum: 0 }),
|
||||
updatedAtMs: Type.Integer({ minimum: 0 }),
|
||||
@@ -140,7 +166,6 @@ export const CronAddParamsSchema = Type.Object(
|
||||
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
|
||||
description: Type.Optional(Type.String()),
|
||||
enabled: Type.Optional(Type.Boolean()),
|
||||
notify: Type.Optional(Type.Boolean()),
|
||||
deleteAfterRun: Type.Optional(Type.Boolean()),
|
||||
schedule: CronScheduleSchema,
|
||||
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
|
||||
@@ -157,7 +182,6 @@ export const CronJobPatchSchema = Type.Object(
|
||||
agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])),
|
||||
description: Type.Optional(Type.String()),
|
||||
enabled: Type.Optional(Type.Boolean()),
|
||||
notify: Type.Optional(Type.Boolean()),
|
||||
deleteAfterRun: Type.Optional(Type.Boolean()),
|
||||
schedule: Type.Optional(CronScheduleSchema),
|
||||
sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])),
|
||||
|
||||
@@ -7,6 +7,7 @@ import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
||||
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
|
||||
import { CronService } from "../cron/service.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import { normalizeHttpWebhookUrl } from "../cron/webhook-url.js";
|
||||
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
@@ -31,6 +32,32 @@ function redactWebhookUrl(url: string): string {
|
||||
}
|
||||
}
|
||||
|
||||
type CronWebhookTarget = {
|
||||
url: string;
|
||||
source: "delivery" | "legacy";
|
||||
};
|
||||
|
||||
function resolveCronWebhookTarget(params: {
|
||||
delivery?: { mode?: string; to?: string };
|
||||
legacyNotify?: boolean;
|
||||
legacyWebhook?: string;
|
||||
}): CronWebhookTarget | null {
|
||||
const mode = params.delivery?.mode?.trim().toLowerCase();
|
||||
if (mode === "webhook") {
|
||||
const url = normalizeHttpWebhookUrl(params.delivery?.to);
|
||||
return url ? { url, source: "delivery" } : null;
|
||||
}
|
||||
|
||||
if (params.legacyNotify) {
|
||||
const legacyUrl = normalizeHttpWebhookUrl(params.legacyWebhook);
|
||||
if (legacyUrl) {
|
||||
return { url: legacyUrl, source: "legacy" };
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export function buildGatewayCronService(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
deps: CliDeps;
|
||||
@@ -61,6 +88,7 @@ export function buildGatewayCronService(params: {
|
||||
agentId: agentId ?? defaultAgentId,
|
||||
});
|
||||
const sessionStorePath = resolveSessionStorePath(defaultAgentId);
|
||||
const warnedLegacyWebhookJobs = new Set<string>();
|
||||
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
@@ -104,10 +132,41 @@ export function buildGatewayCronService(params: {
|
||||
onEvent: (evt) => {
|
||||
params.broadcast("cron", evt, { dropIfSlow: true });
|
||||
if (evt.action === "finished") {
|
||||
const webhookUrl = params.cfg.cron?.webhook?.trim();
|
||||
const webhookToken = params.cfg.cron?.webhookToken?.trim();
|
||||
const legacyWebhook = params.cfg.cron?.webhook?.trim();
|
||||
const job = cron.getJob(evt.jobId);
|
||||
if (webhookUrl && evt.summary && job?.notify === true) {
|
||||
const legacyNotify = (job as { notify?: unknown } | undefined)?.notify === true;
|
||||
const webhookTarget = resolveCronWebhookTarget({
|
||||
delivery:
|
||||
job?.delivery && typeof job.delivery.mode === "string"
|
||||
? { mode: job.delivery.mode, to: job.delivery.to }
|
||||
: undefined,
|
||||
legacyNotify,
|
||||
legacyWebhook,
|
||||
});
|
||||
|
||||
if (!webhookTarget && job?.delivery?.mode === "webhook") {
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: evt.jobId,
|
||||
deliveryTo: job.delivery.to,
|
||||
},
|
||||
"cron: skipped webhook delivery, delivery.to must be a valid http(s) URL",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget?.source === "legacy" && !warnedLegacyWebhookJobs.has(evt.jobId)) {
|
||||
warnedLegacyWebhookJobs.add(evt.jobId);
|
||||
cronLogger.warn(
|
||||
{
|
||||
jobId: evt.jobId,
|
||||
legacyWebhook: redactWebhookUrl(webhookTarget.url),
|
||||
},
|
||||
"cron: deprecated notify+cron.webhook fallback in use, migrate to delivery.mode=webhook with delivery.to",
|
||||
);
|
||||
}
|
||||
|
||||
if (webhookTarget && evt.summary) {
|
||||
const headers: Record<string, string> = {
|
||||
"Content-Type": "application/json",
|
||||
};
|
||||
@@ -118,7 +177,7 @@ export function buildGatewayCronService(params: {
|
||||
const timeout = setTimeout(() => {
|
||||
abortController.abort();
|
||||
}, CRON_WEBHOOK_TIMEOUT_MS);
|
||||
void fetch(webhookUrl, {
|
||||
void fetch(webhookTarget.url, {
|
||||
method: "POST",
|
||||
headers,
|
||||
body: JSON.stringify(evt),
|
||||
@@ -129,7 +188,7 @@ export function buildGatewayCronService(params: {
|
||||
{
|
||||
err: String(err),
|
||||
jobId: evt.jobId,
|
||||
webhookUrl: redactWebhookUrl(webhookUrl),
|
||||
webhookUrl: redactWebhookUrl(webhookTarget.url),
|
||||
},
|
||||
"cron: webhook delivery failed",
|
||||
);
|
||||
|
||||
@@ -83,11 +83,11 @@ describe("gateway server cron", () => {
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "daily",
|
||||
enabled: true,
|
||||
notify: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe("string");
|
||||
@@ -101,8 +101,8 @@ describe("gateway server cron", () => {
|
||||
expect((jobs as unknown[]).length).toBe(1);
|
||||
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe("daily");
|
||||
expect(
|
||||
((jobs as Array<{ notify?: unknown }>)[0]?.notify as boolean | undefined) ?? false,
|
||||
).toBe(true);
|
||||
((jobs as Array<{ delivery?: { mode?: unknown } }>)[0]?.delivery?.mode as string) ?? "",
|
||||
).toBe("webhook");
|
||||
|
||||
const routeAtMs = Date.now() - 1;
|
||||
const routeRes = await rpcReq(ws, "cron.add", {
|
||||
@@ -423,14 +423,31 @@ describe("gateway server cron", () => {
|
||||
}
|
||||
}, 45_000);
|
||||
|
||||
test("posts webhooks only when notify is true and summary exists", async () => {
|
||||
test("posts webhooks for delivery mode and legacy notify fallback only when summary exists", async () => {
|
||||
const prevSkipCron = process.env.OPENCLAW_SKIP_CRON;
|
||||
process.env.OPENCLAW_SKIP_CRON = "0";
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-cron-webhook-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.cronEnabled = false;
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const legacyNotifyJob = {
|
||||
id: "legacy-notify-job",
|
||||
name: "legacy notify job",
|
||||
enabled: true,
|
||||
notify: true,
|
||||
createdAtMs: Date.now(),
|
||||
updatedAtMs: Date.now(),
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "legacy webhook" },
|
||||
state: {},
|
||||
};
|
||||
await fs.writeFile(
|
||||
testState.cronStorePath,
|
||||
JSON.stringify({ version: 1, jobs: [legacyNotifyJob] }),
|
||||
);
|
||||
|
||||
const configPath = process.env.OPENCLAW_CONFIG_PATH;
|
||||
expect(typeof configPath).toBe("string");
|
||||
@@ -440,7 +457,7 @@ describe("gateway server cron", () => {
|
||||
JSON.stringify(
|
||||
{
|
||||
cron: {
|
||||
webhook: "https://example.invalid/cron-finished",
|
||||
webhook: "https://legacy.example.invalid/cron-finished",
|
||||
webhookToken: "cron-webhook-token",
|
||||
},
|
||||
},
|
||||
@@ -457,14 +474,25 @@ describe("gateway server cron", () => {
|
||||
await connectOk(ws);
|
||||
|
||||
try {
|
||||
const notifyRes = await rpcReq(ws, "cron.add", {
|
||||
name: "notify true",
|
||||
const invalidWebhookRes = await rpcReq(ws, "cron.add", {
|
||||
name: "invalid webhook",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "invalid" },
|
||||
delivery: { mode: "webhook", to: "ftp://example.invalid/cron-finished" },
|
||||
});
|
||||
expect(invalidWebhookRes.ok).toBe(false);
|
||||
|
||||
const notifyRes = await rpcReq(ws, "cron.add", {
|
||||
name: "webhook enabled",
|
||||
enabled: true,
|
||||
notify: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "send webhook" },
|
||||
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
|
||||
});
|
||||
expect(notifyRes.ok).toBe(true);
|
||||
const notifyJobIdValue = (notifyRes.payload as { id?: unknown } | null)?.id;
|
||||
@@ -491,10 +519,32 @@ describe("gateway server cron", () => {
|
||||
expect(notifyBody.action).toBe("finished");
|
||||
expect(notifyBody.jobId).toBe(notifyJobId);
|
||||
|
||||
const legacyRunRes = await rpcReq(
|
||||
ws,
|
||||
"cron.run",
|
||||
{ id: "legacy-notify-job", mode: "force" },
|
||||
20_000,
|
||||
);
|
||||
expect(legacyRunRes.ok).toBe(true);
|
||||
await waitForCondition(() => fetchMock.mock.calls.length === 2, 5000);
|
||||
const [legacyUrl, legacyInit] = fetchMock.mock.calls[1] as [
|
||||
string,
|
||||
{
|
||||
method?: string;
|
||||
headers?: Record<string, string>;
|
||||
body?: string;
|
||||
},
|
||||
];
|
||||
expect(legacyUrl).toBe("https://legacy.example.invalid/cron-finished");
|
||||
expect(legacyInit.method).toBe("POST");
|
||||
expect(legacyInit.headers?.Authorization).toBe("Bearer cron-webhook-token");
|
||||
const legacyBody = JSON.parse(legacyInit.body ?? "{}");
|
||||
expect(legacyBody.action).toBe("finished");
|
||||
expect(legacyBody.jobId).toBe("legacy-notify-job");
|
||||
|
||||
const silentRes = await rpcReq(ws, "cron.add", {
|
||||
name: "notify false",
|
||||
name: "webhook disabled",
|
||||
enabled: true,
|
||||
notify: false,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
@@ -509,17 +559,17 @@ describe("gateway server cron", () => {
|
||||
expect(silentRunRes.ok).toBe(true);
|
||||
await yieldToEventLoop();
|
||||
await yieldToEventLoop();
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
cronIsolatedRun.mockResolvedValueOnce({ status: "ok" });
|
||||
const noSummaryRes = await rpcReq(ws, "cron.add", {
|
||||
name: "notify no summary",
|
||||
name: "webhook no summary",
|
||||
enabled: true,
|
||||
notify: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "test" },
|
||||
delivery: { mode: "webhook", to: "https://example.invalid/cron-finished" },
|
||||
});
|
||||
expect(noSummaryRes.ok).toBe(true);
|
||||
const noSummaryJobIdValue = (noSummaryRes.payload as { id?: unknown } | null)?.id;
|
||||
@@ -535,7 +585,7 @@ describe("gateway server cron", () => {
|
||||
expect(noSummaryRunRes.ok).toBe(true);
|
||||
await yieldToEventLoop();
|
||||
await yieldToEventLoop();
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
ws.close();
|
||||
await server.close();
|
||||
|
||||
Reference in New Issue
Block a user