Heartbeat: queue pending wakes per target

This commit is contained in:
Vignesh Natarajan
2026-02-16 15:04:45 -08:00
committed by Peter Steinberger
parent a7c25f203a
commit 064a3079cb
2 changed files with 88 additions and 36 deletions

View File

@@ -279,4 +279,41 @@ describe("heartbeat-wake", () => {
sessionKey: "agent:ops:discord:channel:alerts",
});
});
it("executes distinct targeted wakes queued in the same coalescing window", async () => {
vi.useFakeTimers();
const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
setHeartbeatWakeHandler(handler);
requestHeartbeatNow({
reason: "cron:job-a",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
coalesceMs: 100,
});
requestHeartbeatNow({
reason: "cron:job-b",
agentId: "main",
sessionKey: "agent:main:telegram:group:-1001",
coalesceMs: 100,
});
await vi.advanceTimersByTimeAsync(100);
expect(handler).toHaveBeenCalledTimes(2);
expect(handler.mock.calls.map((call) => call[0])).toEqual(
expect.arrayContaining([
{
reason: "cron:job-a",
agentId: "ops",
sessionKey: "agent:ops:discord:channel:alerts",
},
{
reason: "cron:job-b",
agentId: "main",
sessionKey: "agent:main:telegram:group:-1001",
},
]),
);
});
});

View File

@@ -20,7 +20,7 @@ type PendingWakeReason = {
let handler: HeartbeatWakeHandler | null = null;
let handlerGeneration = 0;
let pendingWake: PendingWakeReason | null = null;
const pendingWakes = new Map<string, PendingWakeReason>();
let scheduled = false;
let running = false;
let timer: NodeJS.Timeout | null = null;
@@ -67,6 +67,12 @@ function normalizeWakeTarget(value?: string): string | undefined {
return trimmed || undefined;
}
function getWakeTargetKey(params: { agentId?: string; sessionKey?: string }) {
const agentId = normalizeWakeTarget(params.agentId);
const sessionKey = normalizeWakeTarget(params.sessionKey);
return `${agentId ?? ""}::${sessionKey ?? ""}`;
}
function queuePendingWakeReason(params?: {
reason?: string;
requestedAt?: number;
@@ -75,23 +81,30 @@ function queuePendingWakeReason(params?: {
}) {
const requestedAt = params?.requestedAt ?? Date.now();
const normalizedReason = normalizeWakeReason(params?.reason);
const normalizedAgentId = normalizeWakeTarget(params?.agentId);
const normalizedSessionKey = normalizeWakeTarget(params?.sessionKey);
const wakeTargetKey = getWakeTargetKey({
agentId: normalizedAgentId,
sessionKey: normalizedSessionKey,
});
const next: PendingWakeReason = {
reason: normalizedReason,
priority: resolveReasonPriority(normalizedReason),
requestedAt,
agentId: normalizeWakeTarget(params?.agentId),
sessionKey: normalizeWakeTarget(params?.sessionKey),
agentId: normalizedAgentId,
sessionKey: normalizedSessionKey,
};
if (!pendingWake) {
pendingWake = next;
const previous = pendingWakes.get(wakeTargetKey);
if (!previous) {
pendingWakes.set(wakeTargetKey, next);
return;
}
if (next.priority > pendingWake.priority) {
pendingWake = next;
if (next.priority > previous.priority) {
pendingWakes.set(wakeTargetKey, next);
return;
}
if (next.priority === pendingWake.priority && next.requestedAt >= pendingWake.requestedAt) {
pendingWake = next;
if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) {
pendingWakes.set(wakeTargetKey, next);
}
}
@@ -131,38 +144,40 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
return;
}
const reason = pendingWake?.reason;
const agentId = pendingWake?.agentId;
const sessionKey = pendingWake?.sessionKey;
pendingWake = null;
const pendingBatch = Array.from(pendingWakes.values());
pendingWakes.clear();
running = true;
try {
const wakeOpts = {
reason: reason ?? undefined,
...(agentId ? { agentId } : {}),
...(sessionKey ? { sessionKey } : {}),
};
const res = await active(wakeOpts);
if (res.status === "skipped" && res.reason === "requests-in-flight") {
// The main lane is busy; retry soon.
queuePendingWakeReason({
reason: reason ?? "retry",
agentId,
sessionKey,
});
schedule(DEFAULT_RETRY_MS, "retry");
for (const pendingWake of pendingBatch) {
const wakeOpts = {
reason: pendingWake.reason ?? undefined,
...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}),
...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}),
};
const res = await active(wakeOpts);
if (res.status === "skipped" && res.reason === "requests-in-flight") {
// The main lane is busy; retry this wake target soon.
queuePendingWakeReason({
reason: pendingWake.reason ?? "retry",
agentId: pendingWake.agentId,
sessionKey: pendingWake.sessionKey,
});
schedule(DEFAULT_RETRY_MS, "retry");
}
}
} catch {
// Error is already logged by the heartbeat runner; schedule a retry.
queuePendingWakeReason({
reason: reason ?? "retry",
agentId,
sessionKey,
});
for (const pendingWake of pendingBatch) {
queuePendingWakeReason({
reason: pendingWake.reason ?? "retry",
agentId: pendingWake.agentId,
sessionKey: pendingWake.sessionKey,
});
}
schedule(DEFAULT_RETRY_MS, "retry");
} finally {
running = false;
if (pendingWake || scheduled) {
if (pendingWakes.size > 0 || scheduled) {
schedule(delay, "normal");
}
}
@@ -197,7 +212,7 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () =
running = false;
scheduled = false;
}
if (handler && pendingWake) {
if (handler && pendingWakes.size > 0) {
schedule(DEFAULT_COALESCE_MS, "normal");
}
return () => {
@@ -231,7 +246,7 @@ export function hasHeartbeatWakeHandler() {
}
export function hasPendingHeartbeatWake() {
return pendingWake !== null || Boolean(timer) || scheduled;
return pendingWakes.size > 0 || Boolean(timer) || scheduled;
}
export function resetHeartbeatWakeStateForTests() {
@@ -241,7 +256,7 @@ export function resetHeartbeatWakeStateForTests() {
timer = null;
timerDueAt = null;
timerKind = null;
pendingWake = null;
pendingWakes.clear();
scheduled = false;
running = false;
handlerGeneration += 1;