refactor(gateway): share broadcast function types

This commit is contained in:
Peter Steinberger
2026-02-15 13:39:59 +00:00
parent 0d47bea3bf
commit c1cc28a4e1
3 changed files with 37 additions and 56 deletions

View File

@@ -15,6 +15,29 @@ const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
"node.pair.resolved": [PAIRING_SCOPE],
};
export type GatewayBroadcastStateVersion = {
presence?: number;
health?: number;
};
export type GatewayBroadcastOpts = {
dropIfSlow?: boolean;
stateVersion?: GatewayBroadcastStateVersion;
};
export type GatewayBroadcastFn = (
event: string,
payload: unknown,
opts?: GatewayBroadcastOpts,
) => void;
export type GatewayBroadcastToConnIdsFn = (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: GatewayBroadcastOpts,
) => void;
function hasEventScope(client: GatewayWsClient, event: string): boolean {
const required = EVENT_SCOPE_GUARDS[event];
if (!required) {
@@ -37,10 +60,7 @@ export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient>
const broadcastInternal = (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
opts?: GatewayBroadcastOpts,
targetConnIds?: ReadonlySet<string>,
) => {
if (params.clients.size === 0) {
@@ -97,24 +117,10 @@ export function createGatewayBroadcaster(params: { clients: Set<GatewayWsClient>
}
};
const broadcast = (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => broadcastInternal(event, payload, opts);
const broadcast: GatewayBroadcastFn = (event, payload, opts) =>
broadcastInternal(event, payload, opts);
const broadcastToConnIds = (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => {
const broadcastToConnIds: GatewayBroadcastToConnIdsFn = (event, payload, connIds, opts) => {
if (connIds.size === 0) {
return;
}

View File

@@ -8,6 +8,7 @@ import type { ChatAbortControllerEntry } from "../chat-abort.js";
import type { ExecApprovalManager } from "../exec-approval-manager.js";
import type { NodeRegistry } from "../node-registry.js";
import type { ConnectParams, ErrorShape, RequestFrame } from "../protocol/index.js";
import type { GatewayBroadcastFn, GatewayBroadcastToConnIdsFn } from "../server-broadcast.js";
import type { ChannelRuntimeSnapshot } from "../server-channels.js";
import type { DedupeEntry } from "../server-shared.js";
@@ -37,23 +38,8 @@ export type GatewayRequestContext = {
logGateway: SubsystemLogger;
incrementPresenceVersion: () => number;
getHealthVersion: () => number;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
broadcastToConnIds: (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
broadcast: GatewayBroadcastFn;
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
nodeSendToAllSubscribed: (event: string, payload: unknown) => void;
nodeSubscribe: (nodeId: string, sessionKey: string) => void;

View File

@@ -15,7 +15,11 @@ import type { GatewayWsClient } from "./server/ws-types.js";
import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js";
import { type CanvasHostHandler, createCanvasHostHandler } from "../canvas-host/server.js";
import { resolveGatewayListenHosts } from "./net.js";
import { createGatewayBroadcaster } from "./server-broadcast.js";
import {
createGatewayBroadcaster,
type GatewayBroadcastFn,
type GatewayBroadcastToConnIdsFn,
} from "./server-broadcast.js";
import {
type ChatRunEntry,
createChatRunState,
@@ -58,23 +62,8 @@ export async function createGatewayRuntimeState(params: {
httpBindHosts: string[];
wss: WebSocketServer;
clients: Set<GatewayWsClient>;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
broadcastToConnIds: (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
broadcast: GatewayBroadcastFn;
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
agentRunSeq: Map<string, number>;
dedupe: Map<string, DedupeEntry>;
chatRunState: ReturnType<typeof createChatRunState>;