diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index b59ba33d88e..706c2630d1c 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -155,9 +155,17 @@ export async function runCronIsolatedAgentTurn(params: { job: CronJob; message: string; sessionKey: string; + signal?: AbortSignal; agentId?: string; lane?: string; }): Promise { + const isAborted = () => params.signal?.aborted === true; + const abortReason = () => { + const reason = params.signal?.reason; + return typeof reason === "string" && reason.trim() + ? reason.trim() + : "cron: job execution timed out"; + }; const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1"; const defaultAgentId = resolveDefaultAgentId(params.cfg); const requestedAgentId = @@ -503,6 +511,10 @@ export async function runCronIsolatedAgentTurn(params: { return withRunSession({ status: "error", error: String(err) }); } + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason() }); + } + const payloads = runResult.payloads ?? []; // Update token+model fields in the session store. @@ -556,6 +568,10 @@ export async function runCronIsolatedAgentTurn(params: { } await persistSessionEntry(); } + + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } const firstText = payloads[0]?.text ?? ""; let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); let outputText = pickLastNonEmptyTextFromPayloads(payloads); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 1899f54fc8f..55ef1f9f204 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -683,6 +683,58 @@ describe("Cron issue regressions", () => { expect(job?.state.lastStatus).toBe("ok"); }); + it("suppresses isolated follow-up side effects after timeout", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + const enqueueSystemEvent = vi.fn(); + + const cronJob = createIsolatedRegressionJob({ + id: "timeout-side-effects", + name: "timeout side effects", + scheduledAt, + schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + let now = scheduledAt; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async (params: { signal?: AbortSignal }) => { + const signal = params.signal; + await new Promise((resolve, reject) => { + const onAbort = () => { + signal?.removeEventListener("abort", onAbort); + now += 100; + reject(new Error("aborted")); + }; + signal?.addEventListener("abort", onAbort, { once: true }); + }); + return { + status: "ok" as const, + summary: "late-summary", + delivered: false, + error: signal?.aborted && typeof signal.reason === "string" ? signal.reason : undefined, + }; + }), + }); + + const timerPromise = onTimer(state); + await timerPromise; + + const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects"); + expect(jobAfterTimeout?.state.lastStatus).toBe("error"); + expect(jobAfterTimeout?.state.lastError).toContain("timed out"); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + }); + it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => { const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); const cronJob = createIsolatedRegressionJob({ diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 050ab9c3b0f..820875b27cb 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -61,7 +61,7 @@ export type CronServiceDeps = { wakeNowHeartbeatBusyMaxWaitMs?: number; /** WakeMode=now: delay between runHeartbeatOnce retries while busy. */ wakeNowHeartbeatBusyRetryDelayMs?: number; - runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise< + runIsolatedAgentJob: (params: { job: CronJob; message: string; signal?: AbortSignal }) => Promise< { summary?: string; /** Last non-empty agent text output (not truncated). */ diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 18fda9aa78e..7eb6e1ce8aa 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -38,6 +38,17 @@ type TimedCronRunOutcome = CronRunOutcome & endedAt: number; }; +function timeoutErrorMessage(): string { + return "cron: job execution timed out"; +} + +function isAbortError(err: unknown): boolean { + if (!(err instanceof Error)) { + return false; + } + return err.name === "AbortError" || err.message === timeoutErrorMessage(); +} + /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. @@ -258,15 +269,16 @@ export async function onTimer(state: CronServiceState) { const result = typeof jobTimeoutMs === "number" ? await (async () => { + const timeoutController = new AbortController(); let timeoutId: NodeJS.Timeout | undefined; try { return await Promise.race([ - executeJobCore(state, job), + executeJobCore(state, job, timeoutController.signal), new Promise((_, reject) => { - timeoutId = setTimeout( - () => reject(new Error("cron: job execution timed out")), - jobTimeoutMs, - ); + timeoutId = setTimeout(() => { + timeoutController.abort(timeoutErrorMessage()); + reject(new Error(timeoutErrorMessage())); + }, jobTimeoutMs); }), ]); } finally { @@ -278,14 +290,15 @@ export async function onTimer(state: CronServiceState) { : await executeJobCore(state, job); results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); } catch (err) { + const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err); state.deps.log.warn( { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null }, - `cron: job failed: ${String(err)}`, + `cron: job failed: ${errorText}`, ); results.push({ jobId: id, status: "error", - error: String(err), + error: errorText, startedAt, endedAt: state.deps.nowMs(), }); @@ -455,7 +468,11 @@ export async function runDueJobs(state: CronServiceState) { async function executeJobCore( state: CronServiceState, job: CronJob, + signal?: AbortSignal, ): Promise { + if (signal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); if (!text) { @@ -482,6 +499,9 @@ async function executeJobCore( let heartbeatResult: HeartbeatRunResult; for (;;) { + if (signal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } heartbeatResult = await state.deps.runHeartbeatOnce({ reason, agentId: job.agentId, @@ -528,8 +548,13 @@ async function executeJobCore( const res = await state.deps.runIsolatedAgentJob({ job, message: job.payload.message, + signal, }); + if (signal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } + // Post a short summary back to the main session — but only when the // isolated run did NOT already deliver its output to the target channel. // When `res.delivered` is true the announce flow (or direct outbound diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index a4febc90ff6..e9d6ccff5e4 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -185,13 +185,14 @@ export function buildGatewayCronService(params: { deps: { ...params.deps, runtime: defaultRuntime }, }); }, - runIsolatedAgentJob: async ({ job, message }) => { + runIsolatedAgentJob: async ({ job, message, signal }) => { const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); return await runCronIsolatedAgentTurn({ cfg: runtimeConfig, deps: params.deps, job, message, + signal, agentId, sessionKey: `cron:${job.id}`, lane: "cron",