mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 09:41:24 +00:00
iOS/Gateway: wake disconnected iOS nodes via APNs before invoke (#20332)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 7751f9c531
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
246
src/gateway/server-methods/nodes.invoke-wake.test.ts
Normal file
246
src/gateway/server-methods/nodes.invoke-wake.test.ts
Normal file
@@ -0,0 +1,246 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { ErrorCodes } from "../protocol/index.js";
|
||||
import { nodeHandlers } from "./nodes.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
loadConfig: vi.fn(() => ({})),
|
||||
resolveNodeCommandAllowlist: vi.fn(() => []),
|
||||
isNodeCommandAllowed: vi.fn(() => ({ ok: true })),
|
||||
sanitizeNodeInvokeParamsForForwarding: vi.fn(({ rawParams }: { rawParams: unknown }) => ({
|
||||
ok: true,
|
||||
params: rawParams,
|
||||
})),
|
||||
loadApnsRegistration: vi.fn(),
|
||||
resolveApnsAuthConfigFromEnv: vi.fn(),
|
||||
sendApnsBackgroundWake: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/config.js", () => ({
|
||||
loadConfig: mocks.loadConfig,
|
||||
}));
|
||||
|
||||
vi.mock("../node-command-policy.js", () => ({
|
||||
resolveNodeCommandAllowlist: mocks.resolveNodeCommandAllowlist,
|
||||
isNodeCommandAllowed: mocks.isNodeCommandAllowed,
|
||||
}));
|
||||
|
||||
vi.mock("../node-invoke-sanitize.js", () => ({
|
||||
sanitizeNodeInvokeParamsForForwarding: mocks.sanitizeNodeInvokeParamsForForwarding,
|
||||
}));
|
||||
|
||||
vi.mock("../../infra/push-apns.js", () => ({
|
||||
loadApnsRegistration: mocks.loadApnsRegistration,
|
||||
resolveApnsAuthConfigFromEnv: mocks.resolveApnsAuthConfigFromEnv,
|
||||
sendApnsBackgroundWake: mocks.sendApnsBackgroundWake,
|
||||
}));
|
||||
|
||||
type RespondCall = [
|
||||
boolean,
|
||||
unknown?,
|
||||
{
|
||||
code?: number;
|
||||
message?: string;
|
||||
details?: unknown;
|
||||
}?,
|
||||
];
|
||||
|
||||
type TestNodeSession = {
|
||||
nodeId: string;
|
||||
commands: string[];
|
||||
};
|
||||
|
||||
function makeNodeInvokeParams(overrides?: Partial<Record<string, unknown>>) {
|
||||
return {
|
||||
nodeId: "ios-node-1",
|
||||
command: "camera.capture",
|
||||
params: { quality: "high" },
|
||||
timeoutMs: 5000,
|
||||
idempotencyKey: "idem-node-invoke",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
async function invokeNode(params: {
|
||||
nodeRegistry: {
|
||||
get: (nodeId: string) => TestNodeSession | undefined;
|
||||
invoke: (payload: {
|
||||
nodeId: string;
|
||||
command: string;
|
||||
params?: unknown;
|
||||
timeoutMs?: number;
|
||||
idempotencyKey?: string;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
payload?: unknown;
|
||||
payloadJSON?: string | null;
|
||||
error?: { code?: string; message?: string } | null;
|
||||
}>;
|
||||
};
|
||||
requestParams?: Partial<Record<string, unknown>>;
|
||||
}) {
|
||||
const respond = vi.fn();
|
||||
await nodeHandlers["node.invoke"]({
|
||||
params: makeNodeInvokeParams(params.requestParams),
|
||||
respond: respond as never,
|
||||
context: {
|
||||
nodeRegistry: params.nodeRegistry,
|
||||
execApprovalManager: undefined,
|
||||
} as never,
|
||||
client: null,
|
||||
req: { type: "req", id: "req-node-invoke", method: "node.invoke" },
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
return respond;
|
||||
}
|
||||
|
||||
describe("node.invoke APNs wake path", () => {
|
||||
beforeEach(() => {
|
||||
mocks.loadConfig.mockReset();
|
||||
mocks.loadConfig.mockReturnValue({});
|
||||
mocks.resolveNodeCommandAllowlist.mockReset();
|
||||
mocks.resolveNodeCommandAllowlist.mockReturnValue([]);
|
||||
mocks.isNodeCommandAllowed.mockReset();
|
||||
mocks.isNodeCommandAllowed.mockReturnValue({ ok: true });
|
||||
mocks.sanitizeNodeInvokeParamsForForwarding.mockReset();
|
||||
mocks.sanitizeNodeInvokeParamsForForwarding.mockImplementation(
|
||||
({ rawParams }: { rawParams: unknown }) => ({ ok: true, params: rawParams }),
|
||||
);
|
||||
mocks.loadApnsRegistration.mockReset();
|
||||
mocks.resolveApnsAuthConfigFromEnv.mockReset();
|
||||
mocks.sendApnsBackgroundWake.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("keeps the existing not-connected response when wake path is unavailable", async () => {
|
||||
mocks.loadApnsRegistration.mockResolvedValue(null);
|
||||
|
||||
const nodeRegistry = {
|
||||
get: vi.fn(() => undefined),
|
||||
invoke: vi.fn().mockResolvedValue({ ok: true }),
|
||||
};
|
||||
|
||||
const respond = await invokeNode({ nodeRegistry });
|
||||
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||
expect(call?.[0]).toBe(false);
|
||||
expect(call?.[2]?.code).toBe(ErrorCodes.UNAVAILABLE);
|
||||
expect(call?.[2]?.message).toBe("node not connected");
|
||||
expect(mocks.sendApnsBackgroundWake).not.toHaveBeenCalled();
|
||||
expect(nodeRegistry.invoke).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("wakes and retries invoke after the node reconnects", async () => {
|
||||
vi.useFakeTimers();
|
||||
mocks.loadApnsRegistration.mockResolvedValue({
|
||||
nodeId: "ios-node-reconnect",
|
||||
token: "abcd1234abcd1234abcd1234abcd1234",
|
||||
topic: "ai.openclaw.ios",
|
||||
environment: "sandbox",
|
||||
updatedAtMs: 1,
|
||||
});
|
||||
mocks.resolveApnsAuthConfigFromEnv.mockResolvedValue({
|
||||
ok: true,
|
||||
value: {
|
||||
teamId: "TEAM123",
|
||||
keyId: "KEY123",
|
||||
privateKey: "-----BEGIN PRIVATE KEY-----\nabc\n-----END PRIVATE KEY-----",
|
||||
},
|
||||
});
|
||||
mocks.sendApnsBackgroundWake.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
tokenSuffix: "1234abcd",
|
||||
topic: "ai.openclaw.ios",
|
||||
environment: "sandbox",
|
||||
});
|
||||
|
||||
let connected = false;
|
||||
const session: TestNodeSession = { nodeId: "ios-node-reconnect", commands: ["camera.capture"] };
|
||||
const nodeRegistry = {
|
||||
get: vi.fn((nodeId: string) => {
|
||||
if (nodeId !== "ios-node-reconnect") {
|
||||
return undefined;
|
||||
}
|
||||
return connected ? session : undefined;
|
||||
}),
|
||||
invoke: vi.fn().mockResolvedValue({
|
||||
ok: true,
|
||||
payload: { ok: true },
|
||||
payloadJSON: '{"ok":true}',
|
||||
}),
|
||||
};
|
||||
|
||||
const invokePromise = invokeNode({
|
||||
nodeRegistry,
|
||||
requestParams: { nodeId: "ios-node-reconnect", idempotencyKey: "idem-reconnect" },
|
||||
});
|
||||
setTimeout(() => {
|
||||
connected = true;
|
||||
}, 300);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(4_000);
|
||||
const respond = await invokePromise;
|
||||
|
||||
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(1);
|
||||
expect(nodeRegistry.invoke).toHaveBeenCalledTimes(1);
|
||||
expect(nodeRegistry.invoke).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
nodeId: "ios-node-reconnect",
|
||||
command: "camera.capture",
|
||||
}),
|
||||
);
|
||||
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||
expect(call?.[0]).toBe(true);
|
||||
expect(call?.[1]).toMatchObject({ ok: true, nodeId: "ios-node-reconnect" });
|
||||
});
|
||||
|
||||
it("throttles repeated wake attempts for the same disconnected node", async () => {
|
||||
vi.useFakeTimers();
|
||||
mocks.loadApnsRegistration.mockResolvedValue({
|
||||
nodeId: "ios-node-throttle",
|
||||
token: "abcd1234abcd1234abcd1234abcd1234",
|
||||
topic: "ai.openclaw.ios",
|
||||
environment: "sandbox",
|
||||
updatedAtMs: 1,
|
||||
});
|
||||
mocks.resolveApnsAuthConfigFromEnv.mockResolvedValue({
|
||||
ok: true,
|
||||
value: {
|
||||
teamId: "TEAM123",
|
||||
keyId: "KEY123",
|
||||
privateKey: "-----BEGIN PRIVATE KEY-----\nabc\n-----END PRIVATE KEY-----",
|
||||
},
|
||||
});
|
||||
mocks.sendApnsBackgroundWake.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
tokenSuffix: "1234abcd",
|
||||
topic: "ai.openclaw.ios",
|
||||
environment: "sandbox",
|
||||
});
|
||||
|
||||
const nodeRegistry = {
|
||||
get: vi.fn(() => undefined),
|
||||
invoke: vi.fn().mockResolvedValue({ ok: true }),
|
||||
};
|
||||
|
||||
const first = invokeNode({
|
||||
nodeRegistry,
|
||||
requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-1" },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(4_000);
|
||||
await first;
|
||||
|
||||
const second = invokeNode({
|
||||
nodeRegistry,
|
||||
requestParams: { nodeId: "ios-node-throttle", idempotencyKey: "idem-throttle-2" },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(4_000);
|
||||
await second;
|
||||
|
||||
expect(mocks.sendApnsBackgroundWake).toHaveBeenCalledTimes(1);
|
||||
expect(nodeRegistry.invoke).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -8,6 +8,11 @@ import {
|
||||
requestNodePairing,
|
||||
verifyNodeToken,
|
||||
} from "../../infra/node-pairing.js";
|
||||
import {
|
||||
loadApnsRegistration,
|
||||
resolveApnsAuthConfigFromEnv,
|
||||
sendApnsBackgroundWake,
|
||||
} from "../../infra/push-apns.js";
|
||||
import { isNodeCommandAllowed, resolveNodeCommandAllowlist } from "../node-command-policy.js";
|
||||
import { sanitizeNodeInvokeParamsForForwarding } from "../node-invoke-sanitize.js";
|
||||
import {
|
||||
@@ -34,6 +39,17 @@ import {
|
||||
} from "./nodes.helpers.js";
|
||||
import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||
|
||||
type NodeWakeState = {
|
||||
lastWakeAtMs: number;
|
||||
inFlight?: Promise<boolean>;
|
||||
};
|
||||
|
||||
const nodeWakeById = new Map<string, NodeWakeState>();
|
||||
|
||||
function isNodeEntry(entry: { role?: string; roles?: string[] }) {
|
||||
if (entry.role === "node") {
|
||||
return true;
|
||||
@@ -44,6 +60,77 @@ function isNodeEntry(entry: { role?: string; roles?: string[] }) {
|
||||
return false;
|
||||
}
|
||||
|
||||
async function delayMs(ms: number): Promise<void> {
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
async function maybeWakeNodeWithApns(nodeId: string): Promise<boolean> {
|
||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||
nodeWakeById.set(nodeId, state);
|
||||
|
||||
if (state.inFlight) {
|
||||
return await state.inFlight;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
if (state.lastWakeAtMs > 0 && now - state.lastWakeAtMs < NODE_WAKE_THROTTLE_MS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
state.inFlight = (async () => {
|
||||
try {
|
||||
const registration = await loadApnsRegistration(nodeId);
|
||||
if (!registration) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const auth = await resolveApnsAuthConfigFromEnv(process.env);
|
||||
if (!auth.ok) {
|
||||
return false;
|
||||
}
|
||||
|
||||
state.lastWakeAtMs = Date.now();
|
||||
await sendApnsBackgroundWake({
|
||||
auth: auth.value,
|
||||
registration,
|
||||
nodeId,
|
||||
wakeReason: "node.invoke",
|
||||
});
|
||||
} catch {
|
||||
// Best-effort wake only.
|
||||
if (state.lastWakeAtMs === 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
})();
|
||||
|
||||
try {
|
||||
return await state.inFlight;
|
||||
} finally {
|
||||
state.inFlight = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForNodeReconnect(params: {
|
||||
nodeId: string;
|
||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||
timeoutMs?: number;
|
||||
pollMs?: number;
|
||||
}): Promise<boolean> {
|
||||
const timeoutMs = Math.max(250, params.timeoutMs ?? NODE_WAKE_RECONNECT_WAIT_MS);
|
||||
const pollMs = Math.max(50, params.pollMs ?? NODE_WAKE_RECONNECT_POLL_MS);
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
|
||||
while (Date.now() < deadline) {
|
||||
if (params.context.nodeRegistry.get(params.nodeId)) {
|
||||
return true;
|
||||
}
|
||||
await delayMs(pollMs);
|
||||
}
|
||||
return Boolean(params.context.nodeRegistry.get(params.nodeId));
|
||||
}
|
||||
|
||||
export const nodeHandlers: GatewayRequestHandlers = {
|
||||
"node.pair.request": async ({ params, respond, context }) => {
|
||||
if (!validateNodePairRequestParams(params)) {
|
||||
@@ -383,16 +470,23 @@ export const nodeHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
|
||||
await respondUnavailableOnThrow(respond, async () => {
|
||||
const nodeSession = context.nodeRegistry.get(nodeId);
|
||||
let nodeSession = context.nodeRegistry.get(nodeId);
|
||||
if (!nodeSession) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.UNAVAILABLE, "node not connected", {
|
||||
details: { code: "NOT_CONNECTED" },
|
||||
}),
|
||||
);
|
||||
return;
|
||||
const wakeAvailable = await maybeWakeNodeWithApns(nodeId);
|
||||
if (wakeAvailable) {
|
||||
await waitForNodeReconnect({ nodeId, context });
|
||||
}
|
||||
nodeSession = context.nodeRegistry.get(nodeId);
|
||||
if (!nodeSession) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(ErrorCodes.UNAVAILABLE, "node not connected", {
|
||||
details: { code: "NOT_CONNECTED" },
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
const cfg = loadConfig();
|
||||
const allowlist = resolveNodeCommandAllowlist(cfg, nodeSession);
|
||||
|
||||
Reference in New Issue
Block a user