diff --git a/CHANGELOG.md b/CHANGELOG.md index 42587728ea6..ef3f7e5782a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Security/Gateway: rate-limit control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) to 3 requests per minute per `deviceId+clientIp`, add restart single-flight coalescing plus a 30-second restart cooldown, and log actor/device/ip with changed-path audit details for config/update-triggered restarts. - Commands/Doctor: skip embedding-provider warnings when `memory.backend` is `qmd`, because QMD manages embeddings internally and does not require `memorySearch` providers. (#17263) Thanks @miloudbelarebia. - Security/Webhooks: harden Feishu and Zalo webhook ingress with webhook-mode token preconditions, loopback-default Feishu bind host, JSON content-type enforcement, per-path rate limiting, replay dedupe for Zalo events, constant-time Zalo secret comparison, and anomaly status counters. - Security/Plugins: add explicit `plugins.runtime.allowLegacyExec` opt-in to re-enable deprecated `runtime.system.runCommandWithTimeout` for legacy modules while keeping runtime command execution disabled by default. (#20874) Thanks @mbelinky. diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 1a5e921c01d..bdc1d5b1a85 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -370,6 +370,10 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang ## Config RPC (programmatic updates) + +Control-plane write RPCs (`config.apply`, `config.patch`, `update.run`) are rate-limited to **3 requests per 60 seconds** per `deviceId+clientIp`. When limited, the RPC returns `UNAVAILABLE` with `retryAfterMs`. + + Validates + writes the full config and restarts the Gateway in one step. @@ -386,6 +390,8 @@ Most fields hot-apply without downtime. 
In `hybrid` mode, restart-required chang - `note` (optional) — note for the restart sentinel - `restartDelayMs` (optional) — delay before restart (default 2000) + Restart requests are coalesced while one is already pending/in-flight, and a 30-second cooldown applies between restart cycles. + ```bash openclaw gateway call config.get --params '{}' # capture payload.hash openclaw gateway call config.apply --params '{ @@ -410,6 +416,8 @@ Most fields hot-apply without downtime. In `hybrid` mode, restart-required chang - `baseHash` (required) — config hash from `config.get` - `sessionKey`, `note`, `restartDelayMs` — same as `config.apply` + Restart behavior matches `config.apply`: coalesced pending restarts plus a 30-second cooldown between restart cycles. + ```bash openclaw gateway call config.patch --params '{ "raw": "{ channels: { telegram: { groups: { \"*\": { requireMention: false } } } } }", diff --git a/src/gateway/control-plane-audit.ts b/src/gateway/control-plane-audit.ts new file mode 100644 index 00000000000..08b4acb562a --- /dev/null +++ b/src/gateway/control-plane-audit.ts @@ -0,0 +1,40 @@ +import type { GatewayClient } from "./server-methods/types.js"; + +export type ControlPlaneActor = { + actor: string; + deviceId: string; + clientIp: string; + connId: string; +}; + +function normalizePart(value: unknown, fallback: string): string { + if (typeof value !== "string") { + return fallback; + } + const normalized = value.trim(); + return normalized.length > 0 ? 
/**
 * Coerces a client-supplied identity value into a single log-safe token.
 *
 * Non-string or blank input yields `fallback`. Because the result is embedded
 * in space-separated `key=value` audit lines, control characters are dropped
 * and inner whitespace runs are replaced with `_`, so a crafted id cannot
 * forge extra fields or start a new log line.
 *
 * @param value - Raw value taken from the connection handshake.
 * @param fallback - Token to use when no usable value is present.
 * @returns A non-empty token without whitespace or control characters.
 */
function normalizePart(value: unknown, fallback: string): string {
  if (typeof value !== "string") {
    return fallback;
  }
  const normalized = value
    .replace(/[\u0000-\u001f\u007f-\u009f]+/g, " ")
    .trim()
    .replace(/\s+/g, "_");
  return normalized.length > 0 ? normalized : fallback;
}

/**
 * Builds the audit identity for a gateway client.
 *
 * Each field falls back to an `unknown-*` placeholder when the client is
 * `null` or the corresponding value is missing or blank.
 *
 * @param client - The requesting gateway client, or `null` when unavailable.
 * @returns Actor, device, IP, and connection identifiers ready for logging.
 */
export function resolveControlPlaneActor(client: GatewayClient | null): ControlPlaneActor {
  return {
    actor: normalizePart(client?.connect?.client?.id, "unknown-actor"),
    deviceId: normalizePart(client?.connect?.device?.id, "unknown-device"),
    clientIp: normalizePart(client?.clientIp, "unknown-ip"),
    connId: normalizePart(client?.connId, "unknown-conn"),
  };
}

/**
 * Renders an actor as the `actor=… device=… ip=… conn=…` fragment used in
 * control-plane log lines.
 *
 * @param actor - Identity produced by `resolveControlPlaneActor`.
 * @returns A single-line, space-separated `key=value` string.
 */
export function formatControlPlaneActor(actor: ControlPlaneActor): string {
  return `actor=${actor.actor} device=${actor.deviceId} ip=${actor.clientIp} conn=${actor.connId}`;
}

/**
 * Joins changed config paths for logging, truncating long lists.
 *
 * @param paths - Changed config paths.
 * @param maxPaths - Maximum number of paths to list before summarizing the
 *   rest as `+N more`. Negative values are treated as 0; a non-finite value
 *   disables truncation.
 * @returns An empty string when `paths` is empty, otherwise a comma-joined
 *   list with an optional `+N more` suffix.
 */
export function summarizeChangedPaths(paths: string[], maxPaths = 8): string {
  if (paths.length === 0) {
    return "";
  }
  // Clamp the limit so slice() never receives a negative or fractional bound,
  // which would list the wrong paths and report a wrong "+N more" count.
  const limit = Number.isFinite(maxPaths) ? Math.max(0, Math.floor(maxPaths)) : paths.length;
  if (paths.length <= limit) {
    return paths.join(",");
  }
  const omitted = `+${paths.length - limit} more`;
  if (limit === 0) {
    return omitted;
  }
  return `${paths.slice(0, limit).join(",")},${omitted}`;
}
const CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS = 3;
const CONTROL_PLANE_RATE_LIMIT_WINDOW_MS = 60_000;
// Once this many keys are tracked, expired buckets are swept before a new
// window is opened so the table cannot grow without bound.
const CONTROL_PLANE_RATE_LIMIT_PRUNE_THRESHOLD = 256;

/** Fixed-window counter for one rate-limit key. */
type Bucket = {
  count: number;
  windowStartMs: number;
};

const controlPlaneBuckets = new Map<string, Bucket>();

/** Returns the trimmed string, or `fallback` for non-string or blank input. */
function normalizePart(value: unknown, fallback: string): string {
  if (typeof value !== "string") {
    return fallback;
  }
  const normalized = value.trim();
  return normalized.length > 0 ? normalized : fallback;
}

/** True once a bucket's window has fully elapsed at `nowMs`. */
function isBucketExpired(bucket: Bucket, nowMs: number): boolean {
  return nowMs - bucket.windowStartMs >= CONTROL_PLANE_RATE_LIMIT_WINDOW_MS;
}

/** Drops every bucket whose window has elapsed at `nowMs`. */
function pruneExpiredBuckets(nowMs: number): void {
  for (const [key, bucket] of controlPlaneBuckets) {
    if (isBucketExpired(bucket, nowMs)) {
      controlPlaneBuckets.delete(key);
    }
  }
}

/**
 * Derives the rate-limit key for a client as `deviceId|clientIp`.
 *
 * @param client - The requesting gateway client, or `null`.
 * @returns The key, using `unknown-device` / `unknown-ip` for missing parts.
 */
export function resolveControlPlaneRateLimitKey(client: GatewayClient | null): string {
  const deviceId = normalizePart(client?.connect?.device?.id, "unknown-device");
  const clientIp = normalizePart(client?.clientIp, "unknown-ip");
  return `${deviceId}|${clientIp}`;
}

/**
 * Consumes one unit of the control-plane write budget for the client's key.
 *
 * The budget is a fixed window: the first request opens a window of
 * `CONTROL_PLANE_RATE_LIMIT_WINDOW_MS`, and at most
 * `CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS` requests are allowed inside it.
 *
 * @param params.client - The requesting gateway client, or `null`.
 * @param params.nowMs - Current time in milliseconds; defaults to `Date.now()`.
 * @returns Whether the request is allowed, how long to wait when it is not
 *   (never more than one window), the remaining budget, and the key used.
 */
export function consumeControlPlaneWriteBudget(params: {
  client: GatewayClient | null;
  nowMs?: number;
}): {
  allowed: boolean;
  retryAfterMs: number;
  remaining: number;
  key: string;
} {
  const nowMs = params.nowMs ?? Date.now();
  const key = resolveControlPlaneRateLimitKey(params.client);
  const bucket = controlPlaneBuckets.get(key);

  if (!bucket || isBucketExpired(bucket, nowMs)) {
    // Opening a window is the only point where the table can grow, so sweep
    // here instead of running a background timer.
    if (controlPlaneBuckets.size >= CONTROL_PLANE_RATE_LIMIT_PRUNE_THRESHOLD) {
      pruneExpiredBuckets(nowMs);
    }
    controlPlaneBuckets.set(key, {
      count: 1,
      windowStartMs: nowMs,
    });
    return {
      allowed: true,
      retryAfterMs: 0,
      remaining: CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS - 1,
      key,
    };
  }

  if (bucket.count >= CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS) {
    // Clamp to one window so a backwards clock step cannot advertise a wait
    // longer than the limiter's own period.
    const retryAfterMs = Math.min(
      CONTROL_PLANE_RATE_LIMIT_WINDOW_MS,
      Math.max(0, bucket.windowStartMs + CONTROL_PLANE_RATE_LIMIT_WINDOW_MS - nowMs),
    );
    return {
      allowed: false,
      retryAfterMs,
      remaining: 0,
      key,
    };
  }

  bucket.count += 1;
  return {
    allowed: true,
    retryAfterMs: 0,
    remaining: Math.max(0, CONTROL_PLANE_RATE_LIMIT_MAX_REQUESTS - bucket.count),
    key,
  };
}

export const __testing = {
  /** Clears all rate-limit buckets. */
  resetControlPlaneRateLimitState() {
    controlPlaneBuckets.clear();
  },
  /** Number of keys currently tracked. */
  bucketCount(): number {
    return controlPlaneBuckets.size;
  },
};
+import { handleGatewayRequest } from "./server-methods.js"; + +const noWebchat = () => false; + +describe("gateway control-plane write rate limit", () => { + beforeEach(() => { + controlPlaneRateLimitTesting.resetControlPlaneRateLimitState(); + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-19T00:00:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + controlPlaneRateLimitTesting.resetControlPlaneRateLimitState(); + }); + + function buildContext(logWarn = vi.fn()) { + return { + logGateway: { + warn: logWarn, + }, + } as unknown as Parameters[0]["context"]; + } + + function buildClient() { + return { + connect: { + role: "operator", + scopes: ["operator.admin"], + client: { + id: "openclaw-control-ui", + version: "1.0.0", + platform: "darwin", + mode: "ui", + }, + minProtocol: 1, + maxProtocol: 1, + }, + connId: "conn-1", + clientIp: "10.0.0.5", + } as Parameters[0]["client"]; + } + + async function runRequest(params: { + method: string; + context: Parameters[0]["context"]; + client: Parameters[0]["client"]; + handler: GatewayRequestHandler; + }) { + const respond = vi.fn(); + await handleGatewayRequest({ + req: { + type: "req", + id: crypto.randomUUID(), + method: params.method, + }, + respond, + client: params.client, + isWebchatConnect: noWebchat, + context: params.context, + extraHandlers: { + [params.method]: params.handler, + }, + }); + return respond; + } + + it("allows 3 control-plane writes and blocks the 4th in the same minute", async () => { + const handlerCalls = vi.fn(); + const handler: GatewayRequestHandler = (opts) => { + handlerCalls(opts); + opts.respond(true, undefined, undefined); + }; + const logWarn = vi.fn(); + const context = buildContext(logWarn); + const client = buildClient(); + + await runRequest({ method: "config.patch", context, client, handler }); + await runRequest({ method: "config.patch", context, client, handler }); + await runRequest({ method: "config.patch", context, client, handler }); + const blocked = 
await runRequest({ method: "config.patch", context, client, handler }); + + expect(handlerCalls).toHaveBeenCalledTimes(3); + expect(blocked).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: "UNAVAILABLE", + retryable: true, + }), + ); + expect(logWarn).toHaveBeenCalledTimes(1); + }); + + it("resets the control-plane write budget after 60 seconds", async () => { + const handlerCalls = vi.fn(); + const handler: GatewayRequestHandler = (opts) => { + handlerCalls(opts); + opts.respond(true, undefined, undefined); + }; + const context = buildContext(); + const client = buildClient(); + + await runRequest({ method: "update.run", context, client, handler }); + await runRequest({ method: "update.run", context, client, handler }); + await runRequest({ method: "update.run", context, client, handler }); + + const blocked = await runRequest({ method: "update.run", context, client, handler }); + expect(blocked).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ code: "UNAVAILABLE" }), + ); + + vi.advanceTimersByTime(60_001); + + const allowed = await runRequest({ method: "update.run", context, client, handler }); + expect(allowed).toHaveBeenCalledWith(true, undefined, undefined); + expect(handlerCalls).toHaveBeenCalledTimes(4); + }); +}); diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 44907ff5f1b..b61ee7e18ab 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -1,3 +1,6 @@ +import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js"; +import { formatControlPlaneActor, resolveControlPlaneActor } from "./control-plane-audit.js"; +import { consumeControlPlaneWriteBudget } from "./control-plane-rate-limit.js"; import { ErrorCodes, errorShape } from "./protocol/index.js"; import { agentHandlers } from "./server-methods/agent.js"; import { agentsHandlers } from "./server-methods/agents.js"; @@ -20,7 +23,6 @@ import { skillsHandlers } 
from "./server-methods/skills.js"; import { systemHandlers } from "./server-methods/system.js"; import { talkHandlers } from "./server-methods/talk.js"; import { ttsHandlers } from "./server-methods/tts.js"; -import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js"; import { updateHandlers } from "./server-methods/update.js"; import { usageHandlers } from "./server-methods/usage.js"; import { voicewakeHandlers } from "./server-methods/voicewake.js"; @@ -98,6 +100,7 @@ const WRITE_METHODS = new Set([ "browser.request", "push.test", ]); +const CONTROL_PLANE_WRITE_METHODS = new Set(["config.apply", "config.patch", "update.run"]); function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) { if (!client?.connect) { @@ -209,6 +212,32 @@ export async function handleGatewayRequest( respond(false, undefined, authError); return; } + if (CONTROL_PLANE_WRITE_METHODS.has(req.method)) { + const budget = consumeControlPlaneWriteBudget({ client }); + if (!budget.allowed) { + const actor = resolveControlPlaneActor(client); + context.logGateway.warn( + `control-plane write rate-limited method=${req.method} ${formatControlPlaneActor(actor)} retryAfterMs=${budget.retryAfterMs} key=${budget.key}`, + ); + respond( + false, + undefined, + errorShape( + ErrorCodes.UNAVAILABLE, + `rate limit exceeded for ${req.method}; retry after ${Math.ceil(budget.retryAfterMs / 1000)}s`, + { + retryable: true, + retryAfterMs: budget.retryAfterMs, + details: { + method: req.method, + limit: "3 per 60s", + }, + }, + ), + ); + return; + } + } const handler = opts.extraHandlers?.[req.method] ?? 
coreGatewayHandlers[req.method]; if (!handler) { respond( diff --git a/src/gateway/server-methods/config.ts b/src/gateway/server-methods/config.ts index 7956320f572..16af37c520a 100644 --- a/src/gateway/server-methods/config.ts +++ b/src/gateway/server-methods/config.ts @@ -1,3 +1,5 @@ +import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import type { GatewayRequestHandlers, RespondFn } from "./types.js"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { listChannelPlugins } from "../../channels/plugins/index.js"; import { @@ -19,7 +21,6 @@ import { } from "../../config/redact-snapshot.js"; import { buildConfigSchema, type ConfigSchemaResponse } from "../../config/schema.js"; import { extractDeliveryInfo } from "../../config/sessions.js"; -import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { formatDoctorNonInteractiveHint, type RestartSentinelPayload, @@ -27,6 +28,12 @@ import { } from "../../infra/restart-sentinel.js"; import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js"; import { loadOpenClawPlugins } from "../../plugins/loader.js"; +import { diffConfigPaths } from "../config-reload.js"; +import { + formatControlPlaneActor, + resolveControlPlaneActor, + summarizeChangedPaths, +} from "../control-plane-audit.js"; import { ErrorCodes, errorShape, @@ -38,7 +45,6 @@ import { } from "../protocol/index.js"; import { resolveBaseHashParam } from "./base-hash.js"; import { parseRestartRequestParams } from "./restart-request.js"; -import type { GatewayRequestHandlers, RespondFn } from "./types.js"; import { assertValidParams } from "./validation.js"; function requireConfigBaseHash( @@ -275,7 +281,7 @@ export const configHandlers: GatewayRequestHandlers = { undefined, ); }, - "config.patch": async ({ params, respond }) => { + "config.patch": async ({ params, respond, client, context }) => { if (!assertValidParams(params, validateConfigPatchParams, 
"config.patch", respond)) { return; } @@ -349,6 +355,11 @@ export const configHandlers: GatewayRequestHandlers = { ); return; } + const changedPaths = diffConfigPaths(snapshot.config, validated.config); + const actor = resolveControlPlaneActor(client); + context?.logGateway?.info( + `config.patch write ${formatControlPlaneActor(actor)} changedPaths=${summarizeChangedPaths(changedPaths)} restartReason=config.patch`, + ); await writeConfigFile(validated.config, writeOptions); const { sessionKey, note, restartDelayMs, deliveryContext, threadId } = @@ -365,7 +376,18 @@ export const configHandlers: GatewayRequestHandlers = { const restart = scheduleGatewaySigusr1Restart({ delayMs: restartDelayMs, reason: "config.patch", + audit: { + actor: actor.actor, + deviceId: actor.deviceId, + clientIp: actor.clientIp, + changedPaths, + }, }); + if (restart.coalesced) { + context?.logGateway?.warn( + `config.patch restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`, + ); + } respond( true, { @@ -381,7 +403,7 @@ export const configHandlers: GatewayRequestHandlers = { undefined, ); }, - "config.apply": async ({ params, respond }) => { + "config.apply": async ({ params, respond, client, context }) => { if (!assertValidParams(params, validateConfigApplyParams, "config.apply", respond)) { return; } @@ -393,6 +415,11 @@ export const configHandlers: GatewayRequestHandlers = { if (!parsed) { return; } + const changedPaths = diffConfigPaths(snapshot.config, parsed.config); + const actor = resolveControlPlaneActor(client); + context?.logGateway?.info( + `config.apply write ${formatControlPlaneActor(actor)} changedPaths=${summarizeChangedPaths(changedPaths)} restartReason=config.apply`, + ); await writeConfigFile(parsed.config, writeOptions); const { sessionKey, note, restartDelayMs, deliveryContext, threadId } = @@ -409,7 +436,18 @@ export const configHandlers: GatewayRequestHandlers = { const restart = scheduleGatewaySigusr1Restart({ delayMs: restartDelayMs, 
reason: "config.apply", + audit: { + actor: actor.actor, + deviceId: actor.deviceId, + clientIp: actor.clientIp, + changedPaths, + }, }); + if (restart.coalesced) { + context?.logGateway?.warn( + `config.apply restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`, + ); + } respond( true, { diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index b0c70acd505..37db771073a 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -17,6 +17,7 @@ type SubsystemLogger = ReturnType; export type GatewayClient = { connect: ConnectParams; connId?: string; + clientIp?: string; }; export type RespondFn = ( diff --git a/src/gateway/server-methods/update.ts b/src/gateway/server-methods/update.ts index 5e743d82308..6ec496c064b 100644 --- a/src/gateway/server-methods/update.ts +++ b/src/gateway/server-methods/update.ts @@ -1,3 +1,4 @@ +import type { GatewayRequestHandlers } from "./types.js"; import { loadConfig } from "../../config/config.js"; import { extractDeliveryInfo } from "../../config/sessions.js"; import { resolveOpenClawPackageRoot } from "../../infra/openclaw-root.js"; @@ -9,16 +10,17 @@ import { import { scheduleGatewaySigusr1Restart } from "../../infra/restart.js"; import { normalizeUpdateChannel } from "../../infra/update-channels.js"; import { runGatewayUpdate } from "../../infra/update-runner.js"; +import { formatControlPlaneActor, resolveControlPlaneActor } from "../control-plane-audit.js"; import { validateUpdateRunParams } from "../protocol/index.js"; import { parseRestartRequestParams } from "./restart-request.js"; -import type { GatewayRequestHandlers } from "./types.js"; import { assertValidParams } from "./validation.js"; export const updateHandlers: GatewayRequestHandlers = { - "update.run": async ({ params, respond }) => { + "update.run": async ({ params, respond, client, context }) => { if (!assertValidParams(params, validateUpdateRunParams, "update.run", 
respond)) { return; } + const actor = resolveControlPlaneActor(client); const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params); const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs; @@ -98,8 +100,22 @@ export const updateHandlers: GatewayRequestHandlers = { ? scheduleGatewaySigusr1Restart({ delayMs: restartDelayMs, reason: "update.run", + audit: { + actor: actor.actor, + deviceId: actor.deviceId, + clientIp: actor.clientIp, + changedPaths: [], + }, }) : null; + context?.logGateway?.info( + `update.run completed ${formatControlPlaneActor(actor)} changedPaths= restartReason=update.run status=${result.status}`, + ); + if (restart?.coalesced) { + context?.logGateway?.warn( + `update.run restart coalesced ${formatControlPlaneActor(actor)} delayMs=${restart.delayMs}`, + ); + } respond( true, diff --git a/src/infra/infra-runtime.test.ts b/src/infra/infra-runtime.test.ts index 7b9de0c1b6d..6a406e8113b 100644 --- a/src/infra/infra-runtime.test.ts +++ b/src/infra/infra-runtime.test.ts @@ -124,6 +124,56 @@ describe("infra runtime", () => { process.removeListener("SIGUSR1", handler); } }); + + it("coalesces duplicate scheduled restarts into a single pending timer", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + const first = scheduleGatewaySigusr1Restart({ delayMs: 1_000, reason: "first" }); + const second = scheduleGatewaySigusr1Restart({ delayMs: 1_000, reason: "second" }); + + expect(first.coalesced).toBe(false); + expect(second.coalesced).toBe(true); + + await vi.advanceTimersByTimeAsync(999); + expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1"); + + await vi.advanceTimersByTimeAsync(1); + const sigusr1Emits = emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1"); + expect(sigusr1Emits.length).toBe(1); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); + + 
it("applies restart cooldown between emitted restart cycles", async () => { + const emitSpy = vi.spyOn(process, "emit"); + const handler = () => {}; + process.on("SIGUSR1", handler); + try { + const first = scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "first" }); + expect(first.coalesced).toBe(false); + expect(first.delayMs).toBe(0); + + await vi.advanceTimersByTimeAsync(0); + expect(consumeGatewaySigusr1RestartAuthorization()).toBe(true); + markGatewaySigusr1RestartHandled(); + + const second = scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "second" }); + expect(second.coalesced).toBe(false); + expect(second.delayMs).toBe(30_000); + expect(second.cooldownMsApplied).toBe(30_000); + + await vi.advanceTimersByTimeAsync(29_999); + expect(emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1").length).toBe(1); + + await vi.advanceTimersByTimeAsync(1); + expect(emitSpy.mock.calls.filter((args) => args[0] === "SIGUSR1").length).toBe(2); + } finally { + process.removeListener("SIGUSR1", handler); + } + }); }); describe("pre-restart deferral check", () => { diff --git a/src/infra/restart.ts b/src/infra/restart.ts index 60540884b90..4dd09beaa1a 100644 --- a/src/infra/restart.ts +++ b/src/infra/restart.ts @@ -3,6 +3,7 @@ import { resolveGatewayLaunchAgentLabel, resolveGatewaySystemdServiceName, } from "../daemon/constants.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; export type RestartAttempt = { ok: boolean; @@ -15,6 +16,9 @@ const SPAWN_TIMEOUT_MS = 2000; const SIGUSR1_AUTH_GRACE_MS = 5000; const DEFAULT_DEFERRAL_POLL_MS = 500; const DEFAULT_DEFERRAL_MAX_WAIT_MS = 30_000; +const RESTART_COOLDOWN_MS = 30_000; + +const restartLog = createSubsystemLogger("restart"); let sigusr1AuthorizedCount = 0; let sigusr1AuthorizedUntil = 0; @@ -23,11 +27,65 @@ let preRestartCheck: (() => number) | null = null; let restartCycleToken = 0; let emittedRestartToken = 0; let consumedRestartToken = 0; +let lastRestartEmittedAt = 0; +let 
// State of the single scheduled (not yet emitted) restart, if any.
let pendingRestartTimer: ReturnType<typeof setTimeout> | null = null;
let pendingRestartDueAt = 0;
let pendingRestartReason: string | undefined;

/** True while a restart signal has been emitted but not yet consumed. */
function hasUnconsumedRestartSignal(): boolean {
  return emittedRestartToken > consumedRestartToken;
}

/** Cancels the pending restart timer, if any, and resets its bookkeeping. */
function clearPendingScheduledRestart(): void {
  if (pendingRestartTimer) {
    clearTimeout(pendingRestartTimer);
  }
  pendingRestartTimer = null;
  pendingRestartDueAt = 0;
  pendingRestartReason = undefined;
}

/** Optional identity and change details attached to a restart request for logging. */
export type RestartAuditInfo = {
  actor?: string;
  deviceId?: string;
  clientIp?: string;
  changedPaths?: string[];
};

/**
 * Coerces an audit value into a single log-safe token.
 *
 * Control characters are dropped and whitespace runs become `_` so the value
 * cannot break the space-separated `key=value` layout of the restart log line.
 *
 * @returns The token, or `null` for non-string or blank input.
 */
function toAuditToken(value: unknown): string | null {
  if (typeof value !== "string") {
    return null;
  }
  const token = value
    .replace(/[\u0000-\u001f\u007f-\u009f]+/g, " ")
    .trim()
    .replace(/\s+/g, "_");
  return token.length > 0 ? token : null;
}

/**
 * Joins changed paths for the restart log line, truncating long lists.
 *
 * @param paths - Changed config paths; unusable entries are ignored.
 * @param maxPaths - Number of paths listed before the rest is summarized.
 * @returns `null` when there is nothing to report, otherwise a comma-joined
 *   list with an optional `+N more` suffix.
 */
function summarizeChangedPaths(paths: string[] | undefined, maxPaths = 6): string | null {
  if (!Array.isArray(paths)) {
    return null;
  }
  const tokens: string[] = [];
  for (const path of paths) {
    const token = toAuditToken(path);
    if (token !== null) {
      tokens.push(token);
    }
  }
  if (tokens.length === 0) {
    return null;
  }
  if (tokens.length <= maxPaths) {
    return tokens.join(",");
  }
  const head = tokens.slice(0, maxPaths).join(",");
  return `${head},+${tokens.length - maxPaths} more`;
}

/**
 * Renders restart audit details as space-separated `key=value` fields.
 *
 * Only fields with a usable value are emitted, in the order actor, device,
 * ip, changedPaths.
 *
 * @param audit - Audit details supplied by the restart requester, if any.
 * @returns The joined fields, or the literal `actor=` when none are present.
 */
function formatRestartAudit(audit: RestartAuditInfo | undefined): string {
  const candidates: Array<[label: string, value: string | null]> = [
    ["actor", toAuditToken(audit?.actor)],
    ["device", toAuditToken(audit?.deviceId)],
    ["ip", toAuditToken(audit?.clientIp)],
    ["changedPaths", summarizeChangedPaths(audit?.changedPaths)],
  ];
  const fields: string[] = [];
  for (const [label, value] of candidates) {
    if (value !== null) {
      fields.push(`${label}=${value}`);
    }
  }
  return fields.length > 0 ? fields.join(" ") : "actor=";
}
@@ -44,8 +102,10 @@ export function setPreRestartDeferralCheck(fn: () => number): void { */ export function emitGatewayRestart(): boolean { if (hasUnconsumedRestartSignal()) { + clearPendingScheduledRestart(); return false; } + clearPendingScheduledRestart(); const cycleToken = ++restartCycleToken; emittedRestartToken = cycleToken; authorizeGatewaySigusr1Restart(); @@ -60,6 +120,7 @@ export function emitGatewayRestart(): boolean { emittedRestartToken = consumedRestartToken; return false; } + lastRestartEmittedAt = Date.now(); return true; } @@ -293,11 +354,14 @@ export type ScheduledRestart = { delayMs: number; reason?: string; mode: "emit" | "signal"; + coalesced: boolean; + cooldownMsApplied: number; }; export function scheduleGatewaySigusr1Restart(opts?: { delayMs?: number; reason?: string; + audit?: RestartAuditInfo; }): ScheduledRestart { const delayMsRaw = typeof opts?.delayMs === "number" && Number.isFinite(opts.delayMs) @@ -308,22 +372,77 @@ export function scheduleGatewaySigusr1Restart(opts?: { typeof opts?.reason === "string" && opts.reason.trim() ? opts.reason.trim().slice(0, 200) : undefined; + const mode = process.listenerCount("SIGUSR1") > 0 ? "emit" : "signal"; + const nowMs = Date.now(); + const cooldownMsApplied = Math.max(0, lastRestartEmittedAt + RESTART_COOLDOWN_MS - nowMs); + const requestedDueAt = nowMs + delayMs + cooldownMsApplied; - setTimeout(() => { - const pendingCheck = preRestartCheck; - if (!pendingCheck) { - emitGatewayRestart(); - return; + if (hasUnconsumedRestartSignal()) { + restartLog.warn( + `restart request coalesced (already in-flight) reason=${reason ?? 
"unspecified"} ${formatRestartAudit(opts?.audit)}`, + ); + return { + ok: true, + pid: process.pid, + signal: "SIGUSR1", + delayMs: 0, + reason, + mode, + coalesced: true, + cooldownMsApplied, + }; + } + + if (pendingRestartTimer) { + const remainingMs = Math.max(0, pendingRestartDueAt - nowMs); + const shouldPullEarlier = requestedDueAt < pendingRestartDueAt; + if (shouldPullEarlier) { + restartLog.warn( + `restart request rescheduled earlier reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} oldDelayMs=${remainingMs} newDelayMs=${Math.max(0, requestedDueAt - nowMs)} ${formatRestartAudit(opts?.audit)}`, + ); + clearPendingScheduledRestart(); + } else { + restartLog.warn( + `restart request coalesced (already scheduled) reason=${reason ?? "unspecified"} pendingReason=${pendingRestartReason ?? "unspecified"} delayMs=${remainingMs} ${formatRestartAudit(opts?.audit)}`, + ); + return { + ok: true, + pid: process.pid, + signal: "SIGUSR1", + delayMs: remainingMs, + reason, + mode, + coalesced: true, + cooldownMsApplied, + }; } - deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck }); - }, delayMs); + } + + pendingRestartDueAt = requestedDueAt; + pendingRestartReason = reason; + pendingRestartTimer = setTimeout( + () => { + pendingRestartTimer = null; + pendingRestartDueAt = 0; + pendingRestartReason = undefined; + const pendingCheck = preRestartCheck; + if (!pendingCheck) { + emitGatewayRestart(); + return; + } + deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck }); + }, + Math.max(0, requestedDueAt - nowMs), + ); return { ok: true, pid: process.pid, signal: "SIGUSR1", - delayMs, + delayMs: Math.max(0, requestedDueAt - nowMs), reason, - mode: process.listenerCount("SIGUSR1") > 0 ? 
"emit" : "signal", + mode, + coalesced: false, + cooldownMsApplied, }; } @@ -336,5 +455,7 @@ export const __testing = { restartCycleToken = 0; emittedRestartToken = 0; consumedRestartToken = 0; + lastRestartEmittedAt = 0; + clearPendingScheduledRestart(); }, };