diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index f5ca9721083..e0155874028 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -64,6 +64,10 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean { return handle.isStreaming(); } +export function getActiveEmbeddedRunCount(): number { + return ACTIVE_EMBEDDED_RUNS.size; +} + export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise { if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) { return Promise.resolve(true); diff --git a/src/agents/tools/gateway-tool.ts b/src/agents/tools/gateway-tool.ts index 9560b323c4a..127fe1ff184 100644 --- a/src/agents/tools/gateway-tool.ts +++ b/src/agents/tools/gateway-tool.ts @@ -1,7 +1,7 @@ import { Type } from "@sinclair/typebox"; import type { OpenClawConfig } from "../../config/config.js"; -import { loadConfig, resolveConfigSnapshotHash } from "../../config/io.js"; -import { loadSessionStore, resolveStorePath } from "../../config/sessions.js"; +import { resolveConfigSnapshotHash } from "../../config/io.js"; +import { extractDeliveryInfo } from "../../config/sessions.js"; import { formatDoctorNonInteractiveHint, type RestartSentinelPayload, @@ -69,7 +69,7 @@ export function createGatewayTool(opts?: { label: "Gateway", name: "gateway", description: - "Restart, apply config, or update the gateway in-place (SIGUSR1). Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Both trigger restart after writing.", + "Restart, apply config, or update the gateway in-place (SIGUSR1). Use config.patch for safe partial config updates (merges with existing). Use config.apply only when replacing entire config. Both trigger restart after writing. Always pass a human-readable completion message via the `note` parameter so the system can deliver it to the user after restart.", parameters: GatewayToolSchema, execute: async (_toolCallId, args) => { const params = args as Record; @@ -93,34 +93,8 @@ export function createGatewayTool(opts?: { const note = typeof params.note === "string" && params.note.trim() ? params.note.trim() : undefined; // Extract channel + threadId for routing after restart - let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; - let threadId: string | undefined; - if (sessionKey) { - const threadMarker = ":thread:"; - const threadIndex = sessionKey.lastIndexOf(threadMarker); - const baseSessionKey = threadIndex === -1 ? sessionKey : sessionKey.slice(0, threadIndex); - const threadIdRaw = - threadIndex === -1 ? undefined : sessionKey.slice(threadIndex + threadMarker.length); - threadId = threadIdRaw?.trim() || undefined; - try { - const cfg = loadConfig(); - const storePath = resolveStorePath(cfg.session?.store); - const store = loadSessionStore(storePath); - let entry = store[sessionKey]; - if (!entry?.deliveryContext && threadIndex !== -1 && baseSessionKey) { - entry = store[baseSessionKey]; - } - if (entry?.deliveryContext) { - deliveryContext = { - channel: entry.deliveryContext.channel, - to: entry.deliveryContext.to, - accountId: entry.deliveryContext.accountId, - }; - } - } catch { - // ignore: best-effort - } - } + // Supports both :thread: (most channels) and :topic: (Telegram) + const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); const payload: RestartSentinelPayload = { kind: "restart", status: "ok", diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 01c96466965..4cc6657d2a2 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -64,6 +64,7 @@ function createDispatcher(): ReplyDispatcher { sendFinalReply: vi.fn(() => true), waitForIdle: vi.fn(async () => {}), getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), }; } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index f04aff0a7b5..0f2cae6b4a2 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -454,5 +454,9 @@ export async function dispatchReplyFromConfig(params: { recordProcessed("error", { error: String(err) }); markIdle("message_error"); throw err; + } finally { + // Always clear the dispatcher reservation so a leaked pending count + // can never permanently block gateway restarts. + dispatcher.markComplete(); } } diff --git a/src/auto-reply/reply/dispatcher-registry.ts b/src/auto-reply/reply/dispatcher-registry.ts new file mode 100644 index 00000000000..0ef42fbf73f --- /dev/null +++ b/src/auto-reply/reply/dispatcher-registry.ts @@ -0,0 +1,58 @@ +/** + * Global registry for tracking active reply dispatchers. + * Used to ensure gateway restart waits for all replies to complete. + */ + +type TrackedDispatcher = { + readonly id: string; + readonly pending: () => number; + readonly waitForIdle: () => Promise; +}; + +const activeDispatchers = new Set(); +let nextId = 0; + +/** + * Register a reply dispatcher for global tracking. + * Returns an unregister function to call when the dispatcher is no longer needed. + */ +export function registerDispatcher(dispatcher: { + readonly pending: () => number; + readonly waitForIdle: () => Promise; +}): { id: string; unregister: () => void } { + const id = `dispatcher-${++nextId}`; + const tracked: TrackedDispatcher = { + id, + pending: dispatcher.pending, + waitForIdle: dispatcher.waitForIdle, + }; + activeDispatchers.add(tracked); + + const unregister = () => { + activeDispatchers.delete(tracked); + }; + + return { id, unregister }; +} + +/** + * Get the total number of pending replies across all dispatchers. + */ +export function getTotalPendingReplies(): number { + let total = 0; + for (const dispatcher of activeDispatchers) { + total += dispatcher.pending(); + } + return total; +} + +/** + * Clear all registered dispatchers (for testing). + * WARNING: Only use this in test cleanup! + */ +export function clearAllDispatchers(): void { + if (!process.env.VITEST && process.env.NODE_ENV !== "test") { + throw new Error("clearAllDispatchers() is only available in test environments"); + } + activeDispatchers.clear(); +} diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 270efb001e5..9027af0693d 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -3,6 +3,7 @@ import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { ResponsePrefixContext } from "./response-prefix-template.js"; import type { TypingController } from "./typing.js"; import { sleep } from "../../utils.js"; +import { registerDispatcher } from "./dispatcher-registry.js"; import { normalizeReplyPayload, type NormalizeReplySkipReason } from "./normalize-reply.js"; export type ReplyDispatchKind = "tool" | "block" | "final"; @@ -74,6 +75,7 @@ export type ReplyDispatcher = { sendFinalReply: (payload: ReplyPayload) => boolean; waitForIdle: () => Promise; getQueuedCounts: () => Record; + markComplete: () => void; }; type NormalizeReplyPayloadInternalOptions = Pick< @@ -101,7 +103,10 @@ function normalizeReplyPayloadInternal( export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDispatcher { let sendChain: Promise = Promise.resolve(); // Track in-flight deliveries so we can emit a reliable "idle" signal. - let pending = 0; + // Start with pending=1 as a "reservation" to prevent premature gateway restart. + // This is decremented when markComplete() is called to signal no more replies will come. + let pending = 1; + let completeCalled = false; // Track whether we've sent a block reply (for human delay - skip delay on first block). let sentFirstBlock = false; // Serialize outbound replies to preserve tool/block/final order. @@ -111,6 +116,12 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis final: 0, }; + // Register this dispatcher globally for gateway restart coordination. + const { unregister } = registerDispatcher({ + pending: () => pending, + waitForIdle: () => sendChain, + }); + const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => { const normalized = normalizeReplyPayloadInternal(payload, { responsePrefix: options.responsePrefix, @@ -140,6 +151,8 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis await sleep(delayMs); } } + // Safe: deliver is called inside an async .then() callback, so even a synchronous + // throw becomes a rejection that flows through .catch()/.finally(), ensuring cleanup. await options.deliver(normalized, { kind }); }) .catch((err) => { @@ -147,19 +160,49 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis }) .finally(() => { pending -= 1; + // Clear reservation if: + // 1. pending is now 1 (just the reservation left) + // 2. markComplete has been called + // 3. No more replies will be enqueued + if (pending === 1 && completeCalled) { + pending -= 1; // Clear the reservation + } if (pending === 0) { + // Unregister from global tracking when idle. + unregister(); options.onIdle?.(); } }); return true; }; + const markComplete = () => { + if (completeCalled) { + return; + } + completeCalled = true; + // If no replies were enqueued (pending is still 1 = just the reservation), + // schedule clearing the reservation after current microtasks complete. + // This gives any in-flight enqueue() calls a chance to increment pending. + void Promise.resolve().then(() => { + if (pending === 1 && completeCalled) { + // Still just the reservation, no replies were enqueued + pending -= 1; + if (pending === 0) { + unregister(); + options.onIdle?.(); + } + } + }); + }; + return { sendToolResult: (payload) => enqueue("tool", payload), sendBlockReply: (payload) => enqueue("block", payload), sendFinalReply: (payload) => enqueue("final", payload), waitForIdle: () => sendChain, getQueuedCounts: () => ({ ...queuedCounts }), + markComplete, }; } diff --git a/src/auto-reply/reply/reply-routing.test.ts b/src/auto-reply/reply/reply-routing.test.ts index 6637c6c1401..3d5179d6c0c 100644 --- a/src/auto-reply/reply/reply-routing.test.ts +++ b/src/auto-reply/reply/reply-routing.test.ts @@ -100,6 +100,8 @@ describe("createReplyDispatcher", () => { dispatcher.sendFinalReply({ text: "two" }); await dispatcher.waitForIdle(); + dispatcher.markComplete(); + await Promise.resolve(); expect(onIdle).toHaveBeenCalledTimes(1); }); diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 20de39409b1..0ea031cf050 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -7,3 +7,4 @@ export * from "./sessions/session-key.js"; export * from "./sessions/store.js"; export * from "./sessions/types.js"; export * from "./sessions/transcript.js"; +export * from "./sessions/delivery-info.js"; diff --git a/src/config/sessions/delivery-info.ts b/src/config/sessions/delivery-info.ts new file mode 100644 index 00000000000..006f1db4490 --- /dev/null +++ b/src/config/sessions/delivery-info.ts @@ -0,0 +1,46 @@ +import { loadConfig } from "../io.js"; +import { resolveStorePath } from "./paths.js"; +import { loadSessionStore } from "./store.js"; + +/** + * Extract deliveryContext and threadId from a sessionKey. + * Supports both :thread: (most channels) and :topic: (Telegram). + */ +export function extractDeliveryInfo(sessionKey: string | undefined): { + deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; + threadId: string | undefined; +} { + if (!sessionKey) { + return { deliveryContext: undefined, threadId: undefined }; + } + const topicIndex = sessionKey.lastIndexOf(":topic:"); + const threadIndex = sessionKey.lastIndexOf(":thread:"); + const markerIndex = Math.max(topicIndex, threadIndex); + const marker = topicIndex > threadIndex ? ":topic:" : ":thread:"; + + const baseSessionKey = markerIndex === -1 ? sessionKey : sessionKey.slice(0, markerIndex); + const threadIdRaw = + markerIndex === -1 ? undefined : sessionKey.slice(markerIndex + marker.length); + const threadId = threadIdRaw?.trim() || undefined; + + let deliveryContext: { channel?: string; to?: string; accountId?: string } | undefined; + try { + const cfg = loadConfig(); + const storePath = resolveStorePath(cfg.session?.store); + const store = loadSessionStore(storePath); + let entry = store[sessionKey]; + if (!entry?.deliveryContext && markerIndex !== -1 && baseSessionKey) { + entry = store[baseSessionKey]; + } + if (entry?.deliveryContext) { + deliveryContext = { + channel: entry.deliveryContext.channel, + to: entry.deliveryContext.to, + accountId: entry.deliveryContext.accountId, + }; + } + } catch { + // ignore: best-effort + } + return { deliveryContext, threadId }; +} diff --git a/src/gateway/server-methods/config.ts b/src/gateway/server-methods/config.ts index d4be1a8667e..2e397728c64 100644 --- a/src/gateway/server-methods/config.ts +++ b/src/gateway/server-methods/config.ts @@ -18,6 +18,7 @@ import { restoreRedactedValues, } from "../../config/redact-snapshot.js"; import { buildConfigSchema, type ConfigSchemaResponse } from "../../config/schema.js"; +import { extractDeliveryInfo } from "../../config/sessions.js"; import { formatDoctorNonInteractiveHint, type RestartSentinelPayload, @@ -315,11 +316,17 @@ export const configHandlers: GatewayRequestHandlers = { ? Math.max(0, Math.floor(restartDelayMsRaw)) : undefined; + // Extract deliveryContext + threadId for routing after restart + // Supports both :thread: (most channels) and :topic: (Telegram) + const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + const payload: RestartSentinelPayload = { - kind: "config-apply", + kind: "config-patch", status: "ok", ts: Date.now(), sessionKey, + deliveryContext, + threadId, message: note ?? null, doctorHint: formatDoctorNonInteractiveHint(), stats: { @@ -422,11 +429,18 @@ export const configHandlers: GatewayRequestHandlers = { ? Math.max(0, Math.floor(restartDelayMsRaw)) : undefined; + // Extract deliveryContext + threadId for routing after restart + // Supports both :thread: (most channels) and :topic: (Telegram) + const { deliveryContext: deliveryContextApply, threadId: threadIdApply } = + extractDeliveryInfo(sessionKey); + const payload: RestartSentinelPayload = { kind: "config-apply", status: "ok", ts: Date.now(), sessionKey, + deliveryContext: deliveryContextApply, + threadId: threadIdApply, message: note ?? null, doctorHint: formatDoctorNonInteractiveHint(), stats: { diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 393a38cf778..02ec35bc306 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -2,15 +2,14 @@ import type { CliDeps } from "../cli/deps.js"; import type { loadConfig } from "../config/config.js"; import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; import type { ChannelKind, GatewayReloadPlan } from "./config-reload.js"; +import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js"; +import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; import { startGmailWatcher, stopGmailWatcher } from "../hooks/gmail-watcher.js"; import { isTruthyEnvValue } from "../infra/env.js"; import { resetDirectoryCache } from "../infra/outbound/target-resolver.js"; -import { - authorizeGatewaySigusr1Restart, - setGatewaySigusr1RestartPolicy, -} from "../infra/restart.js"; -import { setCommandLaneConcurrency } from "../process/command-queue.js"; +import { emitGatewayRestart, setGatewaySigusr1RestartPolicy } from "../infra/restart.js"; +import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; import { resolveHooksConfig } from "./hooks.js"; import { startBrowserControlServerIfEnabled } from "./server-browser.js"; @@ -140,6 +139,8 @@ export function createGatewayReloadHandlers(params: { params.setState(nextState); }; + let restartPending = false; + const requestGatewayRestart = ( plan: GatewayReloadPlan, nextConfig: ReturnType, @@ -148,13 +149,85 @@ export function createGatewayReloadHandlers(params: { const reasons = plan.restartReasons.length ? plan.restartReasons.join(", ") : plan.changedPaths.join(", "); - params.logReload.warn(`config change requires gateway restart (${reasons})`); + if (process.listenerCount("SIGUSR1") === 0) { params.logReload.warn("no SIGUSR1 listener found; restart skipped"); return; } - authorizeGatewaySigusr1Restart(); - process.emit("SIGUSR1"); + + // Check if there are active operations (commands in queue, pending replies, or embedded runs) + const queueSize = getTotalQueueSize(); + const pendingReplies = getTotalPendingReplies(); + const embeddedRuns = getActiveEmbeddedRunCount(); + const totalActive = queueSize + pendingReplies + embeddedRuns; + + if (totalActive > 0) { + // Avoid spinning up duplicate polling loops from repeated config changes. + if (restartPending) { + params.logReload.info( + `config change requires gateway restart (${reasons}) — already waiting for operations to complete`, + ); + return; + } + restartPending = true; + const details = []; + if (queueSize > 0) { + details.push(`${queueSize} queued operation(s)`); + } + if (pendingReplies > 0) { + details.push(`${pendingReplies} pending reply(ies)`); + } + if (embeddedRuns > 0) { + details.push(`${embeddedRuns} embedded run(s)`); + } + params.logReload.warn( + `config change requires gateway restart (${reasons}) — deferring until ${details.join(", ")} complete`, + ); + + // Wait for all operations and replies to complete before restarting (max 30 seconds) + const maxWaitMs = 30_000; + const checkIntervalMs = 500; + const startTime = Date.now(); + + const checkAndRestart = () => { + const currentQueueSize = getTotalQueueSize(); + const currentPendingReplies = getTotalPendingReplies(); + const currentEmbeddedRuns = getActiveEmbeddedRunCount(); + const currentTotalActive = currentQueueSize + currentPendingReplies + currentEmbeddedRuns; + const elapsed = Date.now() - startTime; + + if (currentTotalActive === 0) { + restartPending = false; + params.logReload.info("all operations and replies completed; restarting gateway now"); + emitGatewayRestart(); + } else if (elapsed >= maxWaitMs) { + const remainingDetails = []; + if (currentQueueSize > 0) { + remainingDetails.push(`${currentQueueSize} operation(s)`); + } + if (currentPendingReplies > 0) { + remainingDetails.push(`${currentPendingReplies} reply(ies)`); + } + if (currentEmbeddedRuns > 0) { + remainingDetails.push(`${currentEmbeddedRuns} embedded run(s)`); + } + restartPending = false; + params.logReload.warn( + `restart timeout after ${elapsed}ms with ${remainingDetails.join(", ")} still active; restarting anyway`, + ); + emitGatewayRestart(); + } else { + // Check again soon + setTimeout(checkAndRestart, checkIntervalMs); + } + }; + + setTimeout(checkAndRestart, checkIntervalMs); + } else { + // No active operations or pending replies, restart immediately + params.logReload.warn(`config change requires gateway restart (${reasons})`); + emitGatewayRestart(); + } }; return { applyHotReload, requestGatewayRestart }; diff --git a/src/gateway/server-reload.config-during-reply.test.ts b/src/gateway/server-reload.config-during-reply.test.ts new file mode 100644 index 00000000000..2ae95be5557 --- /dev/null +++ b/src/gateway/server-reload.config-during-reply.test.ts @@ -0,0 +1,151 @@ +/** + * E2E test for config reload during active reply sending. + * Tests that gateway restart is properly deferred until replies are sent. + */ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearAllDispatchers, + getTotalPendingReplies, +} from "../auto-reply/reply/dispatcher-registry.js"; + +// Helper to flush all pending microtasks +async function flushMicrotasks() { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } +} + +describe("gateway config reload during reply", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(async () => { + vi.restoreAllMocks(); + // Wait for any pending microtasks (from markComplete()) to complete + await flushMicrotasks(); + clearAllDispatchers(); + }); + + it("should defer restart until reply dispatcher completes", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalQueueSize } = await import("../process/command-queue.js"); + + // Create a dispatcher (simulating message handling) + let deliveredReplies: string[] = []; + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + // Simulate async reply delivery + await new Promise((resolve) => setTimeout(resolve, 100)); + deliveredReplies.push(payload.text ?? ""); + }, + onError: (err) => { + throw err; + }, + }); + + // Initially: pending=1 (reservation) + expect(getTotalPendingReplies()).toBe(1); + + // Simulate command finishing and enqueuing reply + dispatcher.sendFinalReply({ text: "Configuration updated successfully!" }); + + // Now: pending=2 (reservation + 1 enqueued reply) + expect(getTotalPendingReplies()).toBe(2); + + // Mark dispatcher complete (flags reservation for cleanup on last delivery) + dispatcher.markComplete(); + + // Reservation is still counted until the delivery .finally() clears it, + // but the important invariant is pending > 0 while delivery is in flight. + expect(getTotalPendingReplies()).toBeGreaterThan(0); + + // At this point, if gateway restart was requested, it should defer + // because getTotalPendingReplies() > 0 + + // Wait for reply to be delivered + await dispatcher.waitForIdle(); + + // Now: pending=0 (reply sent) + expect(getTotalPendingReplies()).toBe(0); + expect(deliveredReplies).toEqual(["Configuration updated successfully!"]); + + // Now restart can proceed safely + expect(getTotalQueueSize()).toBe(0); + expect(getTotalPendingReplies()).toBe(0); + }); + + it("should handle dispatcher reservation correctly when no replies sent", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + + let deliverCalled = false; + const dispatcher = createReplyDispatcher({ + deliver: async () => { + deliverCalled = true; + }, + }); + + // Initially: pending=1 (reservation) + expect(getTotalPendingReplies()).toBe(1); + + // Mark complete without sending any replies + dispatcher.markComplete(); + + // Reservation is cleared via microtask — flush it + await flushMicrotasks(); + + // Now: pending=0 (reservation cleared, no replies were enqueued) + expect(getTotalPendingReplies()).toBe(0); + + // Wait for idle (should resolve immediately since no replies) + await dispatcher.waitForIdle(); + + expect(deliverCalled).toBe(false); + expect(getTotalPendingReplies()).toBe(0); + }); + + it("should integrate dispatcher reservation with concurrent dispatchers", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalQueueSize } = await import("../process/command-queue.js"); + + const deliveredReplies: string[] = []; + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + deliveredReplies.push(payload.text ?? ""); + }, + }); + + // Dispatcher has reservation (pending=1) + expect(getTotalPendingReplies()).toBe(1); + + // Total active = queue + pending + const totalActive = getTotalQueueSize() + getTotalPendingReplies(); + expect(totalActive).toBe(1); // 0 queue + 1 pending + + // Command finishes, replies enqueued + dispatcher.sendFinalReply({ text: "Reply 1" }); + dispatcher.sendFinalReply({ text: "Reply 2" }); + + // Now: pending=3 (reservation + 2 replies) + expect(getTotalPendingReplies()).toBe(3); + + // Mark complete (flags reservation for cleanup on last delivery) + dispatcher.markComplete(); + + // Reservation still counted until delivery .finally() clears it, + // but the important invariant is pending > 0 while deliveries are in flight. + expect(getTotalPendingReplies()).toBeGreaterThan(0); + + // Wait for replies + await dispatcher.waitForIdle(); + + // Replies sent, pending=0 + expect(getTotalPendingReplies()).toBe(0); + expect(deliveredReplies).toEqual(["Reply 1", "Reply 2"]); + + // Now everything is idle + expect(getTotalPendingReplies()).toBe(0); + expect(getTotalQueueSize()).toBe(0); + }); +}); diff --git a/src/gateway/server-reload.integration.test.ts b/src/gateway/server-reload.integration.test.ts new file mode 100644 index 00000000000..d2ab045fac3 --- /dev/null +++ b/src/gateway/server-reload.integration.test.ts @@ -0,0 +1,199 @@ +/** + * Integration test simulating full message handling + config change + reply flow. + * This tests the complete scenario where a user configures an adapter via chat + * and ensures they get a reply before the gateway restarts. + */ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; + +describe("gateway restart deferral integration", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(async () => { + vi.restoreAllMocks(); + // Wait for any pending microtasks (from markComplete()) to complete + await Promise.resolve(); + const { clearAllDispatchers } = await import("../auto-reply/reply/dispatcher-registry.js"); + clearAllDispatchers(); + }); + + it("should defer restart until dispatcher completes with reply", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + const { getTotalQueueSize } = await import("../process/command-queue.js"); + + const events: string[] = []; + + // T=0: Message received — dispatcher created (pending=1 reservation) + events.push("message-received"); + const deliveredReplies: Array<{ text: string; timestamp: number }> = []; + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + // Simulate network delay + await new Promise((resolve) => setTimeout(resolve, 100)); + deliveredReplies.push({ + text: payload.text ?? "", + timestamp: Date.now(), + }); + events.push(`reply-delivered: ${payload.text}`); + }, + }); + events.push("dispatcher-created"); + + // T=1: Config change detected + events.push("config-change-detected"); + + // Check if restart should be deferred + const queueSize = getTotalQueueSize(); + const pendingReplies = getTotalPendingReplies(); + const totalActive = queueSize + pendingReplies; + + events.push(`defer-check: queue=${queueSize} pending=${pendingReplies} total=${totalActive}`); + + // Should defer because dispatcher has reservation + expect(totalActive).toBeGreaterThan(0); + expect(pendingReplies).toBe(1); // reservation + + if (totalActive > 0) { + events.push("restart-deferred"); + } + + // T=2: Command finishes, enqueue replies + dispatcher.sendFinalReply({ text: "Adapter configured successfully!" }); + dispatcher.sendFinalReply({ text: "Gateway will restart to apply changes." }); + events.push("replies-enqueued"); + + // Now pending should be 3 (reservation + 2 replies) + expect(getTotalPendingReplies()).toBe(3); + + // Mark command complete (flags reservation for cleanup on last delivery) + dispatcher.markComplete(); + events.push("command-complete"); + + // Reservation still counted until delivery .finally() clears it, + // but the important invariant is pending > 0 while deliveries are in flight. + expect(getTotalPendingReplies()).toBeGreaterThan(0); + + // T=3: Wait for replies to be delivered + await dispatcher.waitForIdle(); + events.push("dispatcher-idle"); + + // Replies should be delivered + expect(deliveredReplies).toHaveLength(2); + expect(deliveredReplies[0].text).toBe("Adapter configured successfully!"); + expect(deliveredReplies[1].text).toBe("Gateway will restart to apply changes."); + + // Pending should be 0 + expect(getTotalPendingReplies()).toBe(0); + + // T=4: Check if restart can proceed + const finalQueueSize = getTotalQueueSize(); + const finalPendingReplies = getTotalPendingReplies(); + const finalTotalActive = finalQueueSize + finalPendingReplies; + + events.push( + `restart-check: queue=${finalQueueSize} pending=${finalPendingReplies} total=${finalTotalActive}`, + ); + + // Everything should be idle now + expect(finalTotalActive).toBe(0); + events.push("restart-can-proceed"); + + // Verify event sequence + expect(events).toEqual([ + "message-received", + "dispatcher-created", + "config-change-detected", + "defer-check: queue=0 pending=1 total=1", + "restart-deferred", + "replies-enqueued", + "command-complete", + "reply-delivered: Adapter configured successfully!", + "reply-delivered: Gateway will restart to apply changes.", + "dispatcher-idle", + "restart-check: queue=0 pending=0 total=0", + "restart-can-proceed", + ]); + }); + + it("should handle concurrent dispatchers with config changes", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + + // Simulate two messages being processed concurrently + const deliveredReplies: string[] = []; + + // Message 1 — dispatcher created + const dispatcher1 = createReplyDispatcher({ + deliver: async (payload) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + deliveredReplies.push(`msg1: ${payload.text}`); + }, + }); + + // Message 2 — dispatcher created + const dispatcher2 = createReplyDispatcher({ + deliver: async (payload) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + deliveredReplies.push(`msg2: ${payload.text}`); + }, + }); + + // Both dispatchers have reservations + expect(getTotalPendingReplies()).toBe(2); + + // Config change detected - should defer + const totalActive = getTotalPendingReplies(); + expect(totalActive).toBe(2); // 2 dispatcher reservations + + // Messages process and send replies + dispatcher1.sendFinalReply({ text: "Reply from message 1" }); + dispatcher1.markComplete(); + + dispatcher2.sendFinalReply({ text: "Reply from message 2" }); + dispatcher2.markComplete(); + + // Wait for both + await Promise.all([dispatcher1.waitForIdle(), dispatcher2.waitForIdle()]); + + // All idle + expect(getTotalPendingReplies()).toBe(0); + + // Replies delivered + expect(deliveredReplies).toHaveLength(2); + }); + + it("should handle rapid config changes without losing replies", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + + const deliveredReplies: string[] = []; + + // Message received — dispatcher created + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + await new Promise((resolve) => setTimeout(resolve, 200)); // Slow network + deliveredReplies.push(payload.text ?? ""); + }, + }); + + // Config change 1, 2, 3 (rapid changes) + // All should be deferred because dispatcher has pending replies + + // Send replies + dispatcher.sendFinalReply({ text: "Processing..." }); + dispatcher.sendFinalReply({ text: "Almost done..." }); + dispatcher.sendFinalReply({ text: "Complete!" }); + dispatcher.markComplete(); + + // Wait for all replies + await dispatcher.waitForIdle(); + + // All replies should be delivered + expect(deliveredReplies).toEqual(["Processing...", "Almost done...", "Complete!"]); + + // Now restart can proceed + expect(getTotalPendingReplies()).toBe(0); + }); +}); diff --git a/src/gateway/server-reload.real-scenario.test.ts b/src/gateway/server-reload.real-scenario.test.ts new file mode 100644 index 00000000000..c3da2723f4e --- /dev/null +++ b/src/gateway/server-reload.real-scenario.test.ts @@ -0,0 +1,121 @@ +/** + * REAL scenario test - simulates actual message handling with config changes. + * This test MUST fail if "imsg rpc not running" would occur in production. + */ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; + +describe("real scenario: config change during message processing", () => { + let replyErrors: string[] = []; + + beforeEach(() => { + vi.clearAllMocks(); + replyErrors = []; + }); + + afterEach(async () => { + vi.restoreAllMocks(); + // Wait for any pending microtasks (from markComplete()) to complete + await Promise.resolve(); + const { clearAllDispatchers } = await import("../auto-reply/reply/dispatcher-registry.js"); + clearAllDispatchers(); + }); + + it("should NOT restart gateway while reply delivery is in flight", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + + let rpcConnected = true; + const deliveredReplies: string[] = []; + + // Create dispatcher with slow delivery (simulates real network delay) + const dispatcher = createReplyDispatcher({ + deliver: async (payload) => { + if (!rpcConnected) { + const error = "Error: imsg rpc not running"; + replyErrors.push(error); + throw new Error(error); + } + // Slow delivery — restart checks will run during this window + await new Promise((resolve) => setTimeout(resolve, 500)); + deliveredReplies.push(payload.text ?? ""); + }, + onError: () => { + // Swallow delivery errors so the test can assert on replyErrors + }, + }); + + // Enqueue reply and immediately clear the reservation. + // This is the critical sequence: after markComplete(), the ONLY thing + // keeping pending > 0 is the in-flight delivery itself. + dispatcher.sendFinalReply({ text: "Configuration updated!" }); + dispatcher.markComplete(); + + // At this point: markComplete flagged, delivery is in flight. + // pending > 0 because the in-flight delivery keeps it alive. + const pendingDuringDelivery = getTotalPendingReplies(); + expect(pendingDuringDelivery).toBeGreaterThan(0); + + // Simulate restart checks while delivery is in progress. + // If the tracking is broken, pending would be 0 and we'd restart. + let restartTriggered = false; + for (let i = 0; i < 3; i++) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const pending = getTotalPendingReplies(); + if (pending === 0) { + restartTriggered = true; + rpcConnected = false; + break; + } + } + + // Wait for delivery to complete + await dispatcher.waitForIdle(); + + // Now pending should be 0 — restart can proceed + expect(getTotalPendingReplies()).toBe(0); + + // CRITICAL: delivery must have succeeded without RPC being killed + expect(restartTriggered).toBe(false); + expect(replyErrors).toEqual([]); + expect(deliveredReplies).toEqual(["Configuration updated!"]); + }); + + it("should keep pending > 0 until reply is actually enqueued", async () => { + const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); + const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); + + const dispatcher = createReplyDispatcher({ + deliver: async (_payload) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + }, + }); + + // Initially: pending=1 (reservation) + expect(getTotalPendingReplies()).toBe(1); + + // Simulate command processing delay BEFORE reply is enqueued + await new Promise((resolve) => setTimeout(resolve, 100)); + + // During this delay, pending should STILL be 1 (reservation active) + expect(getTotalPendingReplies()).toBe(1); + + // Now enqueue reply + dispatcher.sendFinalReply({ text: "Reply" }); + + // Now pending should be 2 (reservation + reply) + expect(getTotalPendingReplies()).toBe(2); + + // Mark complete + dispatcher.markComplete(); + + // After markComplete, pending should still be > 0 if reply hasn't sent yet + const pendingAfterMarkComplete = getTotalPendingReplies(); + expect(pendingAfterMarkComplete).toBeGreaterThan(0); + + // Wait for reply to send + await dispatcher.waitForIdle(); + + // Now pending should be 0 + expect(getTotalPendingReplies()).toBe(0); + }); +}); diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 2600a0b6380..901465b5684 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -1,8 +1,8 @@ import type { CliDeps } from "../cli/deps.js"; import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { normalizeChannelId } from "../channels/plugins/index.js"; -import { agentCommand } from "../commands/agent.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; +import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { consumeRestartSentinel, @@ -10,11 +10,10 @@ import { summarizeRestartSentinel, } from "../infra/restart-sentinel.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import { defaultRuntime } from "../runtime.js"; import { deliveryContextFromSession, mergeDeliveryContext } from "../utils/delivery-context.js"; import { loadSessionEntry } from "./session-utils.js"; -export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { +export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) { const sentinel = await consumeRestartSentinel(); if (!sentinel) { return; @@ -86,20 +85,15 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { (origin?.threadId != null ? String(origin.threadId) : undefined); try { - await agentCommand( - { - message, - sessionKey, - to: resolved.to, - channel, - deliver: true, - bestEffortDeliver: true, - messageChannel: channel, - threadId, - }, - defaultRuntime, - params.deps, - ); + await deliverOutboundPayloads({ + cfg, + channel, + to: resolved.to, + accountId: origin?.accountId, + threadId, + payloads: [{ text: message }], + bestEffort: true, + }); } catch (err) { enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey }); } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 3146c0c6deb..7cc895df499 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -5,8 +5,10 @@ import type { RuntimeEnv } from "../runtime.js"; import type { ControlUiRootState } from "./control-ui.js"; import type { startBrowserControlServerIfEnabled } from "./server-browser.js"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; +import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js"; import { registerSkillsChangeListener } from "../agents/skills/refresh.js"; import { initSubagentRegistry } from "../agents/subagent-registry.js"; +import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { formatCliCommand } from "../cli/command-format.js"; import { createDefaultDeps } from "../cli/deps.js"; @@ -32,7 +34,7 @@ import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; import { startHeartbeatRunner } from "../infra/heartbeat-runner.js"; import { getMachineDisplayName } from "../infra/machine-name.js"; import { ensureOpenClawCliOnPath } from "../infra/path-env.js"; -import { setGatewaySigusr1RestartPolicy } from "../infra/restart.js"; +import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js"; import { primeRemoteSkillsCache, refreshRemoteBinsForConnectedNodes, @@ -42,6 +44,7 @@ import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js"; import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js"; import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; +import { getTotalQueueSize } from "../process/command-queue.js"; import { runOnboardingWizard } from "../wizard/onboarding.js"; import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js"; import { startGatewayConfigReloader } from "./config-reload.js"; @@ -225,6 +228,9 @@ export async function startGatewayServer( startDiagnosticHeartbeat(); } setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true }); + setPreRestartDeferralCheck( + () => getTotalQueueSize() + getTotalPendingReplies() + getActiveEmbeddedRunCount(), + ); initSubagentRegistry(); const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId); diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index a9e0d93f7cc..445fe73aeae 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -659,6 +659,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P onModelSelected, }, }); + if (!queuedFinal) { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ diff --git a/src/infra/infra-runtime.test.ts b/src/infra/infra-runtime.test.ts index 926c1f224c6..61e7dff4393 100644 --- a/src/infra/infra-runtime.test.ts +++ b/src/infra/infra-runtime.test.ts @@ -9,6 +9,7 @@ import { isGatewaySigusr1RestartExternallyAllowed, scheduleGatewaySigusr1Restart, setGatewaySigusr1RestartPolicy, + setPreRestartDeferralCheck, } from "./restart.js"; import { createTelegramRetryRunner } from "./retry-policy.js"; import { getShellPathFromLoginShell, resetShellPathCacheForTests } from "./shell-env.js"; @@ -79,11 +80,15 @@ describe("infra runtime", () => { __testing.resetSigusr1State(); }); - it("consumes a scheduled authorization once", async () => { + it("authorizes exactly once when scheduled restart emits", async () => { expect(consumeGatewaySigusr1RestartAuthorization()).toBe(false); scheduleGatewaySigusr1Restart({ delayMs: 0 }); + // No pre-authorization before the scheduled emission fires. + expect(consumeGatewaySigusr1RestartAuthorization()).toBe(false); + await vi.advanceTimersByTimeAsync(0); + expect(consumeGatewaySigusr1RestartAuthorization()).toBe(true); expect(consumeGatewaySigusr1RestartAuthorization()).toBe(false); @@ -97,6 +102,110 @@ describe("infra runtime", () => { }); }); + describe("pre-restart deferral check", () => { + beforeEach(() => { + __testing.resetSigusr1State(); + vi.useFakeTimers(); + vi.spyOn(process, "kill").mockImplementation(() => true); + }); + + afterEach(async () => { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + vi.restoreAllMocks(); + __testing.resetSigusr1State(); + }); + + it("emits SIGUSR1 immediately when no deferral check is registered", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + it("emits SIGUSR1 immediately when deferral check returns 0", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + setPreRestartDeferralCheck(() => 0); + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + it("defers SIGUSR1 until deferral check returns 0", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + let pending = 2; + setPreRestartDeferralCheck(() => pending); + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + + // After initial delay fires, deferral check returns 2 — should NOT emit yet + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + + // After one poll (500ms), still pending + await vi.advanceTimersByTimeAsync(500); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + + // Drain pending work + pending = 0; + await vi.advanceTimersByTimeAsync(500); + expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + it("emits SIGUSR1 after deferral timeout even if still pending", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + setPreRestartDeferralCheck(() => 5); // always pending + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + + // Fire initial timeout + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + + // Advance past the 30s max deferral wait + await vi.advanceTimersByTimeAsync(30_000); + expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + it("emits SIGUSR1 if deferral check throws", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + setPreRestartDeferralCheck(() => { + throw new Error("boom"); + }); + scheduleGatewaySigusr1Restart({ delayMs: 0 }); + await vi.advanceTimersByTimeAsync(0); + expect(emitSpy).toHaveBeenCalledWith("SIGUSR1"); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + }); + describe("getShellPathFromLoginShell", () => { afterEach(() => resetShellPathCacheForTests()); diff --git a/src/infra/restart-sentinel.test.ts b/src/infra/restart-sentinel.test.ts index 638d389f561..5c1fa60632b 100644 --- a/src/infra/restart-sentinel.test.ts +++ b/src/infra/restart-sentinel.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { consumeRestartSentinel, + formatRestartSentinelMessage, readRestartSentinel, resolveRestartSentinelPath, trimLogTail, @@ -61,6 +62,40 @@ describe("restart sentinel", () => { await expect(fs.stat(filePath)).rejects.toThrow(); }); + it("formatRestartSentinelMessage uses custom message when present", () => { + const payload = { + kind: "config-apply" as const, + status: "ok" as const, + ts: Date.now(), + message: "Config updated successfully", + }; + expect(formatRestartSentinelMessage(payload)).toBe("Config updated successfully"); + }); + + it("formatRestartSentinelMessage falls back to summary when no message", () => { + const payload = { + kind: "update" as const, + status: "ok" as const, + ts: Date.now(), + stats: { mode: "git" }, + }; + const result = formatRestartSentinelMessage(payload); + expect(result).toContain("Gateway restart"); + expect(result).toContain("update"); + expect(result).toContain("ok"); + }); + + it("formatRestartSentinelMessage falls back to summary for blank message", () => { + const payload = { + kind: "restart" as const, + status: "ok" as const, + ts: Date.now(), + message: " ", + }; + const result = formatRestartSentinelMessage(payload); + expect(result).toContain("Gateway restart"); + }); + it("trims log tails", () => { const text = "a".repeat(9000); const trimmed = trimLogTail(text, 8000); diff --git a/src/infra/restart-sentinel.ts b/src/infra/restart-sentinel.ts index 1f3b13094f9..8405426cbd6 100644 --- a/src/infra/restart-sentinel.ts +++ b/src/infra/restart-sentinel.ts @@ -28,7 +28,7 @@ export type RestartSentinelStats = { }; export type RestartSentinelPayload = { - kind: "config-apply" | "update" | "restart"; + kind: "config-apply" | "config-patch" | "update" | "restart"; status: "ok" | "error" | "skipped"; ts: number; sessionKey?: string; @@ -109,7 +109,10 @@ export async function consumeRestartSentinel( } export function formatRestartSentinelMessage(payload: RestartSentinelPayload): string { - return `GatewayRestart:\n${JSON.stringify(payload, null, 2)}`; + if (payload.message?.trim()) { + return payload.message.trim(); + } + return summarizeRestartSentinel(payload); } export function summarizeRestartSentinel(payload: RestartSentinelPayload): string { diff --git a/src/infra/restart.ts b/src/infra/restart.ts index d671c112b53..830d0731049 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -17,6 +17,40 @@ const SIGUSR1_AUTH_GRACE_MS = 5000; let sigusr1AuthorizedCount = 0; let sigusr1AuthorizedUntil = 0; let sigusr1ExternalAllowed = false; +let preRestartCheck: (() => number) | null = null; +let sigusr1Emitted = false; + +/** + * Register a callback that scheduleGatewaySigusr1Restart checks before emitting SIGUSR1. + * The callback should return the number of pending items (0 = safe to restart). + */ +export function setPreRestartDeferralCheck(fn: () => number): void { + preRestartCheck = fn; +} + +/** + * Emit an authorized SIGUSR1 gateway restart, guarded against duplicate emissions. + * Returns true if SIGUSR1 was emitted, false if a restart was already emitted. + * Both scheduleGatewaySigusr1Restart and the config watcher should use this + * to ensure only one restart fires. + */ +export function emitGatewayRestart(): boolean { + if (sigusr1Emitted) { + return false; + } + sigusr1Emitted = true; + authorizeGatewaySigusr1Restart(); + try { + if (process.listenerCount("SIGUSR1") > 0) { + process.emit("SIGUSR1"); + } else { + process.kill(process.pid, "SIGUSR1"); + } + } catch { + /* ignore */ + } + return true; +} function resetSigusr1AuthorizationIfExpired(now = Date.now()) { if (sigusr1AuthorizedCount <= 0) { @@ -37,7 +71,7 @@ export function isGatewaySigusr1RestartExternallyAllowed() { return sigusr1ExternalAllowed; } -export function authorizeGatewaySigusr1Restart(delayMs = 0) { +function authorizeGatewaySigusr1Restart(delayMs = 0) { const delay = Math.max(0, Math.floor(delayMs)); const expiresAt = Date.now() + delay + SIGUSR1_AUTH_GRACE_MS; sigusr1AuthorizedCount += 1; @@ -51,6 +85,10 @@ export function consumeGatewaySigusr1RestartAuthorization(): boolean { if (sigusr1AuthorizedCount <= 0) { return false; } + // Reset the emission guard so the next restart cycle can fire. + // The run loop re-enters startGatewayServer() after close(), which + // re-registers setPreRestartDeferralCheck and can schedule new restarts. + sigusr1Emitted = false; sigusr1AuthorizedCount -= 1; if (sigusr1AuthorizedCount <= 0) { sigusr1AuthorizedUntil = 0; @@ -189,27 +227,48 @@ export function scheduleGatewaySigusr1Restart(opts?: { typeof opts?.reason === "string" && opts.reason.trim() ? opts.reason.trim().slice(0, 200) : undefined; - authorizeGatewaySigusr1Restart(delayMs); - const pid = process.pid; - const hasListener = process.listenerCount("SIGUSR1") > 0; + const DEFERRAL_POLL_MS = 500; + const DEFERRAL_MAX_WAIT_MS = 30_000; + setTimeout(() => { - try { - if (hasListener) { - process.emit("SIGUSR1"); - } else { - process.kill(pid, "SIGUSR1"); - } - } catch { - /* ignore */ + if (!preRestartCheck) { + emitGatewayRestart(); + return; } + let pending: number; + try { + pending = preRestartCheck(); + } catch { + emitGatewayRestart(); + return; + } + if (pending <= 0) { + emitGatewayRestart(); + return; + } + // Poll until pending work drains or timeout + let waited = 0; + const poll = setInterval(() => { + waited += DEFERRAL_POLL_MS; + let current: number; + try { + current = preRestartCheck!(); + } catch { + current = 0; + } + if (current <= 0 || waited >= DEFERRAL_MAX_WAIT_MS) { + clearInterval(poll); + emitGatewayRestart(); + } + }, DEFERRAL_POLL_MS); }, delayMs); return { ok: true, - pid, + pid: process.pid, signal: "SIGUSR1", delayMs, reason, - mode: hasListener ? "emit" : "signal", + mode: process.listenerCount("SIGUSR1") > 0 ? "emit" : "signal", }; } @@ -218,5 +277,7 @@ export const __testing = { sigusr1AuthorizedCount = 0; sigusr1AuthorizedUntil = 0; sigusr1ExternalAllowed = false; + preRestartCheck = null; + sigusr1Emitted = false; }, };