mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 19:34:59 +00:00
fix(cron): move session reaper to finally block so it runs reliably (#31996)
* fix(cron): move session reaper to finally block so it runs reliably The cron session reaper was placed inside the try block of onTimer(), after job execution and state updates. If the locked persist section threw, the reaper was skipped — causing isolated cron run sessions to accumulate indefinitely in sessions.json. Move the reaper into the finally block so it always executes after a timer tick, regardless of whether job execution succeeded. The reaper is already self-throttled (MIN_SWEEP_INTERVAL_MS = 5 min) so calling it more reliably has no performance impact. Closes #31946 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: strengthen cron reaper failure-path coverage and changelog (#31996) (thanks @scoootscooob) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
165
src/cron/service.session-reaper-in-finally.test.ts
Normal file
165
src/cron/service.session-reaper-in-finally.test.ts
Normal file
@@ -0,0 +1,165 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createNoopLogger, createCronStoreHarness } from "./service.test-harness.js";
|
||||
import { createCronServiceState } from "./service/state.js";
|
||||
import { onTimer } from "./service/timer.js";
|
||||
import { resetReaperThrottle } from "./session-reaper.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
|
||||
const noopLogger = createNoopLogger();
|
||||
const { makeStorePath } = createCronStoreHarness({
|
||||
prefix: "openclaw-cron-reaper-finally-",
|
||||
});
|
||||
|
||||
function createDueIsolatedJob(params: { id: string; nowMs: number }): CronJob {
|
||||
return {
|
||||
id: params.id,
|
||||
name: params.id,
|
||||
enabled: true,
|
||||
deleteAfterRun: false,
|
||||
createdAtMs: params.nowMs,
|
||||
updatedAtMs: params.nowMs,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "test" },
|
||||
delivery: { mode: "none" },
|
||||
state: { nextRunAtMs: params.nowMs },
|
||||
};
|
||||
}
|
||||
|
||||
describe("CronService - session reaper runs in finally block (#31946)", () => {
|
||||
beforeEach(() => {
|
||||
noopLogger.debug.mockClear();
|
||||
noopLogger.info.mockClear();
|
||||
noopLogger.warn.mockClear();
|
||||
noopLogger.error.mockClear();
|
||||
resetReaperThrottle();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("session reaper runs even when job execution throws", async () => {
|
||||
const store = await makeStorePath();
|
||||
const now = Date.parse("2026-02-10T10:00:00.000Z");
|
||||
|
||||
// Write a store with a due job that will trigger execution.
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
store.storePath,
|
||||
JSON.stringify({
|
||||
version: 1,
|
||||
jobs: [createDueIsolatedJob({ id: "failing-job", nowMs: now })],
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
// Create a mock sessionStorePath to track if the reaper is called.
|
||||
const sessionStorePath = path.join(path.dirname(store.storePath), "sessions", "sessions.json");
|
||||
|
||||
const state = createCronServiceState({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
// This will throw, simulating a failure during job execution.
|
||||
runIsolatedAgentJob: vi.fn().mockRejectedValue(new Error("gateway down")),
|
||||
sessionStorePath,
|
||||
});
|
||||
|
||||
await onTimer(state);
|
||||
|
||||
// After onTimer finishes (even with a job error), state.running must be
|
||||
// false — proving the finally block executed.
|
||||
expect(state.running).toBe(false);
|
||||
|
||||
// The timer must be re-armed.
|
||||
expect(state.timer).not.toBeNull();
|
||||
});
|
||||
|
||||
it("session reaper runs when resolveSessionStorePath is provided", async () => {
|
||||
const store = await makeStorePath();
|
||||
const now = Date.parse("2026-02-10T10:00:00.000Z");
|
||||
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
store.storePath,
|
||||
JSON.stringify({
|
||||
version: 1,
|
||||
jobs: [createDueIsolatedJob({ id: "ok-job", nowMs: now })],
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const resolvedPaths: string[] = [];
|
||||
const state = createCronServiceState({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "done" }),
|
||||
resolveSessionStorePath: (agentId) => {
|
||||
const p = path.join(path.dirname(store.storePath), `${agentId}-sessions`, "sessions.json");
|
||||
resolvedPaths.push(p);
|
||||
return p;
|
||||
},
|
||||
});
|
||||
|
||||
await onTimer(state);
|
||||
|
||||
// The resolveSessionStorePath callback should have been invoked to build
|
||||
// the set of store paths for the session reaper.
|
||||
expect(resolvedPaths.length).toBeGreaterThan(0);
|
||||
expect(state.running).toBe(false);
|
||||
});
|
||||
|
||||
it("prunes expired cron-run sessions even when cron store load throws", async () => {
|
||||
const store = await makeStorePath();
|
||||
const now = Date.parse("2026-02-10T10:00:00.000Z");
|
||||
const sessionStorePath = path.join(path.dirname(store.storePath), "sessions", "sessions.json");
|
||||
|
||||
// Force onTimer's try-block to throw before normal execution flow.
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, "{invalid-json", "utf-8");
|
||||
|
||||
// Seed an expired cron-run session entry that should be pruned by the reaper.
|
||||
await fs.mkdir(path.dirname(sessionStorePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
sessionStorePath,
|
||||
JSON.stringify({
|
||||
"agent:agent-default:cron:failing-job:run:stale": {
|
||||
sessionId: "session-stale",
|
||||
updatedAt: now - 3 * 24 * 3_600_000,
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const state = createCronServiceState({
|
||||
storePath: store.storePath,
|
||||
cronEnabled: true,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent: vi.fn(),
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
runIsolatedAgentJob: vi.fn(),
|
||||
sessionStorePath,
|
||||
});
|
||||
|
||||
await expect(onTimer(state)).rejects.toThrow("Failed to parse cron store");
|
||||
|
||||
const updatedSessionStore = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record<
|
||||
string,
|
||||
unknown
|
||||
>;
|
||||
expect(updatedSessionStore).toEqual({});
|
||||
expect(state.running).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -643,7 +643,11 @@ export async function onTimer(state: CronServiceState) {
|
||||
await persist(state);
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
// Piggyback session reaper on timer tick (self-throttled to every 5 min).
|
||||
// Placed in `finally` so the reaper runs even when a long-running job keeps
|
||||
// `state.running` true across multiple timer ticks — the early return at the
|
||||
// top of onTimer would otherwise skip the reaper indefinitely.
|
||||
const storePaths = new Set<string>();
|
||||
if (state.deps.resolveSessionStorePath) {
|
||||
const defaultAgentId = state.deps.defaultAgentId ?? DEFAULT_AGENT_ID;
|
||||
@@ -675,7 +679,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
state.running = false;
|
||||
armTimer(state);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user