diff --git a/src/gateway/server-methods/nodes.invoke-wake.test.ts b/src/gateway/server-methods/nodes.invoke-wake.test.ts index 147e1df86da..82bf3cee99d 100644 --- a/src/gateway/server-methods/nodes.invoke-wake.test.ts +++ b/src/gateway/server-methods/nodes.invoke-wake.test.ts @@ -13,6 +13,7 @@ const mocks = vi.hoisted(() => ({ loadApnsRegistration: vi.fn(), resolveApnsAuthConfigFromEnv: vi.fn(), sendApnsBackgroundWake: vi.fn(), + sendApnsAlert: vi.fn(), })); vi.mock("../../config/config.js", () => ({ @@ -32,6 +33,7 @@ vi.mock("../../infra/push-apns.js", () => ({ loadApnsRegistration: mocks.loadApnsRegistration, resolveApnsAuthConfigFromEnv: mocks.resolveApnsAuthConfigFromEnv, sendApnsBackgroundWake: mocks.sendApnsBackgroundWake, + sendApnsAlert: mocks.sendApnsAlert, })); type RespondCall = [ @@ -81,12 +83,17 @@ async function invokeNode(params: { requestParams?: Partial>; }) { const respond = vi.fn(); + const logGateway = { + info: vi.fn(), + warn: vi.fn(), + }; await nodeHandlers["node.invoke"]({ params: makeNodeInvokeParams(params.requestParams), respond: respond as never, context: { nodeRegistry: params.nodeRegistry, execApprovalManager: undefined, + logGateway, } as never, client: null, req: { type: "req", id: "req-node-invoke", method: "node.invoke" }, @@ -135,6 +142,7 @@ describe("node.invoke APNs wake path", () => { mocks.loadApnsRegistration.mockReset(); mocks.resolveApnsAuthConfigFromEnv.mockReset(); mocks.sendApnsBackgroundWake.mockReset(); + mocks.sendApnsAlert.mockReset(); }); afterEach(() => { @@ -202,7 +210,7 @@ describe("node.invoke APNs wake path", () => { expect(call?.[1]).toMatchObject({ ok: true, nodeId: "ios-node-reconnect" }); }); - it("throttles repeated wake attempts for the same disconnected node", async () => { + it("forces one retry wake when the first wake still fails to reconnect", async () => { vi.useFakeTimers(); mockSuccessfulWakeConfig("ios-node-throttle"); @@ -211,21 +219,14 @@ describe("node.invoke APNs wake path", () => { invoke: vi.fn().mockResolvedValue({ ok: true }), }; - const first = invokeNode({ + const invokePromise = invokeNode({ nodeRegistry, requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-1" }, }); - await vi.advanceTimersByTimeAsync(WAKE_WAIT_TIMEOUT_MS); - await first; + await vi.advanceTimersByTimeAsync(20_000); + await invokePromise; - const second = invokeNode({ - nodeRegistry, - requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-2" }, - }); - await vi.advanceTimersByTimeAsync(WAKE_WAIT_TIMEOUT_MS); - await second; - - expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(1); + expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(2); expect(nodeRegistry.invoke).not.toHaveBeenCalled(); }); }); diff --git a/src/gateway/server-methods/nodes.ts b/src/gateway/server-methods/nodes.ts index 1ea705365e4..9bb27049685 100644 --- a/src/gateway/server-methods/nodes.ts +++ b/src/gateway/server-methods/nodes.ts @@ -11,6 +11,7 @@ import { import { loadApnsRegistration, resolveApnsAuthConfigFromEnv, + sendApnsAlert, sendApnsBackgroundWake, } from "../../infra/push-apns.js"; import { isNodeCommandAllowed, resolveNodeCommandAllowlist } from "../node-command-policy.js"; @@ -40,15 +41,36 @@ import { import type { GatewayRequestHandlers } from "./types.js"; const NODE_WAKE_RECONNECT_WAIT_MS = 3_000; +const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000; const NODE_WAKE_RECONNECT_POLL_MS = 150; const NODE_WAKE_THROTTLE_MS = 15_000; +const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000; type NodeWakeState = { lastWakeAtMs: number; - inFlight?: Promise; + inFlight?: Promise; }; const nodeWakeById = new Map(); +const nodeWakeNudgeById = new Map(); + +type NodeWakeAttempt = { + available: boolean; + throttled: boolean; + path: "throttled" | "no-registration" | "no-auth" | "sent" | "send-error"; + durationMs: number; + apnsStatus?: number; + apnsReason?: string; +}; + +type NodeWakeNudgeAttempt = { + sent: boolean; + throttled: boolean; + reason: "throttled" | "no-registration" | "no-auth" | "send-error" | "apns-not-ok" | "sent"; + durationMs: number; + apnsStatus?: number; + apnsReason?: string; +}; function isNodeEntry(entry: { role?: string; roles?: string[] }) { if (entry.role === "node") { @@ -64,7 +86,10 @@ async function delayMs(ms: number): Promise { await new Promise((resolve) => setTimeout(resolve, ms)); } -async function maybeWakeNodeWithApns(nodeId: string): Promise { +async function maybeWakeNodeWithApns( + nodeId: string, + opts?: { force?: boolean }, +): Promise { const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 }; nodeWakeById.set(nodeId, state); @@ -73,36 +98,75 @@ async function maybeWakeNodeWithApns(nodeId: string): Promise { } const now = Date.now(); - if (state.lastWakeAtMs > 0 && now - state.lastWakeAtMs < NODE_WAKE_THROTTLE_MS) { - return true; + const force = opts?.force === true; + if (!force && state.lastWakeAtMs > 0 && now - state.lastWakeAtMs < NODE_WAKE_THROTTLE_MS) { + return { available: true, throttled: true, path: "throttled", durationMs: 0 }; } state.inFlight = (async () => { + const startedAtMs = Date.now(); + const withDuration = (attempt: Omit): NodeWakeAttempt => ({ + ...attempt, + durationMs: Math.max(0, Date.now() - startedAtMs), + }); + try { const registration = await loadApnsRegistration(nodeId); if (!registration) { - return false; + return withDuration({ available: false, throttled: false, path: "no-registration" }); } const auth = await resolveApnsAuthConfigFromEnv(process.env); if (!auth.ok) { - return false; + return withDuration({ + available: false, + throttled: false, + path: "no-auth", + apnsReason: auth.error, + }); } state.lastWakeAtMs = Date.now(); - await sendApnsBackgroundWake({ + const wakeResult = await sendApnsBackgroundWake({ auth: auth.value, registration, nodeId, wakeReason: "node.invoke", }); - } catch { - // Best-effort wake only. - if (state.lastWakeAtMs === 0) { - return false; + if (!wakeResult.ok) { + return withDuration({ + available: true, + throttled: false, + path: "send-error", + apnsStatus: wakeResult.status, + apnsReason: wakeResult.reason, + }); } + return withDuration({ + available: true, + throttled: false, + path: "sent", + apnsStatus: wakeResult.status, + apnsReason: wakeResult.reason, + }); + } catch (err) { + // Best-effort wake only. + const message = err instanceof Error ? err.message : String(err); + if (state.lastWakeAtMs === 0) { + return withDuration({ + available: false, + throttled: false, + path: "send-error", + apnsReason: message, + }); + } + return withDuration({ + available: true, + throttled: false, + path: "send-error", + apnsReason: message, + }); } - return true; })(); try { @@ -112,6 +176,70 @@ async function maybeWakeNodeWithApns(nodeId: string): Promise { } } +async function maybeSendNodeWakeNudge(nodeId: string): Promise { + const startedAtMs = Date.now(); + const withDuration = ( + attempt: Omit, + ): NodeWakeNudgeAttempt => ({ + ...attempt, + durationMs: Math.max(0, Date.now() - startedAtMs), + }); + + const lastNudgeAtMs = nodeWakeNudgeById.get(nodeId) ?? 0; + if (lastNudgeAtMs > 0 && Date.now() - lastNudgeAtMs < NODE_WAKE_NUDGE_THROTTLE_MS) { + return withDuration({ sent: false, throttled: true, reason: "throttled" }); + } + + const registration = await loadApnsRegistration(nodeId); + if (!registration) { + return withDuration({ sent: false, throttled: false, reason: "no-registration" }); + } + const auth = await resolveApnsAuthConfigFromEnv(process.env); + if (!auth.ok) { + return withDuration({ + sent: false, + throttled: false, + reason: "no-auth", + apnsReason: auth.error, + }); + } + + try { + const result = await sendApnsAlert({ + auth: auth.value, + registration, + nodeId, + title: "OpenClaw needs a quick reopen", + body: "Tap to reopen OpenClaw and restore the node connection.", + }); + if (!result.ok) { + return withDuration({ + sent: false, + throttled: false, + reason: "apns-not-ok", + apnsStatus: result.status, + apnsReason: result.reason, + }); + } + nodeWakeNudgeById.set(nodeId, Date.now()); + return withDuration({ + sent: true, + throttled: false, + reason: "sent", + apnsStatus: result.status, + apnsReason: result.reason, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return withDuration({ + sent: false, + throttled: false, + reason: "send-error", + apnsReason: message, + }); + } +} + async function waitForNodeReconnect(params: { nodeId: string; context: { nodeRegistry: { get: (nodeId: string) => unknown } }; @@ -430,7 +558,7 @@ export const nodeHandlers: GatewayRequestHandlers = { ); }); }, - "node.invoke": async ({ params, respond, context, client }) => { + "node.invoke": async ({ params, respond, context, client, req }) => { if (!validateNodeInvokeParams(params)) { respondInvalidParams({ respond, @@ -472,12 +600,70 @@ export const nodeHandlers: GatewayRequestHandlers = { await respondUnavailableOnThrow(respond, async () => { let nodeSession = context.nodeRegistry.get(nodeId); if (!nodeSession) { - const wakeAvailable = await maybeWakeNodeWithApns(nodeId); - if (wakeAvailable) { - await waitForNodeReconnect({ nodeId, context }); + const wakeReqId = req.id; + const wakeFlowStartedAtMs = Date.now(); + context.logGateway.info( + `node wake start node=${nodeId} req=${wakeReqId} command=${command}`, + ); + + const wake = await maybeWakeNodeWithApns(nodeId); + context.logGateway.info( + `node wake stage=wake1 node=${nodeId} req=${wakeReqId} ` + + `available=${wake.available} throttled=${wake.throttled} ` + + `path=${wake.path} durationMs=${wake.durationMs} ` + + `apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`, + ); + if (wake.available) { + const waitStartedAtMs = Date.now(); + const waitTimeoutMs = NODE_WAKE_RECONNECT_WAIT_MS; + const reconnected = await waitForNodeReconnect({ + nodeId, + context, + timeoutMs: waitTimeoutMs, + }); + const waitDurationMs = Math.max(0, Date.now() - waitStartedAtMs); + context.logGateway.info( + `node wake stage=wait1 node=${nodeId} req=${wakeReqId} ` + + `reconnected=${reconnected} timeoutMs=${waitTimeoutMs} durationMs=${waitDurationMs}`, + ); } nodeSession = context.nodeRegistry.get(nodeId); + if (!nodeSession && wake.available) { + const retryWake = await maybeWakeNodeWithApns(nodeId, { force: true }); + context.logGateway.info( + `node wake stage=wake2 node=${nodeId} req=${wakeReqId} force=true ` + + `available=${retryWake.available} throttled=${retryWake.throttled} ` + + `path=${retryWake.path} durationMs=${retryWake.durationMs} ` + + `apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`, + ); + if (retryWake.available) { + const waitStartedAtMs = Date.now(); + const waitTimeoutMs = NODE_WAKE_RECONNECT_RETRY_WAIT_MS; + const reconnected = await waitForNodeReconnect({ + nodeId, + context, + timeoutMs: waitTimeoutMs, + }); + const waitDurationMs = Math.max(0, Date.now() - waitStartedAtMs); + context.logGateway.info( + `node wake stage=wait2 node=${nodeId} req=${wakeReqId} ` + + `reconnected=${reconnected} timeoutMs=${waitTimeoutMs} durationMs=${waitDurationMs}`, + ); + } + nodeSession = context.nodeRegistry.get(nodeId); + } if (!nodeSession) { + const totalDurationMs = Math.max(0, Date.now() - wakeFlowStartedAtMs); + const nudge = await maybeSendNodeWakeNudge(nodeId); + context.logGateway.info( + `node wake nudge node=${nodeId} req=${wakeReqId} sent=${nudge.sent} ` + + `throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` + + `apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`, + ); + context.logGateway.warn( + `node wake done node=${nodeId} req=${wakeReqId} connected=false ` + + `reason=not_connected totalMs=${totalDurationMs}`, + ); respond( false, undefined, @@ -487,6 +673,11 @@ export const nodeHandlers: GatewayRequestHandlers = { ); return; } + + const totalDurationMs = Math.max(0, Date.now() - wakeFlowStartedAtMs); + context.logGateway.info( + `node wake done node=${nodeId} req=${wakeReqId} connected=true totalMs=${totalDurationMs}`, + ); } const cfg = loadConfig(); const allowlist = resolveNodeCommandAllowlist(cfg, nodeSession);