refactor: centralize presence routing and version precedence coverage (#19609)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 10d9df5263
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-02-18 00:02:51 -05:00
committed by GitHub
parent 5c69e625f5
commit 07fdceb5fd
12 changed files with 305 additions and 33 deletions

View File

@@ -4,6 +4,7 @@ import { setHeartbeatsEnabled } from "../../infra/heartbeat-runner.js";
import { enqueueSystemEvent, isSystemEventContextChanged } from "../../infra/system-events.js";
import { listSystemPresence, updateSystemPresence } from "../../infra/system-presence.js";
import { ErrorCodes, errorShape } from "../protocol/index.js";
import { broadcastPresenceSnapshot } from "../server/presence-events.js";
import type { GatewayRequestHandlers } from "./types.js";
export const systemHandlers: GatewayRequestHandlers = {
@@ -123,18 +124,11 @@ export const systemHandlers: GatewayRequestHandlers = {
} else {
enqueueSystemEvent(text, { sessionKey });
}
const nextPresenceVersion = context.incrementPresenceVersion();
context.broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: nextPresenceVersion,
health: context.getHealthVersion(),
},
},
);
broadcastPresenceSnapshot({
broadcast: context.broadcast,
incrementPresenceVersion: context.incrementPresenceVersion,
getHealthVersion: context.getHealthVersion,
});
respond(true, { ok: true }, undefined);
},
};

View File

@@ -1,5 +1,6 @@
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { withEnvAsync } from "../test-utils/env.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { buildDeviceAuthPayload } from "./device-auth.js";
import { PROTOCOL_VERSION } from "./protocol/index.js";
@@ -63,6 +64,13 @@ function restoreGatewayToken(prevToken: string | undefined) {
}
}
async function withRuntimeVersionEnv<T>(
env: Record<string, string | undefined>,
run: () => Promise<T>,
): Promise<T> {
return withEnvAsync(env, run);
}
const TEST_OPERATOR_CLIENT = {
id: GATEWAY_CLIENT_NAMES.TEST,
version: "1.0.0",
@@ -235,6 +243,78 @@ describe("gateway server auth/connect", () => {
ws.close();
});
test("connect (req) handshake prefers service version fallback in hello-ok payload", async () => {
await withRuntimeVersionEnv(
{
OPENCLAW_VERSION: " ",
OPENCLAW_SERVICE_VERSION: "2.4.6-service",
npm_package_version: "1.0.0-package",
},
async () => {
const ws = await openWs(port);
const res = await connectReq(ws);
expect(res.ok).toBe(true);
const payload = res.payload as
| {
type?: unknown;
server?: { version?: string };
}
| undefined;
expect(payload?.type).toBe("hello-ok");
expect(payload?.server?.version).toBe("2.4.6-service");
ws.close();
},
);
});
test("connect (req) handshake prefers OPENCLAW_VERSION over service version", async () => {
await withRuntimeVersionEnv(
{
OPENCLAW_VERSION: "9.9.9-cli",
OPENCLAW_SERVICE_VERSION: "2.4.6-service",
npm_package_version: "1.0.0-package",
},
async () => {
const ws = await openWs(port);
const res = await connectReq(ws);
expect(res.ok).toBe(true);
const payload = res.payload as
| {
type?: unknown;
server?: { version?: string };
}
| undefined;
expect(payload?.type).toBe("hello-ok");
expect(payload?.server?.version).toBe("9.9.9-cli");
ws.close();
},
);
});
test("connect (req) handshake falls back to npm_package_version when higher-precedence env values are blank", async () => {
await withRuntimeVersionEnv(
{
OPENCLAW_VERSION: " ",
OPENCLAW_SERVICE_VERSION: "\t",
npm_package_version: "1.0.0-package",
},
async () => {
const ws = await openWs(port);
const res = await connectReq(ws);
expect(res.ok).toBe(true);
const payload = res.payload as
| {
type?: unknown;
server?: { version?: string };
}
| undefined;
expect(payload?.type).toBe("hello-ok");
expect(payload?.server?.version).toBe("1.0.0-package");
ws.close();
},
);
});
test("does not grant admin when scopes are empty", async () => {
const ws = await openWs(port);
const res = await connectReq(ws, { scopes: [] });

View File

@@ -0,0 +1,35 @@
import { describe, expect, it, vi } from "vitest";
import { broadcastPresenceSnapshot } from "./presence-events.js";
describe("broadcastPresenceSnapshot", () => {
it("increments version and broadcasts presence with state versions", () => {
const broadcast = vi.fn();
const incrementPresenceVersion = vi.fn(() => 7);
const getHealthVersion = vi.fn(() => 11);
const presenceVersion = broadcastPresenceSnapshot({
broadcast,
incrementPresenceVersion,
getHealthVersion,
});
expect(presenceVersion).toBe(7);
expect(incrementPresenceVersion).toHaveBeenCalledTimes(1);
expect(getHealthVersion).toHaveBeenCalledTimes(1);
expect(broadcast).toHaveBeenCalledTimes(1);
const [event, payload, opts] = broadcast.mock.calls[0] as [
string,
unknown,
{ dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number } } | undefined,
];
expect(event).toBe("presence");
if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
throw new Error("expected object payload");
}
expect(Array.isArray((payload as { presence?: unknown }).presence)).toBe(true);
expect(opts?.dropIfSlow).toBe(true);
expect(opts?.stateVersion).toEqual({ presence: 7, health: 11 });
});
});

View File

@@ -0,0 +1,22 @@
import { listSystemPresence } from "../../infra/system-presence.js";
import type { GatewayBroadcastFn } from "../server-broadcast.js";
export function broadcastPresenceSnapshot(params: {
broadcast: GatewayBroadcastFn;
incrementPresenceVersion: () => number;
getHealthVersion: () => number;
}): number {
const presenceVersion = params.incrementPresenceVersion();
params.broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: presenceVersion,
health: params.getHealthVersion(),
},
},
);
return presenceVersion;
}

