mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 12:41:23 +00:00
test(cron): dedupe delivered-status run scaffolding
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import { describe, expect, it, vi } from "vitest";
|
import { describe, expect, it, vi } from "vitest";
|
||||||
import { CronService } from "./service.js";
|
import { CronService } from "./service.js";
|
||||||
import {
|
import {
|
||||||
|
createFinishedBarrier,
|
||||||
createStartedCronServiceWithFinishedBarrier,
|
createStartedCronServiceWithFinishedBarrier,
|
||||||
createCronStoreHarness,
|
createCronStoreHarness,
|
||||||
createNoopLogger,
|
createNoopLogger,
|
||||||
@@ -11,107 +12,125 @@ const noopLogger = createNoopLogger();
|
|||||||
const { makeStorePath } = createCronStoreHarness();
|
const { makeStorePath } = createCronStoreHarness();
|
||||||
installCronTestHooks({ logger: noopLogger });
|
installCronTestHooks({ logger: noopLogger });
|
||||||
|
|
||||||
|
type CronAddInput = Parameters<CronService["add"]>[0];
|
||||||
|
|
||||||
|
function buildIsolatedAgentTurnJob(name: string): CronAddInput {
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "every", everyMs: 60_000 },
|
||||||
|
sessionTarget: "isolated",
|
||||||
|
wakeMode: "next-heartbeat",
|
||||||
|
payload: { kind: "agentTurn", message: "test" },
|
||||||
|
delivery: { mode: "none" },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildMainSessionSystemEventJob(name: string): CronAddInput {
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "every", everyMs: 60_000 },
|
||||||
|
sessionTarget: "main",
|
||||||
|
wakeMode: "next-heartbeat",
|
||||||
|
payload: { kind: "systemEvent", text: "tick" },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function createIsolatedCronWithFinishedBarrier(params: {
|
||||||
|
storePath: string;
|
||||||
|
delivered?: boolean;
|
||||||
|
onFinished?: (evt: { jobId: string; delivered?: boolean }) => void;
|
||||||
|
}) {
|
||||||
|
const finished = createFinishedBarrier();
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: params.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({
|
||||||
|
status: "ok" as const,
|
||||||
|
summary: "done",
|
||||||
|
...(params.delivered === undefined ? {} : { delivered: params.delivered }),
|
||||||
|
})),
|
||||||
|
onEvent: (evt) => {
|
||||||
|
if (evt.action === "finished") {
|
||||||
|
params.onFinished?.({ jobId: evt.jobId, delivered: evt.delivered });
|
||||||
|
}
|
||||||
|
finished.onEvent(evt);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return { cron, finished };
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runSingleJobAndReadState(params: {
|
||||||
|
cron: CronService;
|
||||||
|
finished: ReturnType<typeof createFinishedBarrier>;
|
||||||
|
job: CronAddInput;
|
||||||
|
}) {
|
||||||
|
const job = await params.cron.add(params.job);
|
||||||
|
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
await params.finished.waitForOk(job.id);
|
||||||
|
|
||||||
|
const jobs = await params.cron.list({ includeDisabled: true });
|
||||||
|
return { job, updated: jobs.find((entry) => entry.id === job.id) };
|
||||||
|
}
|
||||||
|
|
||||||
describe("CronService persists delivered status", () => {
|
describe("CronService persists delivered status", () => {
|
||||||
it("persists lastDelivered=true when isolated job reports delivered", async () => {
|
it("persists lastDelivered=true when isolated job reports delivered", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const finished = {
|
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
|
||||||
resolvers: new Map<string, () => void>(),
|
|
||||||
waitForOk(jobId: string) {
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
this.resolvers.set(jobId, resolve);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const cron = new CronService({
|
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
cronEnabled: true,
|
delivered: true,
|
||||||
log: noopLogger,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({
|
|
||||||
status: "ok" as const,
|
|
||||||
summary: "done",
|
|
||||||
delivered: true,
|
|
||||||
})),
|
|
||||||
onEvent: (evt) => {
|
|
||||||
if (evt.action === "finished" && evt.status === "ok") {
|
|
||||||
finished.resolvers.get(evt.jobId)?.();
|
|
||||||
finished.resolvers.delete(evt.jobId);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await cron.start();
|
await cron.start();
|
||||||
const job = await cron.add({
|
const { updated } = await runSingleJobAndReadState({
|
||||||
name: "delivered-true",
|
cron,
|
||||||
enabled: true,
|
finished,
|
||||||
schedule: { kind: "every", everyMs: 60_000 },
|
job: buildIsolatedAgentTurnJob("delivered-true"),
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "test" },
|
|
||||||
delivery: { mode: "none" },
|
|
||||||
});
|
});
|
||||||
|
|
||||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
|
||||||
await vi.runOnlyPendingTimersAsync();
|
|
||||||
await finished.waitForOk(job.id);
|
|
||||||
|
|
||||||
const jobs = await cron.list({ includeDisabled: true });
|
|
||||||
const updated = jobs.find((j) => j.id === job.id);
|
|
||||||
|
|
||||||
expect(updated?.state.lastStatus).toBe("ok");
|
expect(updated?.state.lastStatus).toBe("ok");
|
||||||
expect(updated?.state.lastDelivered).toBe(true);
|
expect(updated?.state.lastDelivered).toBe(true);
|
||||||
|
|
||||||
cron.stop();
|
cron.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("persists lastDelivered=undefined when isolated job does not deliver", async () => {
|
it("persists lastDelivered=false when isolated job explicitly reports not delivered", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const finished = {
|
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
|
||||||
resolvers: new Map<string, () => void>(),
|
|
||||||
waitForOk(jobId: string) {
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
this.resolvers.set(jobId, resolve);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const cron = new CronService({
|
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
cronEnabled: true,
|
delivered: false,
|
||||||
log: noopLogger,
|
|
||||||
enqueueSystemEvent: vi.fn(),
|
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({
|
|
||||||
status: "ok" as const,
|
|
||||||
summary: "done",
|
|
||||||
})),
|
|
||||||
onEvent: (evt) => {
|
|
||||||
if (evt.action === "finished" && evt.status === "ok") {
|
|
||||||
finished.resolvers.get(evt.jobId)?.();
|
|
||||||
finished.resolvers.delete(evt.jobId);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await cron.start();
|
await cron.start();
|
||||||
const job = await cron.add({
|
const { updated } = await runSingleJobAndReadState({
|
||||||
name: "no-delivery",
|
cron,
|
||||||
enabled: true,
|
finished,
|
||||||
schedule: { kind: "every", everyMs: 60_000 },
|
job: buildIsolatedAgentTurnJob("delivered-false"),
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "test" },
|
|
||||||
delivery: { mode: "none" },
|
|
||||||
});
|
});
|
||||||
|
|
||||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
expect(updated?.state.lastStatus).toBe("ok");
|
||||||
await vi.runOnlyPendingTimersAsync();
|
expect(updated?.state.lastDelivered).toBe(false);
|
||||||
await finished.waitForOk(job.id);
|
|
||||||
|
|
||||||
const jobs = await cron.list({ includeDisabled: true });
|
cron.stop();
|
||||||
const updated = jobs.find((j) => j.id === job.id);
|
});
|
||||||
|
|
||||||
|
it("persists lastDelivered=undefined when isolated job does not deliver", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
|
||||||
|
storePath: store.storePath,
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const { updated } = await runSingleJobAndReadState({
|
||||||
|
cron,
|
||||||
|
finished,
|
||||||
|
job: buildIsolatedAgentTurnJob("no-delivery"),
|
||||||
|
});
|
||||||
|
|
||||||
expect(updated?.state.lastStatus).toBe("ok");
|
expect(updated?.state.lastStatus).toBe("ok");
|
||||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||||
@@ -127,22 +146,12 @@ describe("CronService persists delivered status", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await cron.start();
|
await cron.start();
|
||||||
const job = await cron.add({
|
const { updated } = await runSingleJobAndReadState({
|
||||||
name: "main-session",
|
cron,
|
||||||
enabled: true,
|
finished,
|
||||||
schedule: { kind: "every", everyMs: 60_000 },
|
job: buildMainSessionSystemEventJob("main-session"),
|
||||||
sessionTarget: "main",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "systemEvent", text: "tick" },
|
|
||||||
});
|
});
|
||||||
|
|
||||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
|
||||||
await vi.runOnlyPendingTimersAsync();
|
|
||||||
await finished.waitForOk(job.id);
|
|
||||||
|
|
||||||
const jobs = await cron.list({ includeDisabled: true });
|
|
||||||
const updated = jobs.find((j) => j.id === job.id);
|
|
||||||
|
|
||||||
expect(updated?.state.lastStatus).toBe("ok");
|
expect(updated?.state.lastStatus).toBe("ok");
|
||||||
expect(updated?.state.lastDelivered).toBeUndefined();
|
expect(updated?.state.lastDelivered).toBeUndefined();
|
||||||
expect(enqueueSystemEvent).toHaveBeenCalled();
|
expect(enqueueSystemEvent).toHaveBeenCalled();
|
||||||
@@ -153,58 +162,23 @@ describe("CronService persists delivered status", () => {
|
|||||||
it("emits delivered in the finished event", async () => {
|
it("emits delivered in the finished event", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
let capturedEvent: { jobId: string; delivered?: boolean } | undefined;
|
let capturedEvent: { jobId: string; delivered?: boolean } | undefined;
|
||||||
const finished = {
|
const { cron, finished } = createIsolatedCronWithFinishedBarrier({
|
||||||
resolvers: new Map<string, () => void>(),
|
|
||||||
waitForOk(jobId: string) {
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
this.resolvers.set(jobId, resolve);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const cron = new CronService({
|
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
cronEnabled: true,
|
delivered: true,
|
||||||
log: noopLogger,
|
onFinished: (evt) => {
|
||||||
enqueueSystemEvent: vi.fn(),
|
capturedEvent = evt;
|
||||||
requestHeartbeatNow: vi.fn(),
|
|
||||||
runIsolatedAgentJob: vi.fn(async () => ({
|
|
||||||
status: "ok" as const,
|
|
||||||
summary: "done",
|
|
||||||
delivered: true,
|
|
||||||
})),
|
|
||||||
onEvent: (evt) => {
|
|
||||||
if (evt.action === "finished") {
|
|
||||||
capturedEvent = { jobId: evt.jobId, delivered: evt.delivered };
|
|
||||||
if (evt.status === "ok") {
|
|
||||||
finished.resolvers.get(evt.jobId)?.();
|
|
||||||
finished.resolvers.delete(evt.jobId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
await cron.start();
|
await cron.start();
|
||||||
const job = await cron.add({
|
await runSingleJobAndReadState({
|
||||||
name: "event-test",
|
cron,
|
||||||
enabled: true,
|
finished,
|
||||||
schedule: { kind: "every", everyMs: 60_000 },
|
job: buildIsolatedAgentTurnJob("event-test"),
|
||||||
sessionTarget: "isolated",
|
|
||||||
wakeMode: "next-heartbeat",
|
|
||||||
payload: { kind: "agentTurn", message: "test" },
|
|
||||||
delivery: { mode: "none" },
|
|
||||||
});
|
});
|
||||||
|
|
||||||
vi.setSystemTime(new Date(job.state.nextRunAtMs! + 5));
|
|
||||||
await vi.runOnlyPendingTimersAsync();
|
|
||||||
await finished.waitForOk(job.id);
|
|
||||||
|
|
||||||
expect(capturedEvent).toBeDefined();
|
expect(capturedEvent).toBeDefined();
|
||||||
expect(capturedEvent?.delivered).toBe(true);
|
expect(capturedEvent?.delivered).toBe(true);
|
||||||
|
|
||||||
// Flush pending store writes before stopping so the temp file is released
|
|
||||||
// (prevents ENOTEMPTY on Windows when afterAll removes the fixture dir).
|
|
||||||
await cron.list({ includeDisabled: true });
|
|
||||||
cron.stop();
|
cron.stop();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user