feat: add cron agent binding

This commit is contained in:
Peter Steinberger
2026-01-12 10:23:45 +00:00
parent a3938d62f6
commit 115591c5b6
14 changed files with 383 additions and 83 deletions

View File

@@ -1,6 +1,7 @@
import crypto from "node:crypto";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { truncateUtf16Safe } from "../utils.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { computeNextRunAtMs } from "./schedule.js";
@@ -36,7 +37,7 @@ export type CronServiceDeps = {
log: Logger;
storePath: string;
cronEnabled: boolean;
enqueueSystemEvent: (text: string) => void;
enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void;
requestHeartbeatNow: (opts?: { reason?: string }) => void;
runHeartbeatOnce?: (opts?: {
reason?: string;
@@ -74,6 +75,13 @@ function truncateText(input: string, maxLen: number) {
return `${truncateUtf16Safe(input, Math.max(0, maxLen - 1)).trimEnd()}`;
}
function normalizeOptionalAgentId(raw: unknown) {
if (typeof raw !== "string") return undefined;
const trimmed = raw.trim();
if (!trimmed) return undefined;
return normalizeAgentId(trimmed);
}
function inferLegacyName(job: {
schedule?: { kind?: unknown; everyMs?: unknown; expr?: unknown };
payload?: { kind?: unknown; text?: unknown; message?: unknown };
@@ -181,6 +189,7 @@ export class CronService {
const id = crypto.randomUUID();
const job: CronJob = {
id,
agentId: normalizeOptionalAgentId(input.agentId),
name: normalizeRequiredName(input.name),
description: normalizeOptionalText(input.description),
enabled: input.enabled !== false,
@@ -226,6 +235,11 @@ export class CronService {
if (patch.payload) job.payload = patch.payload;
if (patch.isolation) job.isolation = patch.isolation;
if (patch.state) job.state = { ...job.state, ...patch.state };
if ("agentId" in patch) {
job.agentId = normalizeOptionalAgentId(
(patch as { agentId?: unknown }).agentId,
);
}
job.updatedAtMs = now;
this.assertSupportedJobSpec(job);
@@ -495,7 +509,9 @@ export class CronService {
const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron";
const body = (summary ?? err ?? status).trim();
const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`;
this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`);
this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`, {
agentId: job.agentId,
});
if (job.wakeMode === "now") {
this.deps.requestHeartbeatNow({ reason: `cron:${job.id}:post` });
}
@@ -519,7 +535,7 @@ export class CronService {
);
return;
}
this.deps.enqueueSystemEvent(text);
this.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const delay = (ms: number) =>