mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 11:25:00 +00:00
fix(cron): cancel timed-out runs before side effects
This commit is contained in:
@@ -155,9 +155,17 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
job: CronJob;
|
||||
message: string;
|
||||
sessionKey: string;
|
||||
signal?: AbortSignal;
|
||||
agentId?: string;
|
||||
lane?: string;
|
||||
}): Promise<RunCronAgentTurnResult> {
|
||||
const isAborted = () => params.signal?.aborted === true;
|
||||
const abortReason = () => {
|
||||
const reason = params.signal?.reason;
|
||||
return typeof reason === "string" && reason.trim()
|
||||
? reason.trim()
|
||||
: "cron: job execution timed out";
|
||||
};
|
||||
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
|
||||
const defaultAgentId = resolveDefaultAgentId(params.cfg);
|
||||
const requestedAgentId =
|
||||
@@ -503,6 +511,10 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
return withRunSession({ status: "error", error: String(err) });
|
||||
}
|
||||
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason() });
|
||||
}
|
||||
|
||||
const payloads = runResult.payloads ?? [];
|
||||
|
||||
// Update token+model fields in the session store.
|
||||
@@ -556,6 +568,10 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
}
|
||||
await persistSessionEntry();
|
||||
}
|
||||
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
||||
}
|
||||
const firstText = payloads[0]?.text ?? "";
|
||||
let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
|
||||
let outputText = pickLastNonEmptyTextFromPayloads(payloads);
|
||||
|
||||
@@ -683,6 +683,58 @@ describe("Cron issue regressions", () => {
|
||||
expect(job?.state.lastStatus).toBe("ok");
|
||||
});
|
||||
|
||||
it("suppresses isolated follow-up side effects after timeout", async () => {
|
||||
vi.useRealTimers();
|
||||
const store = await makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
id: "timeout-side-effects",
|
||||
name: "timeout side effects",
|
||||
scheduledAt,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt },
|
||||
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 },
|
||||
state: { nextRunAtMs: scheduledAt },
|
||||
});
|
||||
await writeCronJobs(store.storePath, [cronJob]);
|
||||
|
||||
let now = scheduledAt;
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(async (params: { signal?: AbortSignal }) => {
|
||||
const signal = params.signal;
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
now += 100;
|
||||
reject(new Error("aborted"));
|
||||
};
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
return {
|
||||
status: "ok" as const,
|
||||
summary: "late-summary",
|
||||
delivered: false,
|
||||
error: signal?.aborted && typeof signal.reason === "string" ? signal.reason : undefined,
|
||||
};
|
||||
}),
|
||||
});
|
||||
|
||||
const timerPromise = onTimer(state);
|
||||
await timerPromise;
|
||||
|
||||
const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects");
|
||||
expect(jobAfterTimeout?.state.lastStatus).toBe("error");
|
||||
expect(jobAfterTimeout?.state.lastError).toContain("timed out");
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
|
||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
|
||||
@@ -61,7 +61,7 @@ export type CronServiceDeps = {
|
||||
wakeNowHeartbeatBusyMaxWaitMs?: number;
|
||||
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
|
||||
wakeNowHeartbeatBusyRetryDelayMs?: number;
|
||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<
|
||||
runIsolatedAgentJob: (params: { job: CronJob; message: string; signal?: AbortSignal }) => Promise<
|
||||
{
|
||||
summary?: string;
|
||||
/** Last non-empty agent text output (not truncated). */
|
||||
|
||||
@@ -38,6 +38,17 @@ type TimedCronRunOutcome = CronRunOutcome &
|
||||
endedAt: number;
|
||||
};
|
||||
|
||||
function timeoutErrorMessage(): string {
|
||||
return "cron: job execution timed out";
|
||||
}
|
||||
|
||||
function isAbortError(err: unknown): boolean {
|
||||
if (!(err instanceof Error)) {
|
||||
return false;
|
||||
}
|
||||
return err.name === "AbortError" || err.message === timeoutErrorMessage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Exponential backoff delays (in ms) indexed by consecutive error count.
|
||||
* After the last entry the delay stays constant.
|
||||
@@ -258,15 +269,16 @@ export async function onTimer(state: CronServiceState) {
|
||||
const result =
|
||||
typeof jobTimeoutMs === "number"
|
||||
? await (async () => {
|
||||
const timeoutController = new AbortController();
|
||||
let timeoutId: NodeJS.Timeout | undefined;
|
||||
try {
|
||||
return await Promise.race([
|
||||
executeJobCore(state, job),
|
||||
executeJobCore(state, job, timeoutController.signal),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeoutId = setTimeout(
|
||||
() => reject(new Error("cron: job execution timed out")),
|
||||
jobTimeoutMs,
|
||||
);
|
||||
timeoutId = setTimeout(() => {
|
||||
timeoutController.abort(timeoutErrorMessage());
|
||||
reject(new Error(timeoutErrorMessage()));
|
||||
}, jobTimeoutMs);
|
||||
}),
|
||||
]);
|
||||
} finally {
|
||||
@@ -278,14 +290,15 @@ export async function onTimer(state: CronServiceState) {
|
||||
: await executeJobCore(state, job);
|
||||
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
|
||||
} catch (err) {
|
||||
const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err);
|
||||
state.deps.log.warn(
|
||||
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
|
||||
`cron: job failed: ${String(err)}`,
|
||||
`cron: job failed: ${errorText}`,
|
||||
);
|
||||
results.push({
|
||||
jobId: id,
|
||||
status: "error",
|
||||
error: String(err),
|
||||
error: errorText,
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
});
|
||||
@@ -455,7 +468,11 @@ export async function runDueJobs(state: CronServiceState) {
|
||||
async function executeJobCore(
|
||||
state: CronServiceState,
|
||||
job: CronJob,
|
||||
signal?: AbortSignal,
|
||||
): Promise<CronRunOutcome & CronRunTelemetry> {
|
||||
if (signal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
if (job.sessionTarget === "main") {
|
||||
const text = resolveJobPayloadTextForMain(job);
|
||||
if (!text) {
|
||||
@@ -482,6 +499,9 @@ async function executeJobCore(
|
||||
|
||||
let heartbeatResult: HeartbeatRunResult;
|
||||
for (;;) {
|
||||
if (signal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
heartbeatResult = await state.deps.runHeartbeatOnce({
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
@@ -528,8 +548,13 @@ async function executeJobCore(
|
||||
const res = await state.deps.runIsolatedAgentJob({
|
||||
job,
|
||||
message: job.payload.message,
|
||||
signal,
|
||||
});
|
||||
|
||||
if (signal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
// Post a short summary back to the main session — but only when the
|
||||
// isolated run did NOT already deliver its output to the target channel.
|
||||
// When `res.delivered` is true the announce flow (or direct outbound
|
||||
|
||||
@@ -185,13 +185,14 @@ export function buildGatewayCronService(params: {
|
||||
deps: { ...params.deps, runtime: defaultRuntime },
|
||||
});
|
||||
},
|
||||
runIsolatedAgentJob: async ({ job, message }) => {
|
||||
runIsolatedAgentJob: async ({ job, message, signal }) => {
|
||||
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
|
||||
return await runCronIsolatedAgentTurn({
|
||||
cfg: runtimeConfig,
|
||||
deps: params.deps,
|
||||
job,
|
||||
message,
|
||||
signal,
|
||||
agentId,
|
||||
sessionKey: `cron:${job.id}`,
|
||||
lane: "cron",
|
||||
|
||||
Reference in New Issue
Block a user