fix: defer gateway restart until all replies are sent (#12970)

* fix: defer gateway restart until all replies are sent

Fixes a race condition where gateway config changes (e.g., enabling
plugins via iMessage) trigger an immediate SIGUSR1 restart, killing the
iMessage RPC connection before replies are delivered.

Both restart paths (config watcher and RPC-triggered) now defer until
all queued operations, pending replies, and embedded agent runs complete
(polling every 500ms, 30s timeout). A shared emitGatewayRestart() guard
prevents double SIGUSR1 when both paths fire simultaneously.

Key changes:
- Dispatcher registry tracks active reply dispatchers globally
- markComplete() called in finally block for guaranteed cleanup
- Pre-restart deferral hook registered at gateway startup
- Centralized extractDeliveryInfo() for session key parsing
- Post-restart sentinel messages delivered directly (not via agent)
- config-patch distinguished from config-apply in sentinel kind

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: single-source gateway restart authorization

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Bridgerz
2026-02-13 15:29:29 -08:00
committed by GitHub
parent dc507f3dec
commit ab4a08a82a
21 changed files with 976 additions and 76 deletions

View File

@@ -64,6 +64,7 @@ function createDispatcher(): ReplyDispatcher {
sendFinalReply: vi.fn(() => true),
waitForIdle: vi.fn(async () => {}),
getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
markComplete: vi.fn(),
};
}

View File

@@ -454,5 +454,9 @@ export async function dispatchReplyFromConfig(params: {
recordProcessed("error", { error: String(err) });
markIdle("message_error");
throw err;
} finally {
// Always clear the dispatcher reservation so a leaked pending count
// can never permanently block gateway restarts.
dispatcher.markComplete();
}
}

View File

@@ -0,0 +1,58 @@
/**
* Global registry for tracking active reply dispatchers.
* Used to ensure gateway restart waits for all replies to complete.
*/
type TrackedDispatcher = {
readonly id: string;
readonly pending: () => number;
readonly waitForIdle: () => Promise<void>;
};
const activeDispatchers = new Set<TrackedDispatcher>();
let nextId = 0;
/**
* Register a reply dispatcher for global tracking.
* Returns an unregister function to call when the dispatcher is no longer needed.
*/
export function registerDispatcher(dispatcher: {
readonly pending: () => number;
readonly waitForIdle: () => Promise<void>;
}): { id: string; unregister: () => void } {
const id = `dispatcher-${++nextId}`;
const tracked: TrackedDispatcher = {
id,
pending: dispatcher.pending,
waitForIdle: dispatcher.waitForIdle,
};
activeDispatchers.add(tracked);
const unregister = () => {
activeDispatchers.delete(tracked);
};
return { id, unregister };
}
/**
* Get the total number of pending replies across all dispatchers.
*/
export function getTotalPendingReplies(): number {
let total = 0;
for (const dispatcher of activeDispatchers) {
total += dispatcher.pending();
}
return total;
}
/**
* Clear all registered dispatchers (for testing).
* WARNING: Only use this in test cleanup!
*/
export function clearAllDispatchers(): void {
if (!process.env.VITEST && process.env.NODE_ENV !== "test") {
throw new Error("clearAllDispatchers() is only available in test environments");
}
activeDispatchers.clear();
}

View File

@@ -3,6 +3,7 @@ import type { GetReplyOptions, ReplyPayload } from "../types.js";
import type { ResponsePrefixContext } from "./response-prefix-template.js";
import type { TypingController } from "./typing.js";
import { sleep } from "../../utils.js";
import { registerDispatcher } from "./dispatcher-registry.js";
import { normalizeReplyPayload, type NormalizeReplySkipReason } from "./normalize-reply.js";
export type ReplyDispatchKind = "tool" | "block" | "final";
@@ -74,6 +75,7 @@ export type ReplyDispatcher = {
sendFinalReply: (payload: ReplyPayload) => boolean;
waitForIdle: () => Promise<void>;
getQueuedCounts: () => Record<ReplyDispatchKind, number>;
markComplete: () => void;
};
type NormalizeReplyPayloadInternalOptions = Pick<
@@ -101,7 +103,10 @@ function normalizeReplyPayloadInternal(
export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDispatcher {
let sendChain: Promise<void> = Promise.resolve();
// Track in-flight deliveries so we can emit a reliable "idle" signal.
let pending = 0;
// Start with pending=1 as a "reservation" to prevent premature gateway restart.
// This is decremented when markComplete() is called to signal no more replies will come.
let pending = 1;
let completeCalled = false;
// Track whether we've sent a block reply (for human delay - skip delay on first block).
let sentFirstBlock = false;
// Serialize outbound replies to preserve tool/block/final order.
@@ -111,6 +116,12 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
final: 0,
};
// Register this dispatcher globally for gateway restart coordination.
const { unregister } = registerDispatcher({
pending: () => pending,
waitForIdle: () => sendChain,
});
const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => {
const normalized = normalizeReplyPayloadInternal(payload, {
responsePrefix: options.responsePrefix,
@@ -140,6 +151,8 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
await sleep(delayMs);
}
}
// Safe: deliver is called inside an async .then() callback, so even a synchronous
// throw becomes a rejection that flows through .catch()/.finally(), ensuring cleanup.
await options.deliver(normalized, { kind });
})
.catch((err) => {
@@ -147,19 +160,49 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
})
.finally(() => {
pending -= 1;
// Clear reservation if:
// 1. pending is now 1 (just the reservation left)
// 2. markComplete has been called
// 3. No more replies will be enqueued
if (pending === 1 && completeCalled) {
pending -= 1; // Clear the reservation
}
if (pending === 0) {
// Unregister from global tracking when idle.
unregister();
options.onIdle?.();
}
});
return true;
};
const markComplete = () => {
if (completeCalled) {
return;
}
completeCalled = true;
// If no replies were enqueued (pending is still 1 = just the reservation),
// schedule clearing the reservation after current microtasks complete.
// This gives any in-flight enqueue() calls a chance to increment pending.
void Promise.resolve().then(() => {
if (pending === 1 && completeCalled) {
// Still just the reservation, no replies were enqueued
pending -= 1;
if (pending === 0) {
unregister();
options.onIdle?.();
}
}
});
};
return {
sendToolResult: (payload) => enqueue("tool", payload),
sendBlockReply: (payload) => enqueue("block", payload),
sendFinalReply: (payload) => enqueue("final", payload),
waitForIdle: () => sendChain,
getQueuedCounts: () => ({ ...queuedCounts }),
markComplete,
};
}

View File

@@ -100,6 +100,8 @@ describe("createReplyDispatcher", () => {
dispatcher.sendFinalReply({ text: "two" });
await dispatcher.waitForIdle();
dispatcher.markComplete();
await Promise.resolve();
expect(onIdle).toHaveBeenCalledTimes(1);
});