Files
openclaw/src/gateway/server-cron.ts
Advait Paliwal 115cfb4430 gateway: add cron finished-run webhook (#14535)
* gateway: add cron finished webhook delivery

* config: allow cron webhook in runtime schema

* cron: require notify flag for webhook posts

* ui/docs: add cron notify toggle and webhook docs

* fix: harden cron webhook auth and fill notify coverage (#14535) (thanks @advaitpaliwal)

---------

Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
2026-02-15 16:14:17 -08:00

166 lines
5.8 KiB
TypeScript

import type { CliDeps } from "../cli/deps.js";
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import { loadConfig } from "../config/config.js";
import { resolveAgentMainSessionKey } from "../config/sessions.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
import { CronService } from "../cron/service.js";
import { resolveCronStorePath } from "../cron/store.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
/**
 * Result of `buildGatewayCronService`: the cron service together with the
 * settings it was built with.
 */
export type GatewayCronState = {
/** Cron service instance created by `buildGatewayCronService`. */
cron: CronService;
/** Cron store path, as resolved by `resolveCronStorePath` from `cfg.cron.store`. */
storePath: string;
/** False when `OPENCLAW_SKIP_CRON` is `"1"` or `cfg.cron.enabled` is `false`; true otherwise. */
cronEnabled: boolean;
};
/** Milliseconds a cron webhook POST may stay in flight before it is aborted. */
const CRON_WEBHOOK_TIMEOUT_MS = 10_000;
/**
 * Produce a log-safe rendering of a webhook URL.
 *
 * Only the origin (scheme, host, port) and the path are kept; embedded
 * credentials, the query string and the fragment are dropped so that secrets
 * carried there never reach the logs.
 *
 * @param url - Raw webhook URL as configured.
 * @returns `origin + pathname`, or the literal `<invalid-webhook-url>` when
 *   the input cannot be parsed as a URL.
 */
function redactWebhookUrl(url: string): string {
  let target: URL;
  try {
    target = new URL(url);
  } catch {
    // Unparseable input: never echo it back, it may itself contain a secret.
    return "<invalid-webhook-url>";
  }
  const { origin, pathname } = target;
  return origin + pathname;
}
/**
 * Build the gateway's cron service and wire its callbacks into the gateway.
 *
 * The service is enabled unless `OPENCLAW_SKIP_CRON` is `"1"` or
 * `cfg.cron.enabled` is `false`. Every cron event is broadcast on the `"cron"`
 * channel (droppable for slow consumers). For `finished` events the run is
 * also appended to the job's run log and, when a webhook URL is configured,
 * the event has a summary and the job has `notify === true`, the event is
 * POSTed to the webhook as JSON. Webhook delivery and run-log writes are
 * best-effort: failures are logged as warnings and never thrown.
 *
 * @param params.cfg - Config snapshot used for store paths, cron settings and
 *   the webhook URL/token.
 * @param params.deps - CLI dependencies forwarded to heartbeat and isolated
 *   agent runs.
 * @param params.broadcast - Gateway event broadcaster.
 * @returns The cron service plus the resolved store path and enabled flag.
 */
export function buildGatewayCronService(params: {
  cfg: ReturnType<typeof loadConfig>;
  deps: CliDeps;
  broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
}): GatewayCronState {
  const cronLogger = getChildLogger({ module: "cron" });
  const storePath = resolveCronStorePath(params.cfg.cron?.store);
  const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;

  // Resolves the agent for a job against a freshly loaded config. A requested
  // id is honoured only if it matches an entry in `agents.list`; otherwise the
  // default agent of that config is used.
  const resolveCronAgent = (requested?: string | null) => {
    const runtimeConfig = loadConfig();
    const normalized =
      typeof requested === "string" && requested.trim() ? normalizeAgentId(requested) : undefined;
    const hasAgent =
      normalized !== undefined &&
      Array.isArray(runtimeConfig.agents?.list) &&
      runtimeConfig.agents.list.some(
        (entry) =>
          entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === normalized,
      );
    const agentId = hasAgent ? normalized : resolveDefaultAgentId(runtimeConfig);
    return { agentId, cfg: runtimeConfig };
  };

  const defaultAgentId = resolveDefaultAgentId(params.cfg);
  const resolveSessionStorePath = (agentId?: string) =>
    resolveStorePath(params.cfg.session?.store, {
      agentId: agentId ?? defaultAgentId,
    });
  const sessionStorePath = resolveSessionStorePath(defaultAgentId);

  const cron = new CronService({
    storePath,
    cronEnabled,
    cronConfig: params.cfg.cron,
    defaultAgentId,
    resolveSessionStorePath,
    sessionStorePath,
    enqueueSystemEvent: (text, opts) => {
      const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId);
      const sessionKey = resolveAgentMainSessionKey({
        cfg: runtimeConfig,
        agentId,
      });
      enqueueSystemEvent(text, { sessionKey, contextKey: opts?.contextKey });
    },
    requestHeartbeatNow,
    runHeartbeatOnce: async (opts) => {
      // Take the agent id and the config from the same snapshot instead of
      // loading the config twice and mixing the results.
      const resolved = opts?.agentId ? resolveCronAgent(opts.agentId) : undefined;
      return await runHeartbeatOnce({
        cfg: resolved?.cfg ?? loadConfig(),
        reason: opts?.reason,
        agentId: resolved?.agentId,
        deps: { ...params.deps, runtime: defaultRuntime },
      });
    },
    runIsolatedAgentJob: async ({ job, message }) => {
      const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
      return await runCronIsolatedAgentTurn({
        cfg: runtimeConfig,
        deps: params.deps,
        job,
        message,
        agentId,
        sessionKey: `cron:${job.id}`,
        lane: "cron",
      });
    },
    log: getChildLogger({ module: "cron", storePath }),
    onEvent: (evt) => {
      params.broadcast("cron", evt, { dropIfSlow: true });
      if (evt.action !== "finished") {
        return;
      }

      const webhookUrl = params.cfg.cron?.webhook?.trim();
      const webhookToken = params.cfg.cron?.webhookToken?.trim();
      // NOTE(review): `cron` is the constant being initialised by this very
      // constructor call; this assumes events are never emitted synchronously
      // during construction.
      const job = cron.getJob(evt.jobId);
      if (webhookUrl && evt.summary && job?.notify === true) {
        const headers: Record<string, string> = {
          "Content-Type": "application/json",
        };
        if (webhookToken) {
          headers.Authorization = `Bearer ${webhookToken}`;
        }
        const abortController = new AbortController();
        const timeout = setTimeout(() => {
          abortController.abort();
        }, CRON_WEBHOOK_TIMEOUT_MS);
        // Fire-and-forget: delivery must never block or fail the cron run.
        void fetch(webhookUrl, {
          method: "POST",
          headers,
          body: JSON.stringify(evt),
          signal: abortController.signal,
        })
          .then(async (res) => {
            // The body is not needed; cancel it so the underlying connection
            // is released promptly instead of waiting for garbage collection.
            // Cleanup is best-effort, so a failure here is deliberately ignored.
            await res.body?.cancel().catch(() => undefined);
            // fetch() resolves for any HTTP status; surface non-2xx replies
            // through the same warning path as network errors.
            if (!res.ok) {
              throw new Error(`webhook responded with HTTP ${res.status}`);
            }
          })
          .catch((err) => {
            cronLogger.warn(
              {
                err: String(err),
                jobId: evt.jobId,
                webhookUrl: redactWebhookUrl(webhookUrl),
              },
              "cron: webhook delivery failed",
            );
          })
          .finally(() => {
            clearTimeout(timeout);
          });
      }

      const logPath = resolveCronRunLogPath({
        storePath,
        jobId: evt.jobId,
      });
      void appendCronRunLog(logPath, {
        ts: Date.now(),
        jobId: evt.jobId,
        action: "finished",
        status: evt.status,
        error: evt.error,
        summary: evt.summary,
        sessionId: evt.sessionId,
        sessionKey: evt.sessionKey,
        runAtMs: evt.runAtMs,
        durationMs: evt.durationMs,
        nextRunAtMs: evt.nextRunAtMs,
      }).catch((err) => {
        cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
      });
    },
  });

  return { cron, storePath, cronEnabled };
}