From 064a3079cb5fe54a6539ed16c2d72227e89301b6 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Mon, 16 Feb 2026 15:04:45 -0800 Subject: [PATCH] Heartbeat: queue pending wakes per target --- src/infra/heartbeat-wake.test.ts | 37 ++++++++++++++ src/infra/heartbeat-wake.ts | 87 +++++++++++++++++++------------- 2 files changed, 88 insertions(+), 36 deletions(-) diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index e0c8364b30a..2cda1771b8b 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -279,4 +279,41 @@ describe("heartbeat-wake", () => { sessionKey: "agent:ops:discord:channel:alerts", }); }); + + it("executes distinct targeted wakes queued in the same coalescing window", async () => { + vi.useFakeTimers(); + const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + setHeartbeatWakeHandler(handler); + + requestHeartbeatNow({ + reason: "cron:job-a", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + coalesceMs: 100, + }); + requestHeartbeatNow({ + reason: "cron:job-b", + agentId: "main", + sessionKey: "agent:main:telegram:group:-1001", + coalesceMs: 100, + }); + + await vi.advanceTimersByTimeAsync(100); + + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls.map((call) => call[0])).toEqual( + expect.arrayContaining([ + { + reason: "cron:job-a", + agentId: "ops", + sessionKey: "agent:ops:discord:channel:alerts", + }, + { + reason: "cron:job-b", + agentId: "main", + sessionKey: "agent:main:telegram:group:-1001", + }, + ]), + ); + }); }); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 8543abb07a8..d1dcfb03953 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -20,7 +20,7 @@ type PendingWakeReason = { let handler: HeartbeatWakeHandler | null = null; let handlerGeneration = 0; -let pendingWake: PendingWakeReason | null = null; +const pendingWakes = new Map(); let scheduled = false; let running = false; let timer: NodeJS.Timeout | null = null; @@ -67,6 +67,12 @@ function normalizeWakeTarget(value?: string): string | undefined { return trimmed || undefined; } +function getWakeTargetKey(params: { agentId?: string; sessionKey?: string }) { + const agentId = normalizeWakeTarget(params.agentId); + const sessionKey = normalizeWakeTarget(params.sessionKey); + return `${agentId ?? ""}::${sessionKey ?? ""}`; +} + function queuePendingWakeReason(params?: { reason?: string; requestedAt?: number; @@ -75,23 +81,30 @@ function queuePendingWakeReason(params?: { }) { const requestedAt = params?.requestedAt ?? Date.now(); const normalizedReason = normalizeWakeReason(params?.reason); + const normalizedAgentId = normalizeWakeTarget(params?.agentId); + const normalizedSessionKey = normalizeWakeTarget(params?.sessionKey); + const wakeTargetKey = getWakeTargetKey({ + agentId: normalizedAgentId, + sessionKey: normalizedSessionKey, + }); const next: PendingWakeReason = { reason: normalizedReason, priority: resolveReasonPriority(normalizedReason), requestedAt, - agentId: normalizeWakeTarget(params?.agentId), - sessionKey: normalizeWakeTarget(params?.sessionKey), + agentId: normalizedAgentId, + sessionKey: normalizedSessionKey, }; - if (!pendingWake) { - pendingWake = next; + const previous = pendingWakes.get(wakeTargetKey); + if (!previous) { + pendingWakes.set(wakeTargetKey, next); return; } - if (next.priority > pendingWake.priority) { - pendingWake = next; + if (next.priority > previous.priority) { + pendingWakes.set(wakeTargetKey, next); return; } - if (next.priority === pendingWake.priority && next.requestedAt >= pendingWake.requestedAt) { - pendingWake = next; + if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) { + pendingWakes.set(wakeTargetKey, next); } } @@ -131,38 +144,40 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { return; } - const reason = pendingWake?.reason; - const agentId = pendingWake?.agentId; - const sessionKey = pendingWake?.sessionKey; - pendingWake = null; + const pendingBatch = Array.from(pendingWakes.values()); + pendingWakes.clear(); running = true; try { - const wakeOpts = { - reason: reason ?? undefined, - ...(agentId ? { agentId } : {}), - ...(sessionKey ? { sessionKey } : {}), - }; - const res = await active(wakeOpts); - if (res.status === "skipped" && res.reason === "requests-in-flight") { - // The main lane is busy; retry soon. - queuePendingWakeReason({ - reason: reason ?? "retry", - agentId, - sessionKey, - }); - schedule(DEFAULT_RETRY_MS, "retry"); + for (const pendingWake of pendingBatch) { + const wakeOpts = { + reason: pendingWake.reason ?? undefined, + ...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}), + ...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}), + }; + const res = await active(wakeOpts); + if (res.status === "skipped" && res.reason === "requests-in-flight") { + // The main lane is busy; retry this wake target soon. + queuePendingWakeReason({ + reason: pendingWake.reason ?? "retry", + agentId: pendingWake.agentId, + sessionKey: pendingWake.sessionKey, + }); + schedule(DEFAULT_RETRY_MS, "retry"); + } } } catch { // Error is already logged by the heartbeat runner; schedule a retry. - queuePendingWakeReason({ - reason: reason ?? "retry", - agentId, - sessionKey, - }); + for (const pendingWake of pendingBatch) { + queuePendingWakeReason({ + reason: pendingWake.reason ?? "retry", + agentId: pendingWake.agentId, + sessionKey: pendingWake.sessionKey, + }); + } schedule(DEFAULT_RETRY_MS, "retry"); } finally { running = false; - if (pendingWake || scheduled) { + if (pendingWakes.size > 0 || scheduled) { schedule(delay, "normal"); } } @@ -197,7 +212,7 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () = running = false; scheduled = false; } - if (handler && pendingWake) { + if (handler && pendingWakes.size > 0) { schedule(DEFAULT_COALESCE_MS, "normal"); } return () => { @@ -231,7 +246,7 @@ export function hasHeartbeatWakeHandler() { } export function hasPendingHeartbeatWake() { - return pendingWake !== null || Boolean(timer) || scheduled; + return pendingWakes.size > 0 || Boolean(timer) || scheduled; } export function resetHeartbeatWakeStateForTests() { @@ -241,7 +256,7 @@ export function resetHeartbeatWakeStateForTests() { timer = null; timerDueAt = null; timerKind = null; - pendingWake = null; + pendingWakes.clear(); scheduled = false; running = false; handlerGeneration += 1;