fix: cron scheduler reliability, store hardening, and UX improvements (#10776)

* refactor: update cron job wake mode and run mode handling

- Changed default wake mode from 'next-heartbeat' to 'now' in CronJobEditor and related CLI commands.
- Updated cron-tool tests to reflect changes in run mode, introducing 'due' and 'force' options.
- Enhanced cron-tool logic to handle new run modes and ensure compatibility with existing job structures.
- Added new tests for delivery plan consistency and job execution behavior under various conditions.
- Improved normalization functions to handle wake mode and session target casing.

This refactor aims to streamline cron job configurations and enhance the overall user experience with clearer defaults and improved functionality.

* test: enhance cron job functionality and UI

- Added tests to ensure the isolated agent correctly announces the final payload text when delivering messages via Telegram.
- Implemented a new function to pick the last deliverable payload from a list of delivery payloads.
- Enhanced the cron service to maintain legacy "every" jobs while minute cron jobs recompute schedules.
- Updated the cron store migration tests to verify the addition of anchorMs to legacy every schedules.
- Improved the UI for displaying cron job details, including job state and delivery information, with new styles and layout adjustments.

These changes aim to improve the reliability and user experience of the cron job system.

* test: enhance sessions thinking level handling

- Added tests to verify that the correct thinking levels are applied during session spawning.
- Updated the sessions-spawn-tool to include a new parameter for overriding thinking levels.
- Enhanced the UI to support additional thinking levels, including "xhigh" and "full", and improved the handling of current options in dropdowns.

These changes aim to improve the flexibility and accuracy of thinking level configurations in session management.

* feat: enhance session management and cron job functionality

- Introduced passthrough arguments in the test-parallel script to allow for flexible command-line options.
- Updated session handling to hide cron run alias session keys from the sessions list, improving clarity.
- Enhanced the cron service to accurately record job start times and durations, ensuring better tracking of job execution.
- Added tests to verify the correct behavior of the cron service under various conditions, including zero-delay timers.

These changes aim to improve the usability and reliability of session and cron job management.

* feat: implement job running state checks in cron service

- Added functionality to prevent manual job runs if a job is already in progress, enhancing job management.
- Updated the `isJobDue` function to include checks for running jobs, ensuring accurate scheduling.
- Enhanced the `run` function to return a specific reason when a job is already running.
- Introduced a new test case to verify the behavior of forced manual runs during active job execution.

These changes aim to improve the reliability and clarity of cron job execution and management.

* feat: add session ID and key to CronRunLogEntry model

- Introduced `sessionid` and `sessionkey` properties to the `CronRunLogEntry` struct for enhanced tracking of session-related information.
- Updated the initializer and Codable conformance to accommodate the new properties, ensuring proper serialization and deserialization.

These changes aim to improve the granularity of logging and session management within the cron job system.

* fix: improve session display name resolution

- Updated the `resolveSessionDisplayName` function to ensure that both label and displayName are trimmed and default to an empty string if not present.
- Enhanced the logic to prevent returning the key if it matches the label or displayName, improving clarity in session naming.

These changes aim to enhance the accuracy and usability of session display names in the UI.

* perf: skip cron store persist when idle timer tick produces no changes

recomputeNextRuns now returns a boolean indicating whether any job
state was mutated. The idle path in onTimer only persists when the
return value is true, eliminating unnecessary file writes every 60s
for far-future or idle schedules.

* fix: prep for merge - explicit delivery mode migration, docs + changelog (#10776) (thanks @tyler6204)
This commit is contained in:
Tyler Yust
2026-02-06 18:03:03 -08:00
committed by GitHub
parent 0dd7033521
commit d90cac990c
58 changed files with 2952 additions and 250 deletions

View File

@@ -20,6 +20,17 @@ import {
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
function resolveEveryAnchorMs(params: {
schedule: { everyMs: number; anchorMs?: number };
fallbackAnchorMs: number;
}) {
const raw = params.schedule.anchorMs;
if (typeof raw === "number" && Number.isFinite(raw)) {
return Math.max(0, Math.floor(raw));
}
return Math.max(0, Math.floor(params.fallbackAnchorMs));
}
export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "payload">) {
if (job.sessionTarget === "main" && job.payload.kind !== "systemEvent") {
throw new Error('main cron jobs require payload.kind="systemEvent"');
@@ -47,6 +58,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
if (!job.enabled) {
return undefined;
}
if (job.schedule.kind === "every") {
const anchorMs = resolveEveryAnchorMs({
schedule: job.schedule,
fallbackAnchorMs: job.createdAtMs,
});
return computeNextRunAtMs({ ...job.schedule, anchorMs }, nowMs);
}
if (job.schedule.kind === "at") {
// One-shot jobs stay due until they successfully finish.
if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) {
@@ -69,18 +87,26 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und
return computeNextRunAtMs(job.schedule, nowMs);
}
export function recomputeNextRuns(state: CronServiceState) {
export function recomputeNextRuns(state: CronServiceState): boolean {
if (!state.store) {
return;
return false;
}
let changed = false;
const now = state.deps.nowMs();
for (const job of state.store.jobs) {
if (!job.state) {
job.state = {};
changed = true;
}
if (!job.enabled) {
job.state.nextRunAtMs = undefined;
job.state.runningAtMs = undefined;
if (job.state.nextRunAtMs !== undefined) {
job.state.nextRunAtMs = undefined;
changed = true;
}
if (job.state.runningAtMs !== undefined) {
job.state.runningAtMs = undefined;
changed = true;
}
continue;
}
const runningAt = job.state.runningAtMs;
@@ -90,9 +116,15 @@ export function recomputeNextRuns(state: CronServiceState) {
"cron: clearing stuck running marker",
);
job.state.runningAtMs = undefined;
changed = true;
}
const newNext = computeJobNextRunAtMs(job, now);
if (job.state.nextRunAtMs !== newNext) {
job.state.nextRunAtMs = newNext;
changed = true;
}
job.state.nextRunAtMs = computeJobNextRunAtMs(job, now);
}
return changed;
}
export function nextWakeAtMs(state: CronServiceState) {
@@ -110,10 +142,20 @@ export function nextWakeAtMs(state: CronServiceState) {
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
const now = state.deps.nowMs();
const id = crypto.randomUUID();
const schedule =
input.schedule.kind === "every"
? {
...input.schedule,
anchorMs: resolveEveryAnchorMs({
schedule: input.schedule,
fallbackAnchorMs: now,
}),
}
: input.schedule;
const deleteAfterRun =
typeof input.deleteAfterRun === "boolean"
? input.deleteAfterRun
: input.schedule.kind === "at"
: schedule.kind === "at"
? true
: undefined;
const enabled = typeof input.enabled === "boolean" ? input.enabled : true;
@@ -126,7 +168,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo
deleteAfterRun,
createdAtMs: now,
updatedAtMs: now,
schedule: input.schedule,
schedule,
sessionTarget: input.sessionTarget,
wakeMode: input.wakeMode,
payload: input.payload,
@@ -223,6 +265,9 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP
if (typeof patch.timeoutSeconds === "number") {
next.timeoutSeconds = patch.timeoutSeconds;
}
if (typeof patch.allowUnsafeExternalContent === "boolean") {
next.allowUnsafeExternalContent = patch.allowUnsafeExternalContent;
}
if (typeof patch.deliver === "boolean") {
next.deliver = patch.deliver;
}
@@ -297,6 +342,7 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload {
model: patch.model,
thinking: patch.thinking,
timeoutSeconds: patch.timeoutSeconds,
allowUnsafeExternalContent: patch.allowUnsafeExternalContent,
deliver: patch.deliver,
channel: patch.channel,
to: patch.to,
@@ -334,6 +380,9 @@ function mergeCronDelivery(
}
export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) {
if (typeof job.state.runningAtMs === "number") {
return false;
}
if (opts.forced) {
return true;
}

View File

@@ -11,7 +11,7 @@ import {
} from "./jobs.js";
import { locked } from "./locked.js";
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
import { armTimer, emit, executeJob, stopTimer, wake } from "./timer.js";
import { armTimer, emit, executeJob, runMissedJobs, stopTimer, wake } from "./timer.js";
export async function start(state: CronServiceState) {
await locked(state, async () => {
@@ -19,7 +19,18 @@ export async function start(state: CronServiceState) {
state.deps.log.info({ enabled: false }, "cron: disabled");
return;
}
await ensureLoaded(state);
await ensureLoaded(state, { skipRecompute: true });
const jobs = state.store?.jobs ?? [];
for (const job of jobs) {
if (typeof job.state.runningAtMs === "number") {
state.deps.log.warn(
{ jobId: job.id, runningAtMs: job.state.runningAtMs },
"cron: clearing stale running marker on startup",
);
job.state.runningAtMs = undefined;
}
}
await runMissedJobs(state);
recomputeNextRuns(state);
await persist(state);
armTimer(state);
@@ -40,7 +51,7 @@ export function stop(state: CronServiceState) {
export async function status(state: CronServiceState) {
return await locked(state, async () => {
await ensureLoaded(state);
await ensureLoaded(state, { skipRecompute: true });
return {
enabled: state.deps.cronEnabled,
storePath: state.deps.storePath,
@@ -52,7 +63,7 @@ export async function status(state: CronServiceState) {
export async function list(state: CronServiceState, opts?: { includeDisabled?: boolean }) {
return await locked(state, async () => {
await ensureLoaded(state);
await ensureLoaded(state, { skipRecompute: true });
const includeDisabled = opts?.includeDisabled === true;
const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled);
return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0));
@@ -83,6 +94,22 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
const job = findJobOrThrow(state, id);
const now = state.deps.nowMs();
applyJobPatch(job, patch);
if (job.schedule.kind === "every") {
const anchor = job.schedule.anchorMs;
if (typeof anchor !== "number" || !Number.isFinite(anchor)) {
const patchSchedule = patch.schedule;
const fallbackAnchorMs =
patchSchedule?.kind === "every"
? now
: typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs)
? job.createdAtMs
: now;
job.schedule = {
...job.schedule,
anchorMs: Math.max(0, Math.floor(fallbackAnchorMs)),
};
}
}
job.updatedAtMs = now;
if (job.enabled) {
job.state.nextRunAtMs = computeJobNextRunAtMs(job, now);
@@ -124,14 +151,18 @@ export async function remove(state: CronServiceState, id: string) {
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {
return await locked(state, async () => {
warnIfDisabled(state, "run");
await ensureLoaded(state);
await ensureLoaded(state, { skipRecompute: true });
const job = findJobOrThrow(state, id);
if (typeof job.state.runningAtMs === "number") {
return { ok: true, ran: false, reason: "already-running" as const };
}
const now = state.deps.nowMs();
const due = isJobDue(job, now, { forced: mode === "force" });
if (!due) {
return { ok: true, ran: false, reason: "not-due" as const };
}
await executeJob(state, job, now, { forced: mode === "force" });
recomputeNextRuns(state);
await persist(state);
armTimer(state);
return { ok: true, ran: true } as const;

View File

@@ -9,6 +9,8 @@ export type CronEvent = {
status?: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
nextRunAtMs?: number;
};
@@ -33,6 +35,8 @@ export type CronServiceDeps = {
/** Last non-empty agent text output (not truncated). */
outputText?: string;
error?: string;
sessionId?: string;
sessionKey?: string;
}>;
onEvent?: (evt: CronEvent) => void;
};
@@ -78,6 +82,7 @@ export type CronStatusSummary = {
export type CronRunResult =
| { ok: true; ran: true }
| { ok: true; ran: false; reason: "not-due" }
| { ok: true; ran: false; reason: "already-running" }
| { ok: false };
export type CronRemoveResult = { ok: true; removed: boolean } | { ok: false; removed: false };

View File

@@ -117,6 +117,141 @@ function stripLegacyDeliveryFields(payload: Record<string, unknown>) {
}
}
function normalizePayloadKind(payload: Record<string, unknown>) {
const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : "";
if (raw === "agentturn") {
payload.kind = "agentTurn";
return true;
}
if (raw === "systemevent") {
payload.kind = "systemEvent";
return true;
}
return false;
}
function inferPayloadIfMissing(raw: Record<string, unknown>) {
const message = typeof raw.message === "string" ? raw.message.trim() : "";
const text = typeof raw.text === "string" ? raw.text.trim() : "";
if (message) {
raw.payload = { kind: "agentTurn", message };
return true;
}
if (text) {
raw.payload = { kind: "systemEvent", text };
return true;
}
return false;
}
function copyTopLevelAgentTurnFields(
raw: Record<string, unknown>,
payload: Record<string, unknown>,
) {
let mutated = false;
const copyTrimmedString = (field: "model" | "thinking") => {
const existing = payload[field];
if (typeof existing === "string" && existing.trim()) {
return;
}
const value = raw[field];
if (typeof value === "string" && value.trim()) {
payload[field] = value.trim();
mutated = true;
}
};
copyTrimmedString("model");
copyTrimmedString("thinking");
if (
typeof payload.timeoutSeconds !== "number" &&
typeof raw.timeoutSeconds === "number" &&
Number.isFinite(raw.timeoutSeconds)
) {
payload.timeoutSeconds = Math.max(1, Math.floor(raw.timeoutSeconds));
mutated = true;
}
if (
typeof payload.allowUnsafeExternalContent !== "boolean" &&
typeof raw.allowUnsafeExternalContent === "boolean"
) {
payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent;
mutated = true;
}
if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") {
payload.deliver = raw.deliver;
mutated = true;
}
if (
typeof payload.channel !== "string" &&
typeof raw.channel === "string" &&
raw.channel.trim()
) {
payload.channel = raw.channel.trim();
mutated = true;
}
if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) {
payload.to = raw.to.trim();
mutated = true;
}
if (
typeof payload.bestEffortDeliver !== "boolean" &&
typeof raw.bestEffortDeliver === "boolean"
) {
payload.bestEffortDeliver = raw.bestEffortDeliver;
mutated = true;
}
if (
typeof payload.provider !== "string" &&
typeof raw.provider === "string" &&
raw.provider.trim()
) {
payload.provider = raw.provider.trim();
mutated = true;
}
return mutated;
}
function stripLegacyTopLevelFields(raw: Record<string, unknown>) {
if ("model" in raw) {
delete raw.model;
}
if ("thinking" in raw) {
delete raw.thinking;
}
if ("timeoutSeconds" in raw) {
delete raw.timeoutSeconds;
}
if ("allowUnsafeExternalContent" in raw) {
delete raw.allowUnsafeExternalContent;
}
if ("message" in raw) {
delete raw.message;
}
if ("text" in raw) {
delete raw.text;
}
if ("deliver" in raw) {
delete raw.deliver;
}
if ("channel" in raw) {
delete raw.channel;
}
if ("to" in raw) {
delete raw.to;
}
if ("bestEffortDeliver" in raw) {
delete raw.bestEffortDeliver;
}
if ("provider" in raw) {
delete raw.provider;
}
}
async function getFileMtimeMs(path: string): Promise<number | null> {
try {
const stats = await fs.promises.stat(path);
@@ -148,6 +283,12 @@ export async function ensureLoaded(
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
let mutated = false;
for (const raw of jobs) {
const state = raw.state;
if (!state || typeof state !== "object" || Array.isArray(state)) {
raw.state = {};
mutated = true;
}
const nameRaw = raw.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
raw.name = inferLegacyName({
@@ -171,8 +312,57 @@ export async function ensureLoaded(
}
const payload = raw.payload;
if (payload && typeof payload === "object" && !Array.isArray(payload)) {
if (migrateLegacyCronPayload(payload as Record<string, unknown>)) {
if (
(!payload || typeof payload !== "object" || Array.isArray(payload)) &&
inferPayloadIfMissing(raw)
) {
mutated = true;
}
const payloadRecord =
raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload)
? (raw.payload as Record<string, unknown>)
: null;
if (payloadRecord) {
if (normalizePayloadKind(payloadRecord)) {
mutated = true;
}
if (!payloadRecord.kind) {
if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) {
payloadRecord.kind = "agentTurn";
mutated = true;
} else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) {
payloadRecord.kind = "systemEvent";
mutated = true;
}
}
if (payloadRecord.kind === "agentTurn") {
if (copyTopLevelAgentTurnFields(raw, payloadRecord)) {
mutated = true;
}
}
}
const hadLegacyTopLevelFields =
"model" in raw ||
"thinking" in raw ||
"timeoutSeconds" in raw ||
"allowUnsafeExternalContent" in raw ||
"message" in raw ||
"text" in raw ||
"deliver" in raw ||
"channel" in raw ||
"to" in raw ||
"bestEffortDeliver" in raw ||
"provider" in raw;
if (hadLegacyTopLevelFields) {
stripLegacyTopLevelFields(raw);
mutated = true;
}
if (payloadRecord) {
if (migrateLegacyCronPayload(payloadRecord)) {
mutated = true;
}
}
@@ -202,6 +392,27 @@ export async function ensureLoaded(
}
mutated = true;
}
const everyMsRaw = sched.everyMs;
const everyMs =
typeof everyMsRaw === "number" && Number.isFinite(everyMsRaw)
? Math.floor(everyMsRaw)
: null;
if ((kind === "every" || sched.kind === "every") && everyMs !== null) {
const anchorRaw = sched.anchorMs;
const normalizedAnchor =
typeof anchorRaw === "number" && Number.isFinite(anchorRaw)
? Math.max(0, Math.floor(anchorRaw))
: typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs)
? Math.max(0, Math.floor(raw.createdAtMs))
: typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs)
? Math.max(0, Math.floor(raw.updatedAtMs))
: null;
if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) {
sched.anchorMs = normalizedAnchor;
mutated = true;
}
}
}
const delivery = raw.delivery;
@@ -213,6 +424,11 @@ export async function ensureLoaded(
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
} else if (modeRaw === undefined || modeRaw === null) {
// Explicitly persist the default so existing jobs don't silently
// change behaviour when the runtime default shifts.
(delivery as { mode?: unknown }).mode = "announce";
mutated = true;
}
}
@@ -222,10 +438,6 @@ export async function ensureLoaded(
mutated = true;
}
const payloadRecord =
payload && typeof payload === "object" && !Array.isArray(payload)
? (payload as Record<string, unknown>)
: null;
const payloadKind =
payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : "";
const sessionTarget =

View File

@@ -1,6 +1,7 @@
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type { CronJob } from "../types.js";
import type { CronEvent, CronServiceState } from "./state.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import {
computeJobNextRunAtMs,
nextWakeAtMs,
@@ -10,7 +11,7 @@ import {
import { locked } from "./locked.js";
import { ensureLoaded, persist } from "./store.js";
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
const MAX_TIMER_DELAY_MS = 60_000;
export function armTimer(state: CronServiceState) {
if (state.timer) {
@@ -25,12 +26,15 @@ export function armTimer(state: CronServiceState) {
return;
}
const delay = Math.max(nextAt - state.deps.nowMs(), 0);
// Avoid TimeoutOverflowWarning when a job is far in the future.
const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS);
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
// Wake at least once a minute to avoid schedule drift and recover quickly
// when the process was paused or wall-clock time jumps.
const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
state.timer = setTimeout(async () => {
try {
await onTimer(state);
} catch (err) {
state.deps.log.error({ err: String(err) }, "cron: timer tick failed");
});
}
}, clampedDelay);
}
@@ -40,22 +44,169 @@ export async function onTimer(state: CronServiceState) {
}
state.running = true;
try {
await locked(state, async () => {
// Reload persisted due-times without recomputing so runDueJobs sees
// the original nextRunAtMs values. Recomputing first would advance
// every/cron slots past the current tick when the timer fires late (#9788).
const dueJobs = await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
await runDueJobs(state);
recomputeNextRuns(state);
const due = findDueJobs(state);
if (due.length === 0) {
const changed = recomputeNextRuns(state);
if (changed) {
await persist(state);
}
return [];
}
const now = state.deps.nowMs();
for (const job of due) {
job.state.runningAtMs = now;
job.state.lastError = undefined;
}
await persist(state);
return due.map((j) => ({
id: j.id,
job: j,
}));
});
const results: Array<{
jobId: string;
status: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
startedAt: number;
endedAt: number;
}> = [];
for (const { id, job } of dueJobs) {
const startedAt = state.deps.nowMs();
job.state.runningAtMs = startedAt;
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
try {
const result = await executeJobCore(state, job);
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
} catch (err) {
results.push({
jobId: id,
status: "error",
error: String(err),
startedAt,
endedAt: state.deps.nowMs(),
});
}
}
if (results.length > 0) {
await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
for (const result of results) {
const job = state.store?.jobs.find((j) => j.id === result.jobId);
if (!job) {
continue;
}
const startedAt = result.startedAt;
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = startedAt;
job.state.lastStatus = result.status;
job.state.lastDurationMs = Math.max(0, result.endedAt - startedAt);
job.state.lastError = result.error;
const shouldDelete =
job.schedule.kind === "at" && result.status === "ok" && job.deleteAfterRun === true;
if (!shouldDelete) {
if (job.schedule.kind === "at" && result.status === "ok") {
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt);
} else {
job.state.nextRunAtMs = undefined;
}
}
emit(state, {
jobId: job.id,
action: "finished",
status: result.status,
error: result.error,
summary: result.summary,
sessionId: result.sessionId,
sessionKey: result.sessionKey,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (shouldDelete && state.store) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
emit(state, { jobId: job.id, action: "removed" });
}
job.updatedAtMs = result.endedAt;
}
recomputeNextRuns(state);
await persist(state);
});
}
} finally {
state.running = false;
// Always re-arm so transient errors (e.g. ENOSPC) don't kill the scheduler.
armTimer(state);
}
}
function findDueJobs(state: CronServiceState): CronJob[] {
if (!state.store) {
return [];
}
const now = state.deps.nowMs();
return state.store.jobs.filter((j) => {
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next;
});
}
export async function runMissedJobs(state: CronServiceState) {
if (!state.store) {
return;
}
const now = state.deps.nowMs();
const missed = state.store.jobs.filter((j) => {
if (!j.enabled) {
return false;
}
if (typeof j.state.runningAtMs === "number") {
return false;
}
const next = j.state.nextRunAtMs;
if (j.schedule.kind === "at" && j.state.lastStatus === "ok") {
return false;
}
return typeof next === "number" && now >= next;
});
if (missed.length > 0) {
state.deps.log.info(
{ count: missed.length, jobIds: missed.map((j) => j.id) },
"cron: running missed jobs after restart",
);
for (const job of missed) {
await executeJob(state, job, now, { forced: false });
}
}
}
export async function runDueJobs(state: CronServiceState) {
if (!state.store) {
return;
@@ -76,6 +227,99 @@ export async function runDueJobs(state: CronServiceState) {
}
}
async function executeJobCore(
state: CronServiceState,
job: CronJob,
): Promise<{
status: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
}> {
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
return {
status: "skipped",
error:
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
};
}
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const maxWaitMs = 2 * 60_000;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await state.deps.runHeartbeatOnce({ reason });
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
state.deps.requestHeartbeatNow({ reason });
return { status: "ok", summary: text };
}
await delay(250);
}
if (heartbeatResult.status === "ran") {
return { status: "ok", summary: text };
} else if (heartbeatResult.status === "skipped") {
return { status: "skipped", error: heartbeatResult.reason, summary: text };
} else {
return { status: "error", error: heartbeatResult.reason, summary: text };
}
} else {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
return { status: "ok", summary: text };
}
}
if (job.payload.kind !== "agentTurn") {
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
}
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
// Post a short summary back to the main session.
const summaryText = res.summary?.trim();
const deliveryPlan = resolveCronDeliveryPlan(job);
if (summaryText && deliveryPlan.requested) {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
}
}
return {
status: res.status,
error: res.error,
summary: res.summary,
sessionId: res.sessionId,
sessionKey: res.sessionKey,
};
}
/**
* Execute a job. This version is used by the `run` command and other
* places that need the full execution with state updates.
*/
export async function executeJob(
state: CronServiceState,
job: CronJob,
@@ -89,7 +333,12 @@ export async function executeJob(
let deleted = false;
const finish = async (status: "ok" | "error" | "skipped", err?: string, summary?: string) => {
const finish = async (
status: "ok" | "error" | "skipped",
err?: string,
summary?: string,
session?: { sessionId?: string; sessionKey?: string },
) => {
const endedAt = state.deps.nowMs();
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = startedAt;
@@ -102,7 +351,6 @@ export async function executeJob(
if (!shouldDelete) {
if (job.schedule.kind === "at" && status === "ok") {
// One-shot job completed successfully; disable it.
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
@@ -118,6 +366,8 @@ export async function executeJob(
status,
error: err,
summary,
sessionId: session?.sessionId,
sessionKey: session?.sessionKey,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
@@ -131,96 +381,16 @@ export async function executeJob(
};
try {
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
const kind = job.payload.kind;
await finish(
"skipped",
kind === "systemEvent"
? "main job requires non-empty systemEvent text"
: 'main job requires payload.kind="systemEvent"',
);
return;
}
state.deps.enqueueSystemEvent(text, { agentId: job.agentId });
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
const reason = `cron:${job.id}`;
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const maxWaitMs = 2 * 60_000;
const waitStartedAt = state.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await state.deps.runHeartbeatOnce({ reason });
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (state.deps.nowMs() - waitStartedAt > maxWaitMs) {
heartbeatResult = {
status: "skipped",
reason: "timeout waiting for main lane to become idle",
};
break;
}
await delay(250);
}
if (heartbeatResult.status === "ran") {
await finish("ok", undefined, text);
} else if (heartbeatResult.status === "skipped") {
await finish("skipped", heartbeatResult.reason, text);
} else {
await finish("error", heartbeatResult.reason, text);
}
} else {
// wakeMode is "next-heartbeat" or runHeartbeatOnce not available
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
await finish("ok", undefined, text);
}
return;
}
if (job.payload.kind !== "agentTurn") {
await finish("skipped", "isolated job requires payload.kind=agentTurn");
return;
}
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
const result = await executeJobCore(state, job);
await finish(result.status, result.error, result.summary, {
sessionId: result.sessionId,
sessionKey: result.sessionKey,
});
// Post a short summary back to the main session so the user sees
// the cron result without opening the isolated session.
const summaryText = res.summary?.trim();
const deliveryMode = job.delivery?.mode ?? "announce";
if (summaryText && deliveryMode !== "none") {
const prefix = "Cron";
const label =
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
if (job.wakeMode === "now") {
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
}
}
if (res.status === "ok") {
await finish("ok", undefined, res.summary);
} else if (res.status === "skipped") {
await finish("skipped", undefined, res.summary);
} else {
await finish("error", res.error ?? "cron job failed", res.summary);
}
} catch (err) {
await finish("error", String(err));
} finally {
job.updatedAtMs = nowMs;
if (!opts.forced && job.enabled && !deleted) {
// Keep nextRunAtMs in sync in case the schedule advanced during a long run.
job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs());
}
}