View File

@@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
import type { WebSocket, WebSocketServer } from "ws";
import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js";
import { removeRemoteNodeInfo } from "../../infra/skills-remote.js";
import { listSystemPresence, upsertPresence } from "../../infra/system-presence.js";
import { upsertPresence } from "../../infra/system-presence.js";
import type { createSubsystemLogger } from "../../logging/subsystem.js";
import { truncateUtf16Safe } from "../../utils.js";
import { isWebchatClient } from "../../utils/message-channel.js";
@@ -13,7 +13,8 @@ import { getHandshakeTimeoutMs } from "../server-constants.js";
import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js";
import { formatError } from "../server-utils.js";
import { logWs } from "../ws-log.js";
import { getHealthVersion, getPresenceVersion, incrementPresenceVersion } from "./health-state.js";
import { getHealthVersion, incrementPresenceVersion } from "./health-state.js";
import { broadcastPresenceSnapshot } from "./presence-events.js";
import { attachGatewayWsMessageHandler } from "./ws-connection/message-handler.js";
import type { GatewayWsClient } from "./ws-types.js";
@@ -227,18 +228,7 @@ export function attachGatewayWsConnectionHandler(params: {
}
if (client?.presenceKey) {
upsertPresence(client.presenceKey, { reason: "disconnect" });
incrementPresenceVersion();
broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: getPresenceVersion(),
health: getHealthVersion(),
},
},
);
broadcastPresenceSnapshot({ broadcast, incrementPresenceVersion, getHealthVersion });
}
if (client?.connect?.role === "node") {
const context = buildRequestContext();

View File

@@ -22,6 +22,7 @@ import { loadVoiceWakeConfig } from "../../../infra/voicewake.js";
import { rawDataToString } from "../../../infra/ws.js";
import type { createSubsystemLogger } from "../../../logging/subsystem.js";
import { isGatewayCliClient, isWebchatClient } from "../../../utils/message-channel.js";
import { resolveRuntimeServiceVersion } from "../../../version.js";
import {
AUTH_RATE_LIMIT_SCOPE_DEVICE_TOKEN,
AUTH_RATE_LIMIT_SCOPE_SHARED_SECRET,
@@ -791,7 +792,7 @@ export function attachGatewayWsMessageHandler(params: {
type: "hello-ok",
protocol: PROTOCOL_VERSION,
server: {
version: process.env.OPENCLAW_VERSION ?? process.env.npm_package_version ?? "dev",
version: resolveRuntimeServiceVersion(process.env, "dev"),
commit: process.env.GIT_COMMIT,
host: os.hostname(),
connId,