mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 08:37:41 +00:00
test: speed up test suite
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import {
|
||||
connectOk,
|
||||
installGatewayTestHooks,
|
||||
onceMessage,
|
||||
rpcReq,
|
||||
startServerWithClient,
|
||||
testState,
|
||||
@@ -35,246 +36,35 @@ async function rmTempDir(dir: string) {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
async function waitForCronFinished(ws: { send: (data: string) => void }, jobId: string) {
|
||||
await onceMessage(
|
||||
ws as never,
|
||||
(o) =>
|
||||
o.type === "event" &&
|
||||
o.event === "cron" &&
|
||||
o.payload?.action === "finished" &&
|
||||
o.payload?.jobId === jobId,
|
||||
10_000,
|
||||
);
|
||||
}
|
||||
|
||||
async function waitForNonEmptyFile(pathname: string, timeoutMs = 2000) {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
for (;;) {
|
||||
const raw = await fs.readFile(pathname, "utf-8").catch(() => "");
|
||||
if (raw.trim().length > 0) return raw;
|
||||
if (Date.now() >= deadline) {
|
||||
throw new Error(`timeout waiting for file ${pathname}`);
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
}
|
||||
|
||||
describe("gateway server cron", () => {
|
||||
test("supports cron.add and cron.list", { timeout: 120_000 }, async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "daily",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe("string");
|
||||
|
||||
const listRes = await rpcReq(ws, "cron.list", {
|
||||
includeDisabled: true,
|
||||
});
|
||||
expect(listRes.ok).toBe(true);
|
||||
const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs;
|
||||
expect(Array.isArray(jobs)).toBe(true);
|
||||
expect((jobs as unknown[]).length).toBe(1);
|
||||
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe("daily");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("enqueues main cron system events to the resolved main session key", async () => {
|
||||
test("handles cron CRUD, normalization, and patch semantics", { timeout: 120_000 }, async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.sessionConfig = { mainKey: "primary" };
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "route test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cron route check" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((event) => event.includes("cron route check"))).toBe(true);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
testState.sessionConfig = undefined;
|
||||
});
|
||||
|
||||
test("normalizes wrapped cron.add payloads", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const atMs = Date.now() + 1000;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
data: {
|
||||
name: "wrapped",
|
||||
schedule: { atMs },
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
},
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const payload = addRes.payload as
|
||||
| { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown }
|
||||
| undefined;
|
||||
expect(payload?.sessionTarget).toBe("main");
|
||||
expect(payload?.wakeMode).toBe("next-heartbeat");
|
||||
expect((payload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("normalizes cron.update patch payloads", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const atMs = Date.now() + 1_000;
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
id: jobId,
|
||||
patch: {
|
||||
schedule: { atMs },
|
||||
payload: { kind: "systemEvent", text: "updated" },
|
||||
},
|
||||
});
|
||||
expect(updateRes.ok).toBe(true);
|
||||
const updated = updateRes.payload as
|
||||
| { schedule?: { kind?: unknown }; payload?: { kind?: unknown } }
|
||||
| undefined;
|
||||
expect(updated?.schedule?.kind).toBe("at");
|
||||
expect(updated?.payload?.kind).toBe("systemEvent");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("merges agentTurn payload patches", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch merge",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "hello", model: "opus" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
id: jobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true, channel: "telegram", to: "19098680" },
|
||||
},
|
||||
});
|
||||
expect(updateRes.ok).toBe(true);
|
||||
const updated = updateRes.payload as
|
||||
| {
|
||||
payload?: {
|
||||
kind?: unknown;
|
||||
message?: unknown;
|
||||
model?: unknown;
|
||||
deliver?: unknown;
|
||||
channel?: unknown;
|
||||
to?: unknown;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
expect(updated?.payload?.kind).toBe("agentTurn");
|
||||
expect(updated?.payload?.message).toBe("hello");
|
||||
expect(updated?.payload?.model).toBe("opus");
|
||||
expect(updated?.payload?.deliver).toBe(true);
|
||||
expect(updated?.payload?.channel).toBe("telegram");
|
||||
expect(updated?.payload?.to).toBe("19098680");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("rejects payload kind changes without required fields", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch reject",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
id: jobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true },
|
||||
},
|
||||
});
|
||||
expect(updateRes.ok).toBe(false);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("accepts jobId for cron.update", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.cronEnabled = false;
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
@@ -282,218 +72,256 @@ describe("gateway server cron", () => {
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "jobId test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
try {
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "daily",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe("string");
|
||||
|
||||
const atMs = Date.now() + 2_000;
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
jobId,
|
||||
patch: {
|
||||
schedule: { atMs },
|
||||
payload: { kind: "systemEvent", text: "updated" },
|
||||
},
|
||||
});
|
||||
expect(updateRes.ok).toBe(true);
|
||||
const listRes = await rpcReq(ws, "cron.list", {
|
||||
includeDisabled: true,
|
||||
});
|
||||
expect(listRes.ok).toBe(true);
|
||||
const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs;
|
||||
expect(Array.isArray(jobs)).toBe(true);
|
||||
expect((jobs as unknown[]).length).toBe(1);
|
||||
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe("daily");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
testState.cronEnabled = undefined;
|
||||
const routeAtMs = Date.now() - 1;
|
||||
const routeRes = await rpcReq(ws, "cron.add", {
|
||||
name: "route test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs: routeAtMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cron route check" },
|
||||
});
|
||||
expect(routeRes.ok).toBe(true);
|
||||
const routeJobIdValue = (routeRes.payload as { id?: unknown } | null)?.id;
|
||||
const routeJobId = typeof routeJobIdValue === "string" ? routeJobIdValue : "";
|
||||
expect(routeJobId.length > 0).toBe(true);
|
||||
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: routeJobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((event) => event.includes("cron route check"))).toBe(true);
|
||||
|
||||
const wrappedAtMs = Date.now() + 1000;
|
||||
const wrappedRes = await rpcReq(ws, "cron.add", {
|
||||
data: {
|
||||
name: "wrapped",
|
||||
schedule: { atMs: wrappedAtMs },
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
},
|
||||
});
|
||||
expect(wrappedRes.ok).toBe(true);
|
||||
const wrappedPayload = wrappedRes.payload as
|
||||
| { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown }
|
||||
| undefined;
|
||||
expect(wrappedPayload?.sessionTarget).toBe("main");
|
||||
expect(wrappedPayload?.wakeMode).toBe("next-heartbeat");
|
||||
expect((wrappedPayload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at");
|
||||
|
||||
const patchRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(patchRes.ok).toBe(true);
|
||||
const patchJobIdValue = (patchRes.payload as { id?: unknown } | null)?.id;
|
||||
const patchJobId = typeof patchJobIdValue === "string" ? patchJobIdValue : "";
|
||||
expect(patchJobId.length > 0).toBe(true);
|
||||
|
||||
const atMs = Date.now() + 1_000;
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
id: patchJobId,
|
||||
patch: {
|
||||
schedule: { atMs },
|
||||
payload: { kind: "systemEvent", text: "updated" },
|
||||
},
|
||||
});
|
||||
expect(updateRes.ok).toBe(true);
|
||||
const updated = updateRes.payload as
|
||||
| { schedule?: { kind?: unknown }; payload?: { kind?: unknown } }
|
||||
| undefined;
|
||||
expect(updated?.schedule?.kind).toBe("at");
|
||||
expect(updated?.payload?.kind).toBe("systemEvent");
|
||||
|
||||
const mergeRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch merge",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "hello", model: "opus" },
|
||||
});
|
||||
expect(mergeRes.ok).toBe(true);
|
||||
const mergeJobIdValue = (mergeRes.payload as { id?: unknown } | null)?.id;
|
||||
const mergeJobId = typeof mergeJobIdValue === "string" ? mergeJobIdValue : "";
|
||||
expect(mergeJobId.length > 0).toBe(true);
|
||||
|
||||
const mergeUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
id: mergeJobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true, channel: "telegram", to: "19098680" },
|
||||
},
|
||||
});
|
||||
expect(mergeUpdateRes.ok).toBe(true);
|
||||
const merged = mergeUpdateRes.payload as
|
||||
| {
|
||||
payload?: {
|
||||
kind?: unknown;
|
||||
message?: unknown;
|
||||
model?: unknown;
|
||||
deliver?: unknown;
|
||||
channel?: unknown;
|
||||
to?: unknown;
|
||||
};
|
||||
}
|
||||
| undefined;
|
||||
expect(merged?.payload?.kind).toBe("agentTurn");
|
||||
expect(merged?.payload?.message).toBe("hello");
|
||||
expect(merged?.payload?.model).toBe("opus");
|
||||
expect(merged?.payload?.deliver).toBe(true);
|
||||
expect(merged?.payload?.channel).toBe("telegram");
|
||||
expect(merged?.payload?.to).toBe("19098680");
|
||||
|
||||
const rejectRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch reject",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(rejectRes.ok).toBe(true);
|
||||
const rejectJobIdValue = (rejectRes.payload as { id?: unknown } | null)?.id;
|
||||
const rejectJobId = typeof rejectJobIdValue === "string" ? rejectJobIdValue : "";
|
||||
expect(rejectJobId.length > 0).toBe(true);
|
||||
|
||||
const rejectUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
id: rejectJobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true },
|
||||
},
|
||||
});
|
||||
expect(rejectUpdateRes.ok).toBe(false);
|
||||
|
||||
const jobIdRes = await rpcReq(ws, "cron.add", {
|
||||
name: "jobId test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(jobIdRes.ok).toBe(true);
|
||||
const jobIdValue = (jobIdRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const jobIdUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
jobId,
|
||||
patch: {
|
||||
schedule: { atMs: Date.now() + 2_000 },
|
||||
payload: { kind: "systemEvent", text: "updated" },
|
||||
},
|
||||
});
|
||||
expect(jobIdUpdateRes.ok).toBe(true);
|
||||
|
||||
const disableRes = await rpcReq(ws, "cron.add", {
|
||||
name: "disable test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(disableRes.ok).toBe(true);
|
||||
const disableJobIdValue = (disableRes.payload as { id?: unknown } | null)?.id;
|
||||
const disableJobId = typeof disableJobIdValue === "string" ? disableJobIdValue : "";
|
||||
expect(disableJobId.length > 0).toBe(true);
|
||||
|
||||
const disableUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
id: disableJobId,
|
||||
patch: { enabled: false },
|
||||
});
|
||||
expect(disableUpdateRes.ok).toBe(true);
|
||||
const disabled = disableUpdateRes.payload as { enabled?: unknown } | undefined;
|
||||
expect(disabled?.enabled).toBe(false);
|
||||
} finally {
|
||||
ws.close();
|
||||
await server.close();
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
testState.sessionConfig = undefined;
|
||||
testState.cronEnabled = undefined;
|
||||
}
|
||||
});
|
||||
|
||||
test("disables cron jobs via enabled:false patches", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "disable test",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const updateRes = await rpcReq(ws, "cron.update", {
|
||||
id: jobId,
|
||||
patch: { enabled: false },
|
||||
});
|
||||
expect(updateRes.ok).toBe(true);
|
||||
const updated = updateRes.payload as { enabled?: unknown } | undefined;
|
||||
expect(updated?.enabled).toBe(false);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("writes cron run history to runs/<jobId>.jsonl", async () => {
|
||||
test("writes cron run history and auto-runs due jobs", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-log-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.cronEnabled = undefined;
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "log test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
// Full-suite runs can starve the event loop; give cron.run extra time to respond.
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
|
||||
const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`);
|
||||
const waitForLog = async () => {
|
||||
for (let i = 0; i < 200; i += 1) {
|
||||
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
|
||||
if (raw.trim().length > 0) return raw;
|
||||
await yieldToEventLoop();
|
||||
}
|
||||
throw new Error("timeout waiting for cron run log");
|
||||
};
|
||||
|
||||
const raw = await waitForLog();
|
||||
const line = raw
|
||||
.split("\n")
|
||||
.map((l) => l.trim())
|
||||
.filter(Boolean)
|
||||
.at(-1);
|
||||
const last = JSON.parse(line ?? "{}") as {
|
||||
jobId?: unknown;
|
||||
action?: unknown;
|
||||
status?: unknown;
|
||||
summary?: unknown;
|
||||
};
|
||||
expect(last.action).toBe("finished");
|
||||
expect(last.jobId).toBe(jobId);
|
||||
expect(last.status).toBe("ok");
|
||||
expect(last.summary).toBe("hello");
|
||||
|
||||
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
|
||||
expect(runsRes.ok).toBe(true);
|
||||
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
||||
expect(Array.isArray(entries)).toBe(true);
|
||||
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
|
||||
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("writes cron run history to per-job runs/ when store is jobs.json", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-log-jobs-"));
|
||||
const cronDir = path.join(dir, "cron");
|
||||
testState.cronStorePath = path.join(cronDir, "jobs.json");
|
||||
await fs.mkdir(cronDir, { recursive: true });
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "log test (jobs.json)",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" });
|
||||
expect(runRes.ok).toBe(true);
|
||||
|
||||
const logPath = path.join(cronDir, "runs", `${jobId}.jsonl`);
|
||||
const waitForLog = async () => {
|
||||
for (let i = 0; i < 200; i += 1) {
|
||||
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
|
||||
if (raw.trim().length > 0) return raw;
|
||||
await yieldToEventLoop();
|
||||
}
|
||||
throw new Error("timeout waiting for per-job cron run log");
|
||||
};
|
||||
|
||||
const raw = await waitForLog();
|
||||
const line = raw
|
||||
.split("\n")
|
||||
.map((l) => l.trim())
|
||||
.filter(Boolean)
|
||||
.at(-1);
|
||||
const last = JSON.parse(line ?? "{}") as {
|
||||
jobId?: unknown;
|
||||
action?: unknown;
|
||||
summary?: unknown;
|
||||
};
|
||||
expect(last.action).toBe("finished");
|
||||
expect(last.jobId).toBe(jobId);
|
||||
expect(last.summary).toBe("hello");
|
||||
|
||||
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 20 }, 20_000);
|
||||
expect(runsRes.ok).toBe(true);
|
||||
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
||||
expect(Array.isArray(entries)).toBe(true);
|
||||
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
|
||||
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello");
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("enables cron scheduler by default and runs due jobs automatically", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-default-on-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.cronEnabled = undefined;
|
||||
|
||||
try {
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), {
|
||||
recursive: true,
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "log test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
});
|
||||
await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] }));
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
const finishedP = waitForCronFinished(ws, jobId);
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
expect(runRes.ok).toBe(true);
|
||||
await finishedP;
|
||||
|
||||
const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`);
|
||||
const raw = await waitForNonEmptyFile(logPath);
|
||||
const line = raw
|
||||
.split("\n")
|
||||
.map((l) => l.trim())
|
||||
.filter(Boolean)
|
||||
.at(-1);
|
||||
const last = JSON.parse(line ?? "{}") as {
|
||||
jobId?: unknown;
|
||||
action?: unknown;
|
||||
status?: unknown;
|
||||
summary?: unknown;
|
||||
};
|
||||
expect(last.action).toBe("finished");
|
||||
expect(last.jobId).toBe(jobId);
|
||||
expect(last.status).toBe("ok");
|
||||
expect(last.summary).toBe("hello");
|
||||
|
||||
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
|
||||
expect(runsRes.ok).toBe(true);
|
||||
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
||||
expect(Array.isArray(entries)).toBe(true);
|
||||
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
|
||||
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe("hello");
|
||||
|
||||
const statusRes = await rpcReq(ws, "cron.status", {});
|
||||
expect(statusRes.ok).toBe(true);
|
||||
@@ -504,45 +332,41 @@ describe("gateway server cron", () => {
|
||||
const storePath = typeof statusPayload?.storePath === "string" ? statusPayload.storePath : "";
|
||||
expect(storePath).toContain("jobs.json");
|
||||
|
||||
// Keep the job due immediately; we poll run logs instead of relying on
|
||||
// the cron finished event to avoid timing races under heavy load.
|
||||
const atMs = Date.now() - 10;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
const autoRes = await rpcReq(ws, "cron.add", {
|
||||
name: "auto run test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
schedule: { kind: "at", atMs: Date.now() - 10 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "auto" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
expect(autoRes.ok).toBe(true);
|
||||
const autoJobIdValue = (autoRes.payload as { id?: unknown } | null)?.id;
|
||||
const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : "";
|
||||
expect(autoJobId.length > 0).toBe(true);
|
||||
|
||||
const waitForRuns = async () => {
|
||||
for (let i = 0; i < 500; i += 1) {
|
||||
const runsRes = await rpcReq(ws, "cron.runs", {
|
||||
id: jobId,
|
||||
limit: 10,
|
||||
});
|
||||
expect(runsRes.ok).toBe(true);
|
||||
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
||||
if (Array.isArray(entries) && entries.length > 0) return entries;
|
||||
await yieldToEventLoop();
|
||||
}
|
||||
throw new Error("timeout waiting for cron.runs entries");
|
||||
};
|
||||
|
||||
const entries = (await waitForRuns()) as Array<{ jobId?: unknown }>;
|
||||
expect(entries.at(-1)?.jobId).toBe(jobId);
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const autoFinishedP = waitForCronFinished(ws, autoJobId);
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
await autoFinishedP;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
|
||||
await waitForNonEmptyFile(path.join(dir, "cron", "runs", `${autoJobId}.jsonl`));
|
||||
const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as
|
||||
| { entries?: Array<{ jobId?: unknown }> }
|
||||
| undefined;
|
||||
expect(Array.isArray(autoEntries?.entries)).toBe(true);
|
||||
const runs = autoEntries?.entries ?? [];
|
||||
expect(runs.at(-1)?.jobId).toBe(autoJobId);
|
||||
} finally {
|
||||
ws.close();
|
||||
await server.close();
|
||||
} finally {
|
||||
testState.cronEnabled = false;
|
||||
testState.cronStorePath = undefined;
|
||||
await rmTempDir(dir);
|
||||
testState.cronStorePath = undefined;
|
||||
testState.cronEnabled = undefined;
|
||||
}
|
||||
}, 45_000);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user