mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 23:54:42 +00:00
fix: prevent heartbeat scheduler silent death from wake handler race (#15108)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: fd7165b935
Co-authored-by: joeykrug <5925937+joeykrug@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -5,21 +5,102 @@ export type HeartbeatRunResult =
|
||||
|
||||
/** Callback that runs a heartbeat for the given wake reason and resolves with the run result. */
export type HeartbeatWakeHandler = (opts: { reason?: string }) => Promise<HeartbeatRunResult>;

/** Label stored alongside the armed timer: an ordinary wake or a retry. */
type WakeTimerKind = "normal" | "retry";

/** A queued wake request together with the data used to rank it against competing requests. */
type PendingWakeReason = {
  /** Normalized reason string that is handed to the handler. */
  reason: string;
  /** Rank taken from REASON_PRIORITY; a larger number wins. */
  priority: number;
  /** Timestamp (ms) of the request; breaks ties between equal priorities. */
  requestedAt: number;
};

// Currently registered wake handler, or null when none is registered.
let handler: HeartbeatWakeHandler | null = null;
// NOTE(review): only the pre-change lines of this diff read or write pendingReason;
// pendingWake is its prioritized counterpart. Confirm before removing.
let pendingReason: string | null = null;
// Bumped on every handler registration and on every effective disposal.
let handlerGeneration = 0;
// The single wake request waiting for the next timer tick, if any.
let pendingWake: PendingWakeReason | null = null;
let scheduled = false;
let running = false;
// Armed timer plus the absolute due time and kind it was armed with.
let timer: NodeJS.Timeout | null = null;
let timerDueAt: number | null = null;
let timerKind: WakeTimerKind | null = null;

const DEFAULT_COALESCE_MS = 250;
const DEFAULT_RETRY_MS = 1_000;
const HOOK_REASON_PREFIX = "hook:";
// Ranking used when two wake requests compete for the pending slot.
const REASON_PRIORITY = {
  RETRY: 0,
  INTERVAL: 1,
  DEFAULT: 2,
  ACTION: 3,
} as const;
|
||||
|
||||
function schedule(coalesceMs: number) {
|
||||
if (timer) {
|
||||
/**
 * Tells whether a wake reason belongs to the action class.
 *
 * @param reason - Wake reason to classify.
 * @returns `true` for exactly `"manual"` or `"exec-event"`, or for any reason
 *   that begins with `HOOK_REASON_PREFIX`; `false` otherwise.
 */
function isActionWakeReason(reason: string): boolean {
  if (reason === "manual" || reason === "exec-event") {
    return true;
  }
  return reason.startsWith(HOOK_REASON_PREFIX);
}
|
||||
|
||||
/**
 * Maps a wake reason onto its `REASON_PRIORITY` rank.
 *
 * @param reason - Wake reason to rank.
 * @returns `RETRY` for `"retry"`, `INTERVAL` for `"interval"`, `ACTION` for any
 *   reason accepted by `isActionWakeReason`, and `DEFAULT` for everything else.
 */
function resolveReasonPriority(reason: string): number {
  switch (reason) {
    case "retry":
      return REASON_PRIORITY.RETRY;
    case "interval":
      return REASON_PRIORITY.INTERVAL;
    default:
      return isActionWakeReason(reason) ? REASON_PRIORITY.ACTION : REASON_PRIORITY.DEFAULT;
  }
}
|
||||
|
||||
/**
 * Produces a usable wake reason from optional caller input.
 *
 * @param reason - Caller-supplied reason; may be missing or blank.
 * @returns The reason with surrounding whitespace removed, or `"requested"`
 *   when the input is not a string or is empty after trimming.
 */
function normalizeWakeReason(reason?: string): string {
  const FALLBACK = "requested";
  if (typeof reason !== "string") {
    return FALLBACK;
  }
  const trimmed = reason.trim();
  return trimmed.length > 0 ? trimmed : FALLBACK;
}
|
||||
|
||||
/**
 * Offers a wake request for the single pending slot (`pendingWake`).
 *
 * The request is normalized and ranked, then stored when the slot is empty,
 * when it outranks the stored request, or when it has the same rank and is at
 * least as recent. A lower-ranked or older same-rank request is dropped.
 *
 * @param reason - Raw wake reason; normalized via `normalizeWakeReason`.
 * @param requestedAt - Timestamp (ms) of the request; defaults to `Date.now()`.
 */
function queuePendingWakeReason(reason?: string, requestedAt = Date.now()): void {
  const normalizedReason = normalizeWakeReason(reason);
  const candidate: PendingWakeReason = {
    reason: normalizedReason,
    priority: resolveReasonPriority(normalizedReason),
    requestedAt,
  };

  const current = pendingWake;
  if (!current) {
    pendingWake = candidate;
    return;
  }

  const outranks = candidate.priority > current.priority;
  // Equal rank: the newer (or equally timed) request replaces the stored one.
  const tiesAndIsNewer =
    candidate.priority === current.priority && candidate.requestedAt >= current.requestedAt;
  if (outranks || tiesAndIsNewer) {
    pendingWake = candidate;
  }
}
|
||||
|
||||
function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
|
||||
const delay = Number.isFinite(coalesceMs) ? Math.max(0, coalesceMs) : DEFAULT_COALESCE_MS;
|
||||
const dueAt = Date.now() + delay;
|
||||
if (timer) {
|
||||
// Keep retry cooldown as a hard minimum delay. This prevents the
|
||||
// finally-path reschedule (often delay=0) from collapsing backoff.
|
||||
if (timerKind === "retry") {
|
||||
return;
|
||||
}
|
||||
// If existing timer fires sooner or at the same time, keep it.
|
||||
if (typeof timerDueAt === "number" && timerDueAt <= dueAt) {
|
||||
return;
|
||||
}
|
||||
// New request needs to fire sooner — preempt the existing timer.
|
||||
clearTimeout(timer);
|
||||
timer = null;
|
||||
timerDueAt = null;
|
||||
timerKind = null;
|
||||
}
|
||||
timerDueAt = dueAt;
|
||||
timerKind = kind;
|
||||
timer = setTimeout(async () => {
|
||||
timer = null;
|
||||
timerDueAt = null;
|
||||
timerKind = null;
|
||||
scheduled = false;
|
||||
const active = handler;
|
||||
if (!active) {
|
||||
@@ -27,44 +108,62 @@ function schedule(coalesceMs: number) {
|
||||
}
|
||||
if (running) {
|
||||
scheduled = true;
|
||||
schedule(coalesceMs);
|
||||
schedule(delay, kind);
|
||||
return;
|
||||
}
|
||||
|
||||
const reason = pendingReason;
|
||||
pendingReason = null;
|
||||
const reason = pendingWake?.reason;
|
||||
pendingWake = null;
|
||||
running = true;
|
||||
try {
|
||||
const res = await active({ reason: reason ?? undefined });
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// The main lane is busy; retry soon.
|
||||
pendingReason = reason ?? "retry";
|
||||
schedule(DEFAULT_RETRY_MS);
|
||||
queuePendingWakeReason(reason ?? "retry");
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
}
|
||||
} catch {
|
||||
// Error is already logged by the heartbeat runner; schedule a retry.
|
||||
pendingReason = reason ?? "retry";
|
||||
schedule(DEFAULT_RETRY_MS);
|
||||
queuePendingWakeReason(reason ?? "retry");
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
} finally {
|
||||
running = false;
|
||||
if (pendingReason || scheduled) {
|
||||
schedule(coalesceMs);
|
||||
if (pendingWake || scheduled) {
|
||||
schedule(delay, "normal");
|
||||
}
|
||||
}
|
||||
}, coalesceMs);
|
||||
}, delay);
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
/**
 * Register (or clear) the heartbeat wake handler.
 *
 * Every call bumps `handlerGeneration` and installs `next` as the active
 * handler. If a handler is installed while a wake is already pending, a
 * normal wake is scheduled after `DEFAULT_COALESCE_MS`.
 *
 * Stale disposers (from previous registrations) are no-ops, preventing a race
 * where an old runner's cleanup clears a newer runner's handler.
 *
 * @param next - Handler to install, or `null` to clear the current one.
 * @returns A disposer that clears this specific registration only.
 */
export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () => void {
  handlerGeneration += 1;
  const generation = handlerGeneration;
  handler = next;

  if (handler && pendingWake) {
    schedule(DEFAULT_COALESCE_MS, "normal");
  }

  return () => {
    // A later registration (or disposal) has happened since; leave it alone.
    if (handlerGeneration !== generation) {
      return;
    }
    if (handler !== next) {
      return;
    }
    // Bump the generation so a repeated call to this disposer is also a no-op.
    handlerGeneration += 1;
    handler = null;
  };
}
|
||||
|
||||
/**
 * Requests a heartbeat run as soon as the coalescing window allows.
 *
 * The reason is offered to the pending-wake slot via `queuePendingWakeReason`
 * and a normal wake timer is scheduled.
 *
 * @param opts - Optional settings.
 *   `reason` is the wake reason (normalized by `queuePendingWakeReason`).
 *   `coalesceMs` is the delay before the wake fires; defaults to `DEFAULT_COALESCE_MS`.
 */
export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }): void {
  queuePendingWakeReason(opts?.reason);
  const delayMs = opts?.coalesceMs ?? DEFAULT_COALESCE_MS;
  schedule(delayMs, "normal");
}
|
||||
|
||||
export function hasHeartbeatWakeHandler() {
|
||||
@@ -72,5 +171,5 @@ export function hasHeartbeatWakeHandler() {
|
||||
}
|
||||
|
||||
/**
 * Reports whether any wake work is outstanding.
 *
 * @returns `true` when a wake request is queued in `pendingWake`, a timer is
 *   armed, or the `scheduled` flag is set; `false` otherwise.
 */
export function hasPendingHeartbeatWake(): boolean {
  if (pendingWake !== null) {
    return true;
  }
  return Boolean(timer) || scheduled;
}
|
||||
|
||||
Reference in New Issue
Block a user