refactor(test): dedupe cron service test harness setup

This commit is contained in:
Peter Steinberger
2026-02-16 22:30:16 +00:00
parent 21e5c0ce57
commit 11f3da7669

View File

@@ -238,45 +238,74 @@ function createCronEventHarness() {
return { onEvent, waitFor, events }; return { onEvent, waitFor, events };
} }
async function createMainOneShotHarness() { type CronHarnessOptions = {
runIsolatedAgentJob?: ReturnType<typeof vi.fn>;
runHeartbeatOnce?: ReturnType<typeof vi.fn>;
nowMs?: () => number;
wakeNowHeartbeatBusyMaxWaitMs?: number;
wakeNowHeartbeatBusyRetryDelayMs?: number;
withEvents?: boolean;
};
async function createCronHarness(options: CronHarnessOptions = {}) {
ensureDir(fixturesRoot); ensureDir(fixturesRoot);
const store = await makeStorePath(); const store = await makeStorePath();
const enqueueSystemEvent = vi.fn(); const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn(); const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness(); const events = options.withEvents === false ? undefined : createCronEventHarness();
const cron = new CronService({ const cron = new CronService({
storePath: store.storePath, storePath: store.storePath,
cronEnabled: true, cronEnabled: true,
log: noopLogger, log: noopLogger,
...(options.nowMs ? { nowMs: options.nowMs } : {}),
...(options.wakeNowHeartbeatBusyMaxWaitMs !== undefined
? { wakeNowHeartbeatBusyMaxWaitMs: options.wakeNowHeartbeatBusyMaxWaitMs }
: {}),
...(options.wakeNowHeartbeatBusyRetryDelayMs !== undefined
? { wakeNowHeartbeatBusyRetryDelayMs: options.wakeNowHeartbeatBusyRetryDelayMs }
: {}),
enqueueSystemEvent, enqueueSystemEvent,
requestHeartbeatNow, requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), ...(options.runHeartbeatOnce ? { runHeartbeatOnce: options.runHeartbeatOnce } : {}),
onEvent: events.onEvent, runIsolatedAgentJob: options.runIsolatedAgentJob ?? vi.fn(async () => ({ status: "ok" })),
...(events ? { onEvent: events.onEvent } : {}),
}); });
await cron.start(); await cron.start();
return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events }; return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events };
} }
async function createMainOneShotHarness() {
const harness = await createCronHarness();
if (!harness.events) {
throw new Error("missing event harness");
}
return { ...harness, events: harness.events };
}
async function createIsolatedAnnounceHarness(runIsolatedAgentJob: ReturnType<typeof vi.fn>) { async function createIsolatedAnnounceHarness(runIsolatedAgentJob: ReturnType<typeof vi.fn>) {
ensureDir(fixturesRoot); const harness = await createCronHarness({
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob, runIsolatedAgentJob,
onEvent: events.onEvent,
}); });
if (!harness.events) {
throw new Error("missing event harness");
}
return { ...harness, events: harness.events };
}
await cron.start(); async function createWakeModeNowMainHarness(options: {
return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events }; nowMs?: () => number;
runHeartbeatOnce: ReturnType<typeof vi.fn>;
wakeNowHeartbeatBusyMaxWaitMs?: number;
wakeNowHeartbeatBusyRetryDelayMs?: number;
}) {
return createCronHarness({
runHeartbeatOnce: options.runHeartbeatOnce,
nowMs: options.nowMs,
wakeNowHeartbeatBusyMaxWaitMs: options.wakeNowHeartbeatBusyMaxWaitMs,
wakeNowHeartbeatBusyRetryDelayMs: options.wakeNowHeartbeatBusyRetryDelayMs,
withEvents: false,
});
} }
async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) { async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) {
@@ -293,6 +322,60 @@ async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) {
return { job, runAt }; return { job, runAt };
} }
async function runIsolatedAnnounceJobAndWait(params: {
cron: CronService;
events: ReturnType<typeof createCronEventHarness>;
name: string;
status: "ok" | "error";
}) {
const { job, runAt } = await addDefaultIsolatedAnnounceJob(params.cron, params.name);
vi.setSystemTime(runAt);
await vi.runOnlyPendingTimersAsync();
await params.events.waitFor(
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === params.status,
);
return job;
}
async function addWakeModeNowMainSystemEventJob(
cron: CronService,
options?: { name?: string; agentId?: string },
) {
return cron.add({
name: options?.name ?? "wakeMode now",
...(options?.agentId ? { agentId: options.agentId } : {}),
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
}
function createLegacyDeliveryMigrationJob(options: {
id: string;
payload: { provider?: string; channel?: string };
}) {
return {
id: options.id,
name: "legacy",
enabled: true,
createdAtMs: Date.now(),
updatedAtMs: Date.now(),
schedule: { kind: "cron", expr: "* * * * *" },
sessionTarget: "isolated",
wakeMode: "now",
payload: {
kind: "agentTurn",
message: "hi",
deliver: true,
...options.payload,
to: "7200373102",
},
state: {},
};
}
async function loadLegacyDeliveryMigration(rawJob: Record<string, unknown>) { async function loadLegacyDeliveryMigration(rawJob: Record<string, unknown>) {
ensureDir(fixturesRoot); ensureDir(fixturesRoot);
const store = await makeStorePath(); const store = await makeStorePath();
@@ -377,11 +460,6 @@ describe("CronService", () => {
}); });
it("wakeMode now waits for heartbeat completion when available", async () => { it("wakeMode now waits for heartbeat completion when available", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
let now = 0; let now = 0;
const nowMs = () => { const nowMs = () => {
now += 10; now += 10;
@@ -396,26 +474,12 @@ describe("CronService", () => {
}), }),
); );
const cron = new CronService({ const { store, cron, enqueueSystemEvent, requestHeartbeatNow } =
storePath: store.storePath, await createWakeModeNowMainHarness({
cronEnabled: true, runHeartbeatOnce,
log: noopLogger, nowMs,
nowMs, });
enqueueSystemEvent, const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now waits" });
requestHeartbeatNow,
runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const job = await cron.add({
name: "wakeMode now waits",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
const runPromise = cron.run(job.id, "force"); const runPromise = cron.run(job.id, "force");
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
@@ -445,34 +509,19 @@ describe("CronService", () => {
}); });
it("passes agentId to runHeartbeatOnce for main-session wakeMode now jobs", async () => { it("passes agentId to runHeartbeatOnce for main-session wakeMode now jobs", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runHeartbeatOnce = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 })); const runHeartbeatOnce = vi.fn(async () => ({ status: "ran" as const, durationMs: 1 }));
const cron = new CronService({ const { store, cron, enqueueSystemEvent, requestHeartbeatNow } =
storePath: store.storePath, await createWakeModeNowMainHarness({
cronEnabled: true, runHeartbeatOnce,
log: noopLogger, // Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback.
// Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback. wakeNowHeartbeatBusyMaxWaitMs: 1,
wakeNowHeartbeatBusyMaxWaitMs: 1, wakeNowHeartbeatBusyRetryDelayMs: 2,
wakeNowHeartbeatBusyRetryDelayMs: 2, });
enqueueSystemEvent,
requestHeartbeatNow,
runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start(); const job = await addWakeModeNowMainSystemEventJob(cron, {
const job = await cron.add({
name: "wakeMode now with agent", name: "wakeMode now with agent",
agentId: "ops", agentId: "ops",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
}); });
await cron.run(job.id, "force"); await cron.run(job.id, "force");
@@ -495,10 +544,6 @@ describe("CronService", () => {
}); });
it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => { it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runHeartbeatOnce = vi.fn(async () => ({ const runHeartbeatOnce = vi.fn(async () => ({
status: "skipped" as const, status: "skipped" as const,
reason: "requests-in-flight", reason: "requests-in-flight",
@@ -509,29 +554,15 @@ describe("CronService", () => {
return now; return now;
}; };
const cron = new CronService({ const { store, cron, requestHeartbeatNow } = await createWakeModeNowMainHarness({
storePath: store.storePath, runHeartbeatOnce,
cronEnabled: true,
log: noopLogger,
nowMs, nowMs,
// Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback. // Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback.
wakeNowHeartbeatBusyMaxWaitMs: 1, wakeNowHeartbeatBusyMaxWaitMs: 1,
wakeNowHeartbeatBusyRetryDelayMs: 2, wakeNowHeartbeatBusyRetryDelayMs: 2,
enqueueSystemEvent,
requestHeartbeatNow,
runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
}); });
await cron.start(); const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now fallback" });
const job = await cron.add({
name: "wakeMode now fallback",
enabled: true,
schedule: { kind: "at", at: new Date(1).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
await cron.run(job.id, "force"); await cron.run(job.id, "force");
@@ -549,14 +580,7 @@ describe("CronService", () => {
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" })); const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob); await createIsolatedAnnounceHarness(runIsolatedAgentJob);
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly"); await runIsolatedAnnounceJobAndWait({ cron, events, name: "weekly", status: "ok" });
vi.setSystemTime(runAt);
await vi.runOnlyPendingTimersAsync();
await events.waitFor(
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok",
);
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).toHaveBeenCalledWith( expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done", "Cron: done",
@@ -575,14 +599,12 @@ describe("CronService", () => {
})); }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob); await createIsolatedAnnounceHarness(runIsolatedAgentJob);
await runIsolatedAnnounceJobAndWait({
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly delivered"); cron,
vi.setSystemTime(runAt); events,
await vi.runOnlyPendingTimersAsync(); name: "weekly delivered",
status: "ok",
await events.waitFor( });
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "ok",
);
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled();
@@ -591,24 +613,10 @@ describe("CronService", () => {
}); });
it("migrates legacy payload.provider to payload.channel on load", async () => { it("migrates legacy payload.provider to payload.channel on load", async () => {
const rawJob = { const rawJob = createLegacyDeliveryMigrationJob({
id: "legacy-1", id: "legacy-1",
name: "legacy", payload: { provider: " TeLeGrAm " },
enabled: true, });
createdAtMs: Date.now(),
updatedAtMs: Date.now(),
schedule: { kind: "cron", expr: "* * * * *" },
sessionTarget: "isolated",
wakeMode: "now",
payload: {
kind: "agentTurn",
message: "hi",
deliver: true,
provider: " TeLeGrAm ",
to: "7200373102",
},
state: {},
};
const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob); const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob);
// Legacy delivery fields are migrated to the top-level delivery object // Legacy delivery fields are migrated to the top-level delivery object
const delivery = job?.delivery as unknown as Record<string, unknown>; const delivery = job?.delivery as unknown as Record<string, unknown>;
@@ -622,24 +630,10 @@ describe("CronService", () => {
}); });
it("canonicalizes payload.channel casing on load", async () => { it("canonicalizes payload.channel casing on load", async () => {
const rawJob = { const rawJob = createLegacyDeliveryMigrationJob({
id: "legacy-2", id: "legacy-2",
name: "legacy", payload: { channel: "Telegram" },
enabled: true, });
createdAtMs: Date.now(),
updatedAtMs: Date.now(),
schedule: { kind: "cron", expr: "* * * * *" },
sessionTarget: "isolated",
wakeMode: "now",
payload: {
kind: "agentTurn",
message: "hi",
deliver: true,
channel: "Telegram",
to: "7200373102",
},
state: {},
};
const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob); const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob);
// Legacy delivery fields are migrated to the top-level delivery object // Legacy delivery fields are migrated to the top-level delivery object
const delivery = job?.delivery as unknown as Record<string, unknown>; const delivery = job?.delivery as unknown as Record<string, unknown>;
@@ -657,13 +651,12 @@ describe("CronService", () => {
})); }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } = const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob); await createIsolatedAnnounceHarness(runIsolatedAgentJob);
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "isolated error test"); await runIsolatedAnnounceJobAndWait({
cron,
vi.setSystemTime(runAt); events,
await vi.runOnlyPendingTimersAsync(); name: "isolated error test",
await events.waitFor( status: "error",
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "error", });
);
expect(enqueueSystemEvent).toHaveBeenCalledWith( expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron (error): last output", "Cron (error): last output",