refactor(agent): dedupe harness and command workflows

This commit is contained in:
Peter Steinberger
2026-02-16 14:52:09 +00:00
parent 04892ee230
commit f717a13039
204 changed files with 7366 additions and 11540 deletions

View File

@@ -1,12 +1,8 @@
import "./isolated-agent.mocks.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CliDeps } from "../cli/deps.js";
import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import {
makeCfg,
@@ -14,22 +10,11 @@ import {
withTempCronHome,
writeSessionStore,
} from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true);
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]),
);
setupIsolatedAgentTurnMocks({ fast: true });
});
it("handles media heartbeat delivery and announce cleanup modes", async () => {

View File

@@ -2,12 +2,8 @@ import "./isolated-agent.mocks.js";
import fs from "node:fs/promises";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CliDeps } from "../cli/deps.js";
import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import {
makeCfg,
@@ -15,44 +11,78 @@ import {
withTempCronHome,
writeSessionStore,
} from "./isolated-agent.test-harness.js";
import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
function createCliDeps(overrides: Partial<CliDeps> = {}): CliDeps {
return {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
...overrides,
};
}
function mockAgentPayloads(
payloads: Array<Record<string, unknown>>,
extra: Partial<Awaited<ReturnType<typeof runEmbeddedPiAgent>>> = {},
): void {
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads,
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
...extra,
});
}
async function runTelegramAnnounceTurn(params: {
home: string;
storePath: string;
deps: CliDeps;
delivery: {
mode: "announce";
channel: string;
to?: string;
bestEffort?: boolean;
};
}): Promise<Awaited<ReturnType<typeof runCronIsolatedAgentTurn>>> {
return runCronIsolatedAgentTurn({
cfg: makeCfg(params.home, params.storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps: params.deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: params.delivery,
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
}
async function expectBestEffortTelegramNotDelivered(
payload: Record<string, unknown>,
): Promise<void> {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
const deps = createCliDeps({
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [payload],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
mockAgentPayloads([payload]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: true,
},
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
bestEffort: true,
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
@@ -62,103 +92,47 @@ async function expectBestEffortTelegramNotDelivered(
});
}
async function expectExplicitTelegramTargetDelivery(params: {
payloads: Array<Record<string, unknown>>;
expectedText: string;
}): Promise<void> {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps = createCliDeps();
mockAgentPayloads(params.payloads);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: { mode: "announce", channel: "telegram", to: "123" },
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
const [to, text] = vi.mocked(deps.sendMessageTelegram).mock.calls[0] ?? [];
expect(to).toBe("123");
expect(text).toBe(params.expectedText);
});
}
describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => {
vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true);
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]),
);
setupIsolatedAgentTurnMocks();
});
it("delivers directly when delivery has an explicit target", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello from cron" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
const [to, text] = vi.mocked(deps.sendMessageTelegram).mock.calls[0] ?? [];
expect(to).toBe("123");
expect(text).toBe("hello from cron");
await expectExplicitTelegramTargetDelivery({
payloads: [{ text: "hello from cron" }],
expectedText: "hello from cron",
});
});
it("delivers the final payload text when delivery has an explicit target", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1);
const [to, text] = vi.mocked(deps.sendMessageTelegram).mock.calls[0] ?? [];
expect(to).toBe("123");
expect(text).toBe("Final weather summary");
await expectExplicitTelegramTargetDelivery({
payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }],
expectedText: "Final weather summary",
});
});
@@ -182,33 +156,13 @@ describe("runCronIsolatedAgentTurn", () => {
),
"utf-8",
);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "Final weather summary" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
const deps = createCliDeps();
mockAgentPayloads([{ text: "Final weather summary" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "last" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
delivery: { mode: "announce", channel: "last" },
});
expect(res.status).toBe("ok");
@@ -225,35 +179,17 @@ describe("runCronIsolatedAgentTurn", () => {
it("skips announce when messaging tool already sent to target", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "sent" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
const deps = createCliDeps();
mockAgentPayloads([{ text: "sent" }], {
didSendViaMessagingTool: true,
messagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
delivery: { mode: "announce", channel: "telegram", to: "123" },
});
expect(res.status).toBe("ok");
@@ -273,33 +209,13 @@ describe("runCronIsolatedAgentTurn", () => {
it("skips announce for heartbeat-only output", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "HEARTBEAT_OK" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
const deps = createCliDeps();
mockAgentPayloads([{ text: "HEARTBEAT_OK" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
delivery: { mode: "announce", channel: "telegram", to: "123" },
});
expect(res.status).toBe("ok");
@@ -311,32 +227,15 @@ describe("runCronIsolatedAgentTurn", () => {
it("fails when direct delivery fails and best-effort is disabled", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, { lastProvider: "webchat", lastTo: "" });
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
const deps = createCliDeps({
sendMessageTelegram: vi.fn().mockRejectedValue(new Error("boom")),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "hello from cron" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
channels: { telegram: { botToken: "t-1" } },
}),
mockAgentPayloads([{ text: "hello from cron" }]);
const res = await runTelegramAnnounceTurn({
home,
storePath,
deps,
job: {
...makeJob({ kind: "agentTurn", message: "do it" }),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
delivery: { mode: "announce", channel: "telegram", to: "123" },
});
expect(res.status).toBe("error");

View File

@@ -0,0 +1,25 @@
import { vi } from "vitest";
import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runSubagentAnnounceFlow } from "../agents/subagent-announce.js";
import { telegramOutbound } from "../channels/plugins/outbound/telegram.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
export function setupIsolatedAgentTurnMocks(params?: { fast?: boolean }): void {
if (params?.fast) {
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
}
vi.mocked(runEmbeddedPiAgent).mockReset();
vi.mocked(loadModelCatalog).mockResolvedValue([]);
vi.mocked(runSubagentAnnounceFlow).mockReset().mockResolvedValue(true);
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
]),
);
}

View File

@@ -2,9 +2,8 @@ import fs from "node:fs/promises";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CliDeps } from "../cli/deps.js";
import type { OpenClawConfig } from "../config/config.js";
import type { CronJob } from "./types.js";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import { makeCfg, makeJob, withTempCronHome } from "./isolated-agent.test-harness.js";
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
@@ -18,10 +17,7 @@ vi.mock("../agents/model-catalog.js", () => ({
import { loadModelCatalog } from "../agents/model-catalog.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
return withTempHomeBase(fn, { prefix: "openclaw-cron-" });
}
const withTempHome = withTempCronHome;
function makeDeps(): CliDeps {
return {
@@ -89,36 +85,55 @@ async function readSessionEntry(storePath: string, key: string) {
return store[key];
}
function makeCfg(
const GMAIL_MODEL = "openrouter/meta-llama/llama-3.3-70b:free";
async function runGmailHookTurn(
home: string,
storePath: string,
overrides: Partial<OpenClawConfig> = {},
): OpenClawConfig {
const base: OpenClawConfig = {
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "openclaw"),
storeEntries?: Record<string, Record<string, unknown>>,
) {
const storePath = await writeSessionStore(home, storeEntries);
const deps = makeDeps();
mockEmbeddedOk();
return runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
hooks: {
gmail: {
model: GMAIL_MODEL,
},
},
},
session: { store: storePath, mainKey: "main" },
} as OpenClawConfig;
return { ...base, ...overrides };
}),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "hook:gmail:msg-1",
lane: "cron",
});
}
function makeJob(payload: CronJob["payload"]): CronJob {
const now = Date.now();
return {
id: "job-1",
enabled: true,
createdAtMs: now,
updatedAtMs: now,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload,
state: {},
};
async function runTurnWithStoredModelOverride(
home: string,
jobPayload: CronJob["payload"],
modelOverride = "gpt-4.1-mini",
) {
const storePath = await writeSessionStore(home, {
"agent:main:cron:job-1": {
sessionId: "existing-cron-session",
updatedAt: Date.now(),
providerOverride: "openai",
modelOverride,
},
});
const deps = makeDeps();
mockEmbeddedOk();
return await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob(jobPayload),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
}
describe("runCronIsolatedAgentTurn", () => {
@@ -268,24 +283,10 @@ describe("runCronIsolatedAgentTurn", () => {
it("uses stored session override when no job model override is provided", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, {
"agent:main:cron:job-1": {
sessionId: "existing-cron-session",
updatedAt: Date.now(),
providerOverride: "openai",
modelOverride: "gpt-4.1-mini",
},
});
const deps = makeDeps();
mockEmbeddedOk();
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
const res = await runTurnWithStoredModelOverride(home, {
kind: "agentTurn",
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
deliver: false,
});
expect(res.status).toBe("ok");
@@ -295,68 +296,32 @@ describe("runCronIsolatedAgentTurn", () => {
it("prefers job model override over stored session override", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, {
"agent:main:cron:job-1": {
sessionId: "existing-cron-session",
updatedAt: Date.now(),
providerOverride: "openai",
modelOverride: "gpt-4.1-mini",
},
});
const deps = makeDeps();
mockEmbeddedOk();
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
model: "anthropic/claude-opus-4-5",
deliver: false,
}),
const res = await runTurnWithStoredModelOverride(home, {
kind: "agentTurn",
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
model: "anthropic/claude-opus-4-5",
deliver: false,
});
expect(res.status).toBe("ok");
expectEmbeddedProviderModel({ provider: "anthropic", model: "claude-opus-4-5" });
});
});
it("uses hooks.gmail.model for Gmail hook sessions", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps = makeDeps();
mockEmbeddedOk();
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
hooks: {
gmail: {
model: "openrouter/meta-llama/llama-3.3-70b:free",
},
},
}),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "hook:gmail:msg-1",
lane: "cron",
});
const res = await runGmailHookTurn(home);
expect(res.status).toBe("ok");
expectEmbeddedProviderModel({
provider: "openrouter",
model: "meta-llama/llama-3.3-70b:free",
model: GMAIL_MODEL.replace("openrouter/", ""),
});
});
});
it("keeps hooks.gmail.model precedence over stored session override", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home, {
const res = await runGmailHookTurn(home, {
"agent:main:hook:gmail:msg-1": {
sessionId: "existing-gmail-session",
updatedAt: Date.now(),
@@ -364,28 +329,11 @@ describe("runCronIsolatedAgentTurn", () => {
modelOverride: "claude-opus-4-5",
},
});
const deps = makeDeps();
mockEmbeddedOk();
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath, {
hooks: {
gmail: {
model: "openrouter/meta-llama/llama-3.3-70b:free",
},
},
}),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "hook:gmail:msg-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expectEmbeddedProviderModel({
provider: "openrouter",
model: "meta-llama/llama-3.3-70b:free",
model: GMAIL_MODEL.replace("openrouter/", ""),
});
});
});
@@ -443,20 +391,8 @@ describe("runCronIsolatedAgentTurn", () => {
it("ignores hooks.gmail.model when not in the allowlist", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const deps = makeDeps();
mockEmbeddedOk();
vi.mocked(loadModelCatalog).mockResolvedValueOnce([
{
id: "claude-opus-4-5",
@@ -605,20 +541,8 @@ describe("runCronIsolatedAgentTurn", () => {
it("starts a fresh session id for each cron run", async () => {
await withTempHome(async (home) => {
const storePath = await writeSessionStore(home);
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const deps = makeDeps();
mockEmbeddedOk();
const cfg = makeCfg(home, storePath);
const job = makeJob({ kind: "agentTurn", message: "ping", deliver: false });

View File

@@ -1,6 +1,24 @@
import { describe, expect, it } from "vitest";
import { normalizeCronJobCreate, normalizeCronJobPatch } from "./normalize.js";
function expectNormalizedAtSchedule(scheduleInput: Record<string, unknown>) {
const normalized = normalizeCronJobCreate({
name: "iso schedule",
enabled: true,
schedule: scheduleInput,
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: {
kind: "systemEvent",
text: "hi",
},
}) as unknown as Record<string, unknown>;
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("at");
expect(schedule.at).toBe(new Date(Date.parse("2026-01-12T18:00:00Z")).toISOString());
}
describe("normalizeCronJobCreate", () => {
it("maps legacy payload.provider to payload.channel and strips provider", () => {
const normalized = normalizeCronJobCreate({
@@ -88,39 +106,11 @@ describe("normalizeCronJobCreate", () => {
});
it("coerces ISO schedule.at to normalized ISO (UTC)", () => {
const normalized = normalizeCronJobCreate({
name: "iso at",
enabled: true,
schedule: { at: "2026-01-12T18:00:00" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: {
kind: "systemEvent",
text: "hi",
},
}) as unknown as Record<string, unknown>;
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("at");
expect(schedule.at).toBe(new Date(Date.parse("2026-01-12T18:00:00Z")).toISOString());
expectNormalizedAtSchedule({ at: "2026-01-12T18:00:00" });
});
it("coerces schedule.atMs string to schedule.at (UTC)", () => {
const normalized = normalizeCronJobCreate({
name: "iso atMs",
enabled: true,
schedule: { kind: "at", atMs: "2026-01-12T18:00:00" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: {
kind: "systemEvent",
text: "hi",
},
}) as unknown as Record<string, unknown>;
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("at");
expect(schedule.at).toBe(new Date(Date.parse("2026-01-12T18:00:00Z")).toISOString());
expectNormalizedAtSchedule({ kind: "at", atMs: "2026-01-12T18:00:00" });
});
it("defaults deleteAfterRun for one-shot schedules", () => {

View File

@@ -21,118 +21,131 @@ async function makeStorePath() {
};
}
describe("CronService delivery plan consistency", () => {
it("does not post isolated summary when legacy deliver=false", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const cron = new CronService({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })),
});
await cron.start();
const job = await cron.add({
name: "legacy-off",
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: {
kind: "agentTurn",
message: "hello",
deliver: false,
},
});
type DeliveryMode = "none" | "announce";
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).not.toHaveBeenCalled();
type DeliveryOverride = {
mode: DeliveryMode;
channel?: string;
to?: string;
};
async function withCronService(
params: {
runIsolatedAgentJob?: () => Promise<{ status: "ok"; summary: string; delivered?: boolean }>;
},
run: (context: {
cron: CronService;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
}) => Promise<void>,
) {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const cron = new CronService({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob:
params.runIsolatedAgentJob ??
(vi.fn(async () => ({ status: "ok", summary: "done" })) as never),
});
await cron.start();
try {
await run({ cron, enqueueSystemEvent, requestHeartbeatNow });
} finally {
cron.stop();
await store.cleanup();
}
}
async function addIsolatedAgentTurnJob(
cron: CronService,
params: {
name: string;
wakeMode: "next-heartbeat" | "now";
payload?: { deliver?: boolean };
delivery?: DeliveryOverride;
},
) {
return cron.add({
name: params.name,
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
sessionTarget: "isolated",
wakeMode: params.wakeMode,
payload: {
kind: "agentTurn",
message: "hello",
...params.payload,
},
...(params.delivery
? {
delivery: params.delivery as unknown as {
mode: DeliveryMode;
channel?: string;
to?: string;
},
}
: {}),
});
}
describe("CronService delivery plan consistency", () => {
it("does not post isolated summary when legacy deliver=false", async () => {
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "legacy-off",
wakeMode: "next-heartbeat",
payload: { deliver: false },
});
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
});
it("treats delivery object without mode as announce", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const cron = new CronService({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })),
});
await cron.start();
const job = await cron.add({
name: "partial-delivery",
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: {
kind: "agentTurn",
message: "hello",
},
delivery: { channel: "telegram", to: "123" } as unknown as {
mode: "none" | "announce";
channel?: string;
to?: string;
},
});
await withCronService({}, async ({ cron, enqueueSystemEvent }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "partial-delivery",
wakeMode: "next-heartbeat",
delivery: { channel: "telegram", to: "123" } as DeliveryOverride,
});
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done",
expect.objectContaining({ agentId: undefined }),
);
cron.stop();
await store.cleanup();
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron: done",
expect.objectContaining({ agentId: undefined }),
);
});
});
it("does not enqueue duplicate relay when isolated run marks delivery handled", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "ok" as const,
summary: "done",
delivered: true,
}));
const cron = new CronService({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
});
await cron.start();
const job = await cron.add({
name: "announce-delivered",
schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() },
sessionTarget: "isolated",
wakeMode: "now",
payload: {
kind: "agentTurn",
message: "hello",
await withCronService(
{
runIsolatedAgentJob: vi.fn(async () => ({
status: "ok",
summary: "done",
delivered: true,
})),
},
delivery: { channel: "telegram", to: "123" } as unknown as {
mode: "none" | "announce";
channel?: string;
to?: string;
async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
const job = await addIsolatedAgentTurnJob(cron, {
name: "announce-delivered",
wakeMode: "now",
delivery: { channel: "telegram", to: "123" } as DeliveryOverride,
});
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
},
});
const result = await cron.run(job.id, "force");
expect(result).toEqual({ ok: true, ran: true });
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
cron.stop();
await store.cleanup();
);
});
});

View File

@@ -1,9 +1,9 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import type { CronEvent } from "./service.js";
import { CronService } from "./service.js";
import {
createStartedCronServiceWithFinishedBarrier,
createCronStoreHarness,
createNoopLogger,
installCronTestHooks,
@@ -13,42 +13,12 @@ const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness();
installCronTestHooks({ logger: noopLogger });
function createFinishedBarrier() {
const resolvers = new Map<string, (evt: CronEvent) => void>();
return {
waitForOk: (jobId: string) =>
new Promise<CronEvent>((resolve) => {
resolvers.set(jobId, resolve);
}),
onEvent: (evt: CronEvent) => {
if (evt.action !== "finished" || evt.status !== "ok") {
return;
}
const resolve = resolvers.get(evt.jobId);
if (!resolve) {
return;
}
resolvers.delete(evt.jobId);
resolve(evt);
},
};
}
describe("CronService interval/cron jobs fire on time", () => {
it("fires an every-type main job when the timer fires a few ms late", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const finished = createFinishedBarrier();
const cron = new CronService({
const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: finished.onEvent,
logger: noopLogger,
});
await cron.start();
@@ -86,23 +56,14 @@ describe("CronService interval/cron jobs fire on time", () => {
it("fires a cron-expression job when the timer fires a few ms late", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const finished = createFinishedBarrier();
const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({
storePath: store.storePath,
logger: noopLogger,
});
// Set time to just before a minute boundary.
vi.setSystemTime(new Date("2025-12-13T00:00:59.000Z"));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: finished.onEvent,
});
await cron.start();
const job = await cron.add({
name: "every minute check",

View File

@@ -2,8 +2,8 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import type { CronEvent } from "./service.js";
import { CronService } from "./service.js";
import { createStartedCronServiceWithFinishedBarrier } from "./service.test-harness.js";
const noopLogger = {
debug: vi.fn(),
@@ -22,25 +22,20 @@ async function makeStorePath() {
return { storePath };
}
function createFinishedBarrier() {
const resolvers = new Map<string, (evt: CronEvent) => void>();
return {
waitForOk: (jobId: string) =>
new Promise<CronEvent>((resolve) => {
resolvers.set(jobId, resolve);
}),
onEvent: (evt: CronEvent) => {
if (evt.action !== "finished" || evt.status !== "ok") {
return;
}
const resolve = resolvers.get(evt.jobId);
if (!resolve) {
return;
}
resolvers.delete(evt.jobId);
resolve(evt);
},
};
async function writeJobsStore(storePath: string, jobs: unknown[]) {
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }, null, 2), "utf-8");
}
function createCronFromStorePath(storePath: string) {
return new CronService({
storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
}
describe("#16156: cron.list() must not silently advance past-due recurring jobs", () => {
@@ -69,18 +64,9 @@ describe("#16156: cron.list() must not silently advance past-due recurring jobs"
it("does not skip a cron job when list() is called while the job is past-due", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const finished = createFinishedBarrier();
const cron = new CronService({
const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: finished.onEvent,
logger: noopLogger,
});
await cron.start();
@@ -133,18 +119,9 @@ describe("#16156: cron.list() must not silently advance past-due recurring jobs"
it("does not skip a cron job when status() is called while the job is past-due", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const finished = createFinishedBarrier();
const cron = new CronService({
const { cron, enqueueSystemEvent, finished } = createStartedCronServiceWithFinishedBarrier({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: finished.onEvent,
logger: noopLogger,
});
await cron.start();
@@ -188,41 +165,22 @@ describe("#16156: cron.list() must not silently advance past-due recurring jobs"
const nowMs = Date.parse("2025-12-13T00:00:00.000Z");
// Write a store file with a cron job that has no nextRunAtMs.
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(
store.storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "missing-next",
name: "missing next",
enabled: true,
createdAtMs: nowMs,
updatedAtMs: nowMs,
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "fill-me" },
state: {},
},
],
},
null,
2,
),
"utf-8",
);
await writeJobsStore(store.storePath, [
{
id: "missing-next",
name: "missing next",
enabled: true,
createdAtMs: nowMs,
updatedAtMs: nowMs,
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "fill-me" },
state: {},
},
]);
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
const cron = createCronFromStorePath(store.storePath);
await cron.start();

View File

@@ -1,40 +1,19 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
import {
createCronStoreHarness,
createNoopLogger,
installCronTestHooks,
} from "./service.test-harness.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-" });
installCronTestHooks({
logger: noopLogger,
baseTimeIso: "2025-12-13T00:00:00.000Z",
});
describe("CronService", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("avoids duplicate runs when two services share a store", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();

View File

@@ -1,27 +1,11 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CronJob } from "./types.js";
import { createCronStoreHarness, createNoopLogger } from "./service.test-harness.js";
import { createCronServiceState } from "./service/state.js";
import { onTimer } from "./service/timer.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness();
function createDueRecurringJob(params: {
id: string;

View File

@@ -1,39 +1,40 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
import {
createCronStoreHarness,
createNoopLogger,
installCronTestHooks,
} from "./service.test-harness.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-" });
installCronTestHooks({
logger: noopLogger,
baseTimeIso: "2025-12-13T17:00:00.000Z",
});
describe("CronService restart catch-up", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T17:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
async function writeStoreJobs(storePath: string, jobs: unknown[]) {
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }, null, 2), "utf-8");
}
afterEach(() => {
vi.useRealTimers();
});
function createRestartCronService(params: {
storePath: string;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
}) {
return new CronService({
storePath: params.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: params.enqueueSystemEvent,
requestHeartbeatNow: params.requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
}
it("executes an overdue recurring job immediately on start", async () => {
const store = await makeStorePath();
@@ -43,44 +44,29 @@ describe("CronService restart catch-up", () => {
const dueAt = Date.parse("2025-12-13T15:00:00.000Z");
const lastRunAt = Date.parse("2025-12-12T15:00:00.000Z");
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(
store.storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "restart-overdue-job",
name: "daily digest",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"),
schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "digest now" },
state: {
nextRunAtMs: dueAt,
lastRunAtMs: lastRunAt,
lastStatus: "ok",
},
},
],
await writeStoreJobs(store.storePath, [
{
id: "restart-overdue-job",
name: "daily digest",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"),
schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "digest now" },
state: {
nextRunAtMs: dueAt,
lastRunAtMs: lastRunAt,
lastStatus: "ok",
},
null,
2,
),
"utf-8",
);
},
]);
const cron = new CronService({
const cron = createRestartCronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
@@ -109,43 +95,28 @@ describe("CronService restart catch-up", () => {
const dueAt = Date.parse("2025-12-13T16:00:00.000Z");
const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z");
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(
store.storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "restart-stale-running",
name: "daily stale marker",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"),
schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "resume stale marker" },
state: {
nextRunAtMs: dueAt,
runningAtMs: staleRunningAt,
},
},
],
await writeStoreJobs(store.storePath, [
{
id: "restart-stale-running",
name: "daily stale marker",
enabled: true,
createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"),
schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "resume stale marker" },
state: {
nextRunAtMs: dueAt,
runningAtMs: staleRunningAt,
},
null,
2,
),
"utf-8",
);
},
]);
const cron = new CronService({
const cron = createRestartCronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();

View File

@@ -238,25 +238,84 @@ function createCronEventHarness() {
return { onEvent, waitFor, events };
}
async function createMainOneShotHarness() {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: events.onEvent,
});
await cron.start();
return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events };
}
async function createIsolatedAnnounceHarness(runIsolatedAgentJob: ReturnType<typeof vi.fn>) {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
onEvent: events.onEvent,
});
await cron.start();
return { store, cron, enqueueSystemEvent, requestHeartbeatNow, events };
}
async function addDefaultIsolatedAnnounceJob(cron: CronService, name: string) {
const runAt = new Date("2025-12-13T00:00:01.000Z");
const job = await cron.add({
enabled: true,
name,
schedule: { kind: "at", at: runAt.toISOString() },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
});
return { job, runAt };
}
async function loadLegacyDeliveryMigration(rawJob: Record<string, unknown>) {
ensureDir(fixturesRoot);
const store = await makeStorePath();
writeStoreFile(store.storePath, { version: 1, jobs: [rawJob] });
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const jobs = await cron.list({ includeDisabled: true });
const job = jobs.find((j) => j.id === rawJob.id);
return { store, cron, job };
}
describe("CronService", () => {
it("runs a one-shot main job and disables it after success when requested", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: events.onEvent,
});
await cron.start();
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createMainOneShotHarness();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "one-shot hello",
@@ -289,23 +348,8 @@ describe("CronService", () => {
});
it("runs a one-shot job and deletes it after success by default", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const events = createCronEventHarness();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
onEvent: events.onEvent,
});
await cron.start();
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createMainOneShotHarness();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "one-shot delete",
@@ -502,39 +546,12 @@ describe("CronService", () => {
});
it("runs an isolated job and posts summary to main", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "ok" as const,
summary: "done",
}));
const events = createCronEventHarness();
const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done" }));
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly");
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
onEvent: events.onEvent,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
const job = await cron.add({
enabled: true,
name: "weekly",
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
vi.setSystemTime(runAt);
await vi.runOnlyPendingTimersAsync();
await events.waitFor(
@@ -551,40 +568,16 @@ describe("CronService", () => {
});
it("does not post isolated summary to main when run already delivered output", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "ok" as const,
summary: "done",
delivered: true,
}));
const events = createCronEventHarness();
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
onEvent: events.onEvent,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
const job = await cron.add({
enabled: true,
name: "weekly delivered",
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "weekly delivered");
vi.setSystemTime(runAt);
await vi.runOnlyPendingTimersAsync();
await events.waitFor(
@@ -598,11 +591,6 @@ describe("CronService", () => {
});
it("migrates legacy payload.provider to payload.channel on load", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const rawJob = {
id: "legacy-1",
name: "legacy",
@@ -621,21 +609,7 @@ describe("CronService", () => {
},
state: {},
};
writeStoreFile(store.storePath, { version: 1, jobs: [rawJob] });
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const jobs = await cron.list({ includeDisabled: true });
const job = jobs.find((j) => j.id === rawJob.id);
const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob);
// Legacy delivery fields are migrated to the top-level delivery object
const delivery = job?.delivery as unknown as Record<string, unknown>;
expect(delivery?.channel).toBe("telegram");
@@ -648,11 +622,6 @@ describe("CronService", () => {
});
it("canonicalizes payload.channel casing on load", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const rawJob = {
id: "legacy-2",
name: "legacy",
@@ -671,21 +640,7 @@ describe("CronService", () => {
},
state: {},
};
writeStoreFile(store.storePath, { version: 1, jobs: [rawJob] });
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const jobs = await cron.list({ includeDisabled: true });
const job = jobs.find((j) => j.id === rawJob.id);
const { store, cron, job } = await loadLegacyDeliveryMigration(rawJob);
// Legacy delivery fields are migrated to the top-level delivery object
const delivery = job?.delivery as unknown as Record<string, unknown>;
expect(delivery?.channel).toBe("telegram");
@@ -695,40 +650,16 @@ describe("CronService", () => {
});
it("posts last output to main even when isolated job errors", async () => {
ensureDir(fixturesRoot);
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
summary: "last output",
error: "boom",
}));
const events = createCronEventHarness();
const { store, cron, enqueueSystemEvent, requestHeartbeatNow, events } =
await createIsolatedAnnounceHarness(runIsolatedAgentJob);
const { job, runAt } = await addDefaultIsolatedAnnounceJob(cron, "isolated error test");
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
onEvent: events.onEvent,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
const job = await cron.add({
name: "isolated error test",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it" },
delivery: { mode: "announce" },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
vi.setSystemTime(runAt);
await vi.runOnlyPendingTimersAsync();
await events.waitFor(
(evt) => evt.jobId === job.id && evt.action === "finished" && evt.status === "error",

View File

@@ -1,26 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CronJob } from "./types.js";
import { CronService } from "./service.js";
import { createCronStoreHarness, createNoopLogger } from "./service.test-harness.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness();
async function waitForFirstJob(
cron: CronService,
@@ -38,6 +22,35 @@ async function waitForFirstJob(
return latest;
}
async function withCronService(
cronEnabled: boolean,
run: (params: {
cron: CronService;
enqueueSystemEvent: ReturnType<typeof vi.fn>;
requestHeartbeatNow: ReturnType<typeof vi.fn>;
}) => Promise<void>,
) {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const cron = new CronService({
storePath: store.storePath,
cronEnabled,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
try {
await run({ cron, enqueueSystemEvent, requestHeartbeatNow });
} finally {
cron.stop();
await store.cleanup();
}
}
describe("CronService", () => {
beforeEach(() => {
vi.useFakeTimers();
@@ -53,115 +66,70 @@ describe("CronService", () => {
});
it("skips main jobs with empty systemEvent text", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await withCronService(true, async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
name: "empty systemEvent test",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: " " },
});
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const job = await waitForFirstJob(cron, (current) => current?.state.lastStatus === "skipped");
expect(job?.state.lastStatus).toBe("skipped");
expect(job?.state.lastError).toMatch(/non-empty/i);
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
name: "empty systemEvent test",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: " " },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
const job = await waitForFirstJob(cron, (current) => current?.state.lastStatus === "skipped");
expect(job?.state.lastStatus).toBe("skipped");
expect(job?.state.lastError).toMatch(/non-empty/i);
cron.stop();
await store.cleanup();
});
it("does not schedule timers when cron is disabled", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await withCronService(false, async ({ cron, enqueueSystemEvent, requestHeartbeatNow }) => {
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
name: "disabled cron job",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
const cron = new CronService({
storePath: store.storePath,
cronEnabled: false,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
const status = await cron.status();
expect(status.enabled).toBe(false);
expect(status.nextWakeAtMs).toBeNull();
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expect(noopLogger.warn).toHaveBeenCalled();
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
name: "disabled cron job",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
const status = await cron.status();
expect(status.enabled).toBe(false);
expect(status.nextWakeAtMs).toBeNull();
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expect(noopLogger.warn).toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
it("status reports next wake when enabled", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
await withCronService(true, async ({ cron }) => {
const atMs = Date.parse("2025-12-13T00:00:05.000Z");
await cron.add({
name: "status next wake",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
const status = await cron.status();
expect(status.enabled).toBe(true);
expect(status.jobs).toBe(1);
expect(status.nextWakeAtMs).toBe(atMs);
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:05.000Z");
await cron.add({
name: "status next wake",
enabled: true,
schedule: { kind: "at", at: new Date(atMs).toISOString() },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
const status = await cron.status();
expect(status.enabled).toBe(true);
expect(status.jobs).toBe(1);
expect(status.nextWakeAtMs).toBe(atMs);
cron.stop();
await store.cleanup();
});
});

View File

@@ -1,40 +1,21 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
import {
createCronStoreHarness,
createNoopLogger,
installCronTestHooks,
} from "./service.test-harness.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
const noopLogger = createNoopLogger();
const { makeStorePath } = createCronStoreHarness({ prefix: "openclaw-cron-" });
installCronTestHooks({
logger: noopLogger,
baseTimeIso: "2026-02-06T17:00:00.000Z",
});
describe("CronService store migrations", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-02-06T17:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("migrates legacy top-level agentTurn fields and initializes missing state", async () => {
const store = await makeStorePath();
await fs.mkdir(path.dirname(store.storePath), { recursive: true });

View File

@@ -23,6 +23,23 @@ async function makeStorePath() {
};
}
async function migrateAndLoadFirstJob(storePath: string): Promise<Record<string, unknown>> {
const cron = new CronService({
storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
cron.stop();
const loaded = await loadCronStore(storePath);
return loaded.jobs[0] as Record<string, unknown>;
}
describe("cron store migration", () => {
beforeEach(() => {
noopLogger.debug.mockClear();
@@ -64,20 +81,7 @@ describe("cron store migration", () => {
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
cron.stop();
const loaded = await loadCronStore(store.storePath);
const migrated = loaded.jobs[0] as Record<string, unknown>;
const migrated = await migrateAndLoadFirstJob(store.storePath);
expect(migrated.delivery).toEqual({
mode: "announce",
channel: "telegram",
@@ -123,20 +127,7 @@ describe("cron store migration", () => {
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
cron.stop();
const loaded = await loadCronStore(store.storePath);
const migrated = loaded.jobs[0] as Record<string, unknown>;
const migrated = await migrateAndLoadFirstJob(store.storePath);
const schedule = migrated.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("every");
expect(schedule.anchorMs).toBe(createdAtMs);

View File

@@ -3,6 +3,8 @@ import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, vi } from "vitest";
import type { MockFn } from "../test-utils/vitest-mock-fn.js";
import type { CronEvent } from "./service.js";
import { CronService } from "./service.js";
export type NoopLogger = {
debug: MockFn;
@@ -64,3 +66,43 @@ export function installCronTestHooks(options: {
vi.useRealTimers();
});
}
export function createFinishedBarrier() {
const resolvers = new Map<string, (evt: CronEvent) => void>();
return {
waitForOk: (jobId: string) =>
new Promise<CronEvent>((resolve) => {
resolvers.set(jobId, resolve);
}),
onEvent: (evt: CronEvent) => {
if (evt.action !== "finished" || evt.status !== "ok") {
return;
}
const resolve = resolvers.get(evt.jobId);
if (!resolve) {
return;
}
resolvers.delete(evt.jobId);
resolve(evt);
},
};
}
export function createStartedCronServiceWithFinishedBarrier(params: {
storePath: string;
logger: ReturnType<typeof createNoopLogger>;
}) {
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const finished = createFinishedBarrier();
const cron = new CronService({
storePath: params.storePath,
cronEnabled: true,
log: params.logger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
onEvent: finished.onEvent,
});
return { cron, enqueueSystemEvent, requestHeartbeatNow, finished };
}