TUI/Gateway: fix pi streaming + tool routing + model display + msg updating (#8432)

* TUI/Gateway: fix pi streaming + tool routing

* Tests: clarify verbose tool output expectation

* fix: avoid seq gaps for targeted tool events (#8432) (thanks @gumadeiras)
This commit is contained in:
Gustavo Madeira Santana
2026-02-04 17:12:16 -05:00
committed by GitHub
parent a42e3cb78a
commit 38e6da1fe0
32 changed files with 1227 additions and 208 deletions

View File

@@ -120,6 +120,79 @@ export function createChatRunState(): ChatRunState {
};
}
export type ToolEventRecipientRegistry = {
add: (runId: string, connId: string) => void;
get: (runId: string) => ReadonlySet<string> | undefined;
markFinal: (runId: string) => void;
};
type ToolRecipientEntry = {
connIds: Set<string>;
updatedAt: number;
finalizedAt?: number;
};
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry {
const recipients = new Map<string, ToolRecipientEntry>();
const prune = () => {
if (recipients.size === 0) {
return;
}
const now = Date.now();
for (const [runId, entry] of recipients) {
const cutoff = entry.finalizedAt
? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS
: entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS;
if (now >= cutoff) {
recipients.delete(runId);
}
}
};
const add = (runId: string, connId: string) => {
if (!runId || !connId) {
return;
}
const now = Date.now();
const existing = recipients.get(runId);
if (existing) {
existing.connIds.add(connId);
existing.updatedAt = now;
} else {
recipients.set(runId, {
connIds: new Set([connId]),
updatedAt: now,
});
}
prune();
};
const get = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return undefined;
}
entry.updatedAt = Date.now();
prune();
return entry.connIds;
};
const markFinal = (runId: string) => {
const entry = recipients.get(runId);
if (!entry) {
return;
}
entry.finalizedAt = Date.now();
prune();
};
return { add, get, markFinal };
}
export type ChatEventBroadcast = (
event: string,
payload: unknown,
@@ -130,20 +203,29 @@ export type NodeSendToSession = (sessionKey: string, event: string, payload: unk
export type AgentEventHandlerOptions = {
broadcast: ChatEventBroadcast;
broadcastToConnIds: (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: { dropIfSlow?: boolean },
) => void;
nodeSendToSession: NodeSendToSession;
agentRunSeq: Map<string, number>;
chatRunState: ChatRunState;
resolveSessionKeyForRun: (runId: string) => string | undefined;
clearAgentRunContext: (runId: string) => void;
toolEventRecipients: ToolEventRecipientRegistry;
};
export function createAgentEventHandler({
broadcast,
broadcastToConnIds,
nodeSendToSession,
agentRunSeq,
chatRunState,
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
}: AgentEventHandlerOptions) {
const emitChatDelta = (sessionKey: string, clientRunId: string, seq: number, text: string) => {
chatRunState.buffers.set(clientRunId, text);
@@ -213,25 +295,25 @@ export function createAgentEventHandler({
nodeSendToSession(sessionKey, "chat", payload);
};
const shouldEmitToolEvents = (runId: string, sessionKey?: string) => {
const resolveToolVerboseLevel = (runId: string, sessionKey?: string) => {
const runContext = getAgentRunContext(runId);
const runVerbose = normalizeVerboseLevel(runContext?.verboseLevel);
if (runVerbose) {
return runVerbose === "on";
return runVerbose;
}
if (!sessionKey) {
return false;
return "off";
}
try {
const { cfg, entry } = loadSessionEntry(sessionKey);
const sessionVerbose = normalizeVerboseLevel(entry?.verboseLevel);
if (sessionVerbose) {
return sessionVerbose === "on";
return sessionVerbose;
}
const defaultVerbose = normalizeVerboseLevel(cfg.agents?.defaults?.verboseDefault);
return defaultVerbose === "on";
return defaultVerbose ?? "off";
} catch {
return false;
return "off";
}
};
@@ -244,10 +326,21 @@ export function createAgentEventHandler({
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...evt, sessionKey } : evt;
const last = agentRunSeq.get(evt.runId) ?? 0;
if (evt.stream === "tool" && !shouldEmitToolEvents(evt.runId, sessionKey)) {
const isToolEvent = evt.stream === "tool";
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
if (isToolEvent && toolVerbose === "off") {
agentRunSeq.set(evt.runId, evt.seq);
return;
}
const toolPayload =
isToolEvent && toolVerbose !== "full"
? (() => {
const data = evt.data ? { ...evt.data } : {};
delete data.result;
delete data.partialResult;
return sessionKey ? { ...evt, sessionKey, data } : { ...evt, data };
})()
: agentPayload;
if (evt.seq !== last + 1) {
broadcast("agent", {
runId: evt.runId,
@@ -262,13 +355,20 @@ export function createAgentEventHandler({
});
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", agentPayload);
if (isToolEvent) {
const recipients = toolEventRecipients.get(evt.runId);
if (recipients && recipients.size > 0) {
broadcastToConnIds("agent", toolPayload, recipients);
}
} else {
broadcast("agent", agentPayload);
}
const lifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
if (sessionKey) {
nodeSendToSession(sessionKey, "agent", agentPayload);
nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload);
if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") {
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);
} else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
@@ -306,6 +406,7 @@ export function createAgentEventHandler({
}
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
toolEventRecipients.markFinal(evt.runId);
clearAgentRunContext(evt.runId);
}
};