From c1cc28a4e176516c4db1d0a1c1f9eec814ba8f00 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 15 Feb 2026 13:39:59 +0000 Subject: [PATCH] refactor(gateway): share broadcast function types --- src/gateway/server-broadcast.ts | 48 ++++++++++++++++------------- src/gateway/server-methods/types.ts | 20 ++---------- src/gateway/server-runtime-state.ts | 25 +++++---------- 3 files changed, 37 insertions(+), 56 deletions(-) diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index c68fc9b8fcb..6f4e570f3b9 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -15,6 +15,29 @@ const EVENT_SCOPE_GUARDS: Record = { "node.pair.resolved": [PAIRING_SCOPE], }; +export type GatewayBroadcastStateVersion = { + presence?: number; + health?: number; +}; + +export type GatewayBroadcastOpts = { + dropIfSlow?: boolean; + stateVersion?: GatewayBroadcastStateVersion; +}; + +export type GatewayBroadcastFn = ( + event: string, + payload: unknown, + opts?: GatewayBroadcastOpts, +) => void; + +export type GatewayBroadcastToConnIdsFn = ( + event: string, + payload: unknown, + connIds: ReadonlySet, + opts?: GatewayBroadcastOpts, +) => void; + function hasEventScope(client: GatewayWsClient, event: string): boolean { const required = EVENT_SCOPE_GUARDS[event]; if (!required) { @@ -37,10 +60,7 @@ export function createGatewayBroadcaster(params: { clients: Set const broadcastInternal = ( event: string, payload: unknown, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, + opts?: GatewayBroadcastOpts, targetConnIds?: ReadonlySet, ) => { if (params.clients.size === 0) { @@ -97,24 +117,10 @@ export function createGatewayBroadcaster(params: { clients: Set } }; - const broadcast = ( - event: string, - payload: unknown, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => broadcastInternal(event, payload, opts); + const broadcast: GatewayBroadcastFn = (event, payload, opts) => + broadcastInternal(event, payload, opts); - const broadcastToConnIds = ( - event: string, - payload: unknown, - connIds: ReadonlySet, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => { + const broadcastToConnIds: GatewayBroadcastToConnIdsFn = (event, payload, connIds, opts) => { if (connIds.size === 0) { return; } diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 1b7b34339e1..b0c70acd505 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -8,6 +8,7 @@ import type { ChatAbortControllerEntry } from "../chat-abort.js"; import type { ExecApprovalManager } from "../exec-approval-manager.js"; import type { NodeRegistry } from "../node-registry.js"; import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js"; +import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast.js"; import type { ChannelRuntimeSnapshot } from "../server-channels.js"; import type { DedupeEntry } from "../server-shared.js"; @@ -37,23 +38,8 @@ export type GatewayRequestContext = { logGateway: SubsystemLogger; incrementPresenceVersion: () => number; getHealthVersion: () => number; - broadcast: ( - event: string, - payload: unknown, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => void; - broadcastToConnIds: ( - event: string, - payload: unknown, - connIds: ReadonlySet, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => void; + broadcast: GatewayBroadcastFn; + broadcastToConnIds: GatewayBroadcastToConnIdsFn; nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void; nodeSendToAllSubscribed: (event: string, payload: unknown) => void; nodeSubscribe: (nodeId: string, sessionKey: string) => void; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 03700757dab..d0c8c01e17c 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -15,7 +15,11 @@ import type { GatewayWsClient } from "./server/ws-types.js"; import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { type CanvasHostHandler, createCanvasHostHandler } from "../canvas-host/server.js"; import { resolveGatewayListenHosts } from "./net.js"; -import { createGatewayBroadcaster } from "./server-broadcast.js"; +import { + createGatewayBroadcaster, + type GatewayBroadcastFn, + type GatewayBroadcastToConnIdsFn, +} from "./server-broadcast.js"; import { type ChatRunEntry, createChatRunState, @@ -58,23 +62,8 @@ export async function createGatewayRuntimeState(params: { httpBindHosts: string[]; wss: WebSocketServer; clients: Set; - broadcast: ( - event: string, - payload: unknown, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => void; - broadcastToConnIds: ( - event: string, - payload: unknown, - connIds: ReadonlySet, - opts?: { - dropIfSlow?: boolean; - stateVersion?: { presence?: number; health?: number }; - }, - ) => void; + broadcast: GatewayBroadcastFn; + broadcastToConnIds: GatewayBroadcastToConnIdsFn; agentRunSeq: Map; dedupe: Map; chatRunState: ReturnType;