mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 07:47:28 +00:00
fix: prevent heartbeat scheduler death when runOnce throws (#14901)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 022efbfef9
Co-authored-by: joeykrug <5925937+joeykrug@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu.
|
- Cron: prevent duplicate fires when multiple jobs trigger simultaneously. (#14256) Thanks @xinhuagu.
|
||||||
- Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic.
|
- Cron: isolate scheduler errors so one bad job does not break all jobs. (#14385) Thanks @MarvinDontPanic.
|
||||||
- Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo.
|
- Cron: prevent one-shot `at` jobs from re-firing on restart after skipped/errored runs. (#13878) Thanks @lailoo.
|
||||||
|
- Heartbeat: prevent scheduler stalls on unexpected run errors and avoid immediate rerun loops after `requests-in-flight` skips. (#14901) Thanks @joeykrug.
|
||||||
- WhatsApp: convert Markdown bold/strikethrough to WhatsApp formatting. (#14285) Thanks @Raikan10.
|
- WhatsApp: convert Markdown bold/strikethrough to WhatsApp formatting. (#14285) Thanks @Raikan10.
|
||||||
- WhatsApp: allow media-only sends and normalize leading blank payloads. (#14408) Thanks @karimnaguib.
|
- WhatsApp: allow media-only sends and normalize leading blank payloads. (#14408) Thanks @karimnaguib.
|
||||||
- WhatsApp: default MIME type for voice messages when Baileys omits it. (#14444) Thanks @mcaxtr.
|
- WhatsApp: default MIME type for voice messages when Baileys omits it. (#14444) Thanks @mcaxtr.
|
||||||
|
|||||||
@@ -54,4 +54,67 @@ describe("startHeartbeatRunner", () => {
|
|||||||
|
|
||||||
runner.stop();
|
runner.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("continues scheduling after runOnce throws an unhandled error", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
vi.setSystemTime(new Date(0));
|
||||||
|
|
||||||
|
let callCount = 0;
|
||||||
|
const runSpy = vi.fn().mockImplementation(async () => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount === 1) {
|
||||||
|
// First call throws (simulates crash during session compaction)
|
||||||
|
throw new Error("session compaction error");
|
||||||
|
}
|
||||||
|
return { status: "ran", durationMs: 1 };
|
||||||
|
});
|
||||||
|
|
||||||
|
const runner = startHeartbeatRunner({
|
||||||
|
cfg: {
|
||||||
|
agents: { defaults: { heartbeat: { every: "30m" } } },
|
||||||
|
} as OpenClawConfig,
|
||||||
|
runOnce: runSpy,
|
||||||
|
});
|
||||||
|
|
||||||
|
// First heartbeat fires and throws
|
||||||
|
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Second heartbeat should still fire (scheduler must not be dead)
|
||||||
|
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
runner.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("reschedules timer when runOnce returns requests-in-flight", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
vi.setSystemTime(new Date(0));
|
||||||
|
|
||||||
|
let callCount = 0;
|
||||||
|
const runSpy = vi.fn().mockImplementation(async () => {
|
||||||
|
callCount++;
|
||||||
|
if (callCount === 1) {
|
||||||
|
return { status: "skipped", reason: "requests-in-flight" };
|
||||||
|
}
|
||||||
|
return { status: "ran", durationMs: 1 };
|
||||||
|
});
|
||||||
|
|
||||||
|
const runner = startHeartbeatRunner({
|
||||||
|
cfg: {
|
||||||
|
agents: { defaults: { heartbeat: { every: "30m" } } },
|
||||||
|
} as OpenClawConfig,
|
||||||
|
runOnce: runSpy,
|
||||||
|
});
|
||||||
|
|
||||||
|
// First heartbeat returns requests-in-flight
|
||||||
|
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Timer should be rescheduled; next heartbeat should still fire
|
||||||
|
await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000);
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
runner.stop();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -797,6 +797,11 @@ export function startHeartbeatRunner(opts: {
|
|||||||
return now + intervalMs;
|
return now + intervalMs;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const advanceAgentSchedule = (agent: HeartbeatAgentState, now: number) => {
|
||||||
|
agent.lastRunMs = now;
|
||||||
|
agent.nextDueMs = now + agent.intervalMs;
|
||||||
|
};
|
||||||
|
|
||||||
const scheduleNext = () => {
|
const scheduleNext = () => {
|
||||||
if (state.stopped) {
|
if (state.stopped) {
|
||||||
return;
|
return;
|
||||||
@@ -897,19 +902,30 @@ export function startHeartbeatRunner(opts: {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
const res = await runOnce({
|
let res: HeartbeatRunResult;
|
||||||
cfg: state.cfg,
|
try {
|
||||||
agentId: agent.agentId,
|
res = await runOnce({
|
||||||
heartbeat: agent.heartbeat,
|
cfg: state.cfg,
|
||||||
reason,
|
agentId: agent.agentId,
|
||||||
deps: { runtime: state.runtime },
|
heartbeat: agent.heartbeat,
|
||||||
});
|
reason,
|
||||||
|
deps: { runtime: state.runtime },
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
// If runOnce throws (e.g. during session compaction), we must still
|
||||||
|
// advance the timer and call scheduleNext so heartbeats keep firing.
|
||||||
|
const errMsg = formatErrorMessage(err);
|
||||||
|
log.error(`heartbeat runner: runOnce threw unexpectedly: ${errMsg}`, { error: errMsg });
|
||||||
|
advanceAgentSchedule(agent, now);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||||
|
advanceAgentSchedule(agent, now);
|
||||||
|
scheduleNext();
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
if (res.status !== "skipped" || res.reason !== "disabled") {
|
if (res.status !== "skipped" || res.reason !== "disabled") {
|
||||||
agent.lastRunMs = now;
|
advanceAgentSchedule(agent, now);
|
||||||
agent.nextDueMs = now + agent.intervalMs;
|
|
||||||
}
|
}
|
||||||
if (res.status === "ran") {
|
if (res.status === "ran") {
|
||||||
ran = true;
|
ran = true;
|
||||||
|
|||||||
98
src/infra/heartbeat-wake.test.ts
Normal file
98
src/infra/heartbeat-wake.test.ts
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
async function loadWakeModule() {
|
||||||
|
vi.resetModules();
|
||||||
|
return import("./heartbeat-wake.js");
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("heartbeat-wake", () => {
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("coalesces multiple wake requests into one run", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const wake = await loadWakeModule();
|
||||||
|
const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
|
||||||
|
wake.setHeartbeatWakeHandler(handler);
|
||||||
|
|
||||||
|
wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 200 });
|
||||||
|
wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 200 });
|
||||||
|
wake.requestHeartbeatNow({ reason: "retry", coalesceMs: 200 });
|
||||||
|
|
||||||
|
expect(wake.hasPendingHeartbeatWake()).toBe(true);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(199);
|
||||||
|
expect(handler).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
expect(handler).toHaveBeenCalledWith({ reason: "retry" });
|
||||||
|
expect(wake.hasPendingHeartbeatWake()).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries requests-in-flight after the default retry delay", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const wake = await loadWakeModule();
|
||||||
|
const handler = vi
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" })
|
||||||
|
.mockResolvedValueOnce({ status: "ran", durationMs: 1 });
|
||||||
|
wake.setHeartbeatWakeHandler(handler);
|
||||||
|
|
||||||
|
wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 });
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(500);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(500);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(2);
|
||||||
|
expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "interval" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries thrown handler errors after the default retry delay", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const wake = await loadWakeModule();
|
||||||
|
const handler = vi
|
||||||
|
.fn()
|
||||||
|
.mockRejectedValueOnce(new Error("boom"))
|
||||||
|
.mockResolvedValueOnce({ status: "skipped", reason: "disabled" });
|
||||||
|
wake.setHeartbeatWakeHandler(handler);
|
||||||
|
|
||||||
|
wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 0 });
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(500);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(500);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(2);
|
||||||
|
expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "exec-event" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("drains pending wake once a handler is registered", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const wake = await loadWakeModule();
|
||||||
|
|
||||||
|
wake.requestHeartbeatNow({ reason: "manual", coalesceMs: 0 });
|
||||||
|
await vi.advanceTimersByTimeAsync(1);
|
||||||
|
expect(wake.hasPendingHeartbeatWake()).toBe(true);
|
||||||
|
|
||||||
|
const handler = vi.fn().mockResolvedValue({ status: "skipped", reason: "disabled" });
|
||||||
|
wake.setHeartbeatWakeHandler(handler);
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(249);
|
||||||
|
expect(handler).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(1);
|
||||||
|
expect(handler).toHaveBeenCalledTimes(1);
|
||||||
|
expect(handler).toHaveBeenCalledWith({ reason: "manual" });
|
||||||
|
expect(wake.hasPendingHeartbeatWake()).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user