mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 10:17:39 +00:00
perf(cron): make wakeMode now busy-wait configurable
This commit is contained in:
@@ -188,6 +188,9 @@ describe("CronService", () => {
|
|||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
cronEnabled: true,
|
cronEnabled: true,
|
||||||
log: noopLogger,
|
log: noopLogger,
|
||||||
|
// Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback.
|
||||||
|
wakeNowHeartbeatBusyMaxWaitMs: 1,
|
||||||
|
wakeNowHeartbeatBusyRetryDelayMs: 2,
|
||||||
enqueueSystemEvent,
|
enqueueSystemEvent,
|
||||||
requestHeartbeatNow,
|
requestHeartbeatNow,
|
||||||
runHeartbeatOnce,
|
runHeartbeatOnce,
|
||||||
@@ -229,11 +232,20 @@ describe("CronService", () => {
|
|||||||
status: "skipped" as const,
|
status: "skipped" as const,
|
||||||
reason: "requests-in-flight",
|
reason: "requests-in-flight",
|
||||||
}));
|
}));
|
||||||
|
let now = 0;
|
||||||
|
const nowMs = () => {
|
||||||
|
now += 10;
|
||||||
|
return now;
|
||||||
|
};
|
||||||
|
|
||||||
const cron = new CronService({
|
const cron = new CronService({
|
||||||
storePath: store.storePath,
|
storePath: store.storePath,
|
||||||
cronEnabled: true,
|
cronEnabled: true,
|
||||||
log: noopLogger,
|
log: noopLogger,
|
||||||
|
nowMs,
|
||||||
|
// Perf: avoid advancing fake timers by 2+ minutes for the busy-heartbeat fallback.
|
||||||
|
wakeNowHeartbeatBusyMaxWaitMs: 1,
|
||||||
|
wakeNowHeartbeatBusyRetryDelayMs: 2,
|
||||||
enqueueSystemEvent,
|
enqueueSystemEvent,
|
||||||
requestHeartbeatNow,
|
requestHeartbeatNow,
|
||||||
runHeartbeatOnce,
|
runHeartbeatOnce,
|
||||||
@@ -250,9 +262,7 @@ describe("CronService", () => {
|
|||||||
payload: { kind: "systemEvent", text: "hello" },
|
payload: { kind: "systemEvent", text: "hello" },
|
||||||
});
|
});
|
||||||
|
|
||||||
const runPromise = cron.run(job.id, "force");
|
await cron.run(job.id, "force");
|
||||||
await vi.advanceTimersByTimeAsync(125_000);
|
|
||||||
await runPromise;
|
|
||||||
|
|
||||||
expect(runHeartbeatOnce).toHaveBeenCalled();
|
expect(runHeartbeatOnce).toHaveBeenCalled();
|
||||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||||
|
|||||||
@@ -38,6 +38,14 @@ export type CronServiceDeps = {
|
|||||||
enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void;
|
enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void;
|
||||||
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
||||||
runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise<HeartbeatRunResult>;
|
runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string }) => Promise<HeartbeatRunResult>;
|
||||||
|
/**
|
||||||
|
* WakeMode=now: max time to wait for runHeartbeatOnce to stop returning
|
||||||
|
* { status:"skipped", reason:"requests-in-flight" } before falling back to
|
||||||
|
* requestHeartbeatNow.
|
||||||
|
*/
|
||||||
|
wakeNowHeartbeatBusyMaxWaitMs?: number;
|
||||||
|
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
|
||||||
|
wakeNowHeartbeatBusyRetryDelayMs?: number;
|
||||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
|
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
|
||||||
status: "ok" | "error" | "skipped";
|
status: "ok" | "error" | "skipped";
|
||||||
summary?: string;
|
summary?: string;
|
||||||
|
|||||||
@@ -442,7 +442,8 @@ async function executeJobCore(
|
|||||||
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
|
||||||
const reason = `cron:${job.id}`;
|
const reason = `cron:${job.id}`;
|
||||||
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||||
const maxWaitMs = 2 * 60_000;
|
const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000;
|
||||||
|
const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250;
|
||||||
const waitStartedAt = state.deps.nowMs();
|
const waitStartedAt = state.deps.nowMs();
|
||||||
|
|
||||||
let heartbeatResult: HeartbeatRunResult;
|
let heartbeatResult: HeartbeatRunResult;
|
||||||
@@ -458,7 +459,7 @@ async function executeJobCore(
|
|||||||
state.deps.requestHeartbeatNow({ reason });
|
state.deps.requestHeartbeatNow({ reason });
|
||||||
return { status: "ok", summary: text };
|
return { status: "ok", summary: text };
|
||||||
}
|
}
|
||||||
await delay(250);
|
await delay(retryDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (heartbeatResult.status === "ran") {
|
if (heartbeatResult.status === "ran") {
|
||||||
|
|||||||
Reference in New Issue
Block a user