refactor(tui): simplify stream boundary-drop modes

This commit is contained in:
Peter Steinberger
2026-02-26 20:54:29 +01:00
parent b01273cfc6
commit 675764e866
2 changed files with 125 additions and 213 deletions

View File

@@ -1,232 +1,122 @@
import { describe, expect, it } from "vitest";
import { TuiStreamAssembler } from "./tui-stream-assembler.js";
const STREAM_WITH_TOOL_BLOCKS = {
role: "assistant",
content: [
{ type: "text", text: "Before tool call" },
{ type: "tool_use", name: "search" },
{ type: "text", text: "After tool call" },
],
} as const;
const text = (value: string) => ({ type: "text", text: value }) as const;
const thinking = (value: string) => ({ type: "thinking", thinking: value }) as const;
const toolUse = () => ({ type: "tool_use", name: "search" }) as const;
const STREAM_AFTER_TOOL_BLOCKS = {
role: "assistant",
content: [
{ type: "tool_use", name: "search" },
{ type: "text", text: "After tool call" },
],
} as const;
const messageWithContent = (content: readonly Record<string, unknown>[]) =>
({
role: "assistant",
content,
}) as const;
const TEXT_ONLY_TWO_BLOCKS = messageWithContent([text("Draft line 1"), text("Draft line 2")]);
type FinalizeBoundaryCase = {
name: string;
streamedContent: readonly Record<string, unknown>[];
finalContent: readonly Record<string, unknown>[];
expected: string;
};
const FINALIZE_BOUNDARY_CASES: FinalizeBoundaryCase[] = [
{
name: "preserves streamed text when tool-boundary final payload drops prefix blocks",
streamedContent: [text("Before tool call"), toolUse(), text("After tool call")],
finalContent: [toolUse(), text("After tool call")],
expected: "Before tool call\nAfter tool call",
},
{
name: "preserves streamed text when streamed run had non-text and final drops suffix blocks",
streamedContent: [text("Before tool call"), toolUse(), text("After tool call")],
finalContent: [text("Before tool call")],
expected: "Before tool call\nAfter tool call",
},
{
name: "prefers final text when non-text appears only in final payload",
streamedContent: [text("Draft line 1"), text("Draft line 2")],
finalContent: [toolUse(), text("Draft line 2")],
expected: "Draft line 2",
},
{
name: "keeps non-empty final text for plain text boundary drops",
streamedContent: [text("Draft line 1"), text("Draft line 2")],
finalContent: [text("Draft line 1")],
expected: "Draft line 1",
},
{
name: "prefers final replacement text when payload is not a boundary subset",
streamedContent: [text("Before tool call"), toolUse(), text("After tool call")],
finalContent: [toolUse(), text("Replacement")],
expected: "Replacement",
},
{
name: "accepts richer final payload when it extends streamed text",
streamedContent: [text("Before tool call")],
finalContent: [text("Before tool call"), text("After tool call")],
expected: "Before tool call\nAfter tool call",
},
];
describe("TuiStreamAssembler", () => {
it("keeps thinking before content even when thinking arrives later", () => {
const assembler = new TuiStreamAssembler();
const first = assembler.ingestDelta(
"run-1",
{
role: "assistant",
content: [{ type: "text", text: "Hello" }],
},
true,
);
const first = assembler.ingestDelta("run-1", messageWithContent([text("Hello")]), true);
expect(first).toBe("Hello");
const second = assembler.ingestDelta(
"run-1",
{
role: "assistant",
content: [{ type: "thinking", thinking: "Brain" }],
},
true,
);
const second = assembler.ingestDelta("run-1", messageWithContent([thinking("Brain")]), true);
expect(second).toBe("[thinking]\nBrain\n\nHello");
});
it("omits thinking when showThinking is false", () => {
const assembler = new TuiStreamAssembler();
const text = assembler.ingestDelta(
const output = assembler.ingestDelta(
"run-2",
{
role: "assistant",
content: [
{ type: "thinking", thinking: "Hidden" },
{ type: "text", text: "Visible" },
],
},
messageWithContent([thinking("Hidden"), text("Visible")]),
false,
);
expect(text).toBe("Visible");
expect(output).toBe("Visible");
});
it("falls back to streamed text on empty final payload", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta(
"run-3",
{
role: "assistant",
content: [{ type: "text", text: "Streamed" }],
},
false,
);
const finalText = assembler.finalize(
"run-3",
{
role: "assistant",
content: [],
},
false,
);
assembler.ingestDelta("run-3", messageWithContent([text("Streamed")]), false);
const finalText = assembler.finalize("run-3", { role: "assistant", content: [] }, false);
expect(finalText).toBe("Streamed");
});
it("returns null when delta text is unchanged", () => {
const assembler = new TuiStreamAssembler();
const first = assembler.ingestDelta(
"run-4",
{
role: "assistant",
content: [{ type: "text", text: "Repeat" }],
},
false,
);
const first = assembler.ingestDelta("run-4", messageWithContent([text("Repeat")]), false);
expect(first).toBe("Repeat");
const second = assembler.ingestDelta("run-4", messageWithContent([text("Repeat")]), false);
expect(second).toBeNull();
});
it("keeps streamed delta text when incoming tool boundary drops a block", () => {
const assembler = new TuiStreamAssembler();
const first = assembler.ingestDelta("run-delta-boundary", TEXT_ONLY_TWO_BLOCKS, false);
expect(first).toBe("Draft line 1\nDraft line 2");
const second = assembler.ingestDelta(
"run-4",
{
role: "assistant",
content: [{ type: "text", text: "Repeat" }],
},
"run-delta-boundary",
messageWithContent([toolUse(), text("Draft line 2")]),
false,
);
expect(second).toBeNull();
});
it("keeps richer streamed text when final payload drops earlier blocks", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta("run-5", STREAM_WITH_TOOL_BLOCKS, false);
const finalText = assembler.finalize("run-5", STREAM_AFTER_TOOL_BLOCKS, false);
expect(finalText).toBe("Before tool call\nAfter tool call");
});
it("does not regress streamed text when a delta drops boundary blocks after tool calls", () => {
const assembler = new TuiStreamAssembler();
const first = assembler.ingestDelta("run-5-stream", STREAM_WITH_TOOL_BLOCKS, false);
expect(first).toBe("Before tool call\nAfter tool call");
const second = assembler.ingestDelta("run-5-stream", STREAM_AFTER_TOOL_BLOCKS, false);
expect(second).toBeNull();
});
it("keeps non-empty final text for plain text prefix/suffix updates", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta(
"run-5b",
{
role: "assistant",
content: [
{ type: "text", text: "Draft line 1" },
{ type: "text", text: "Draft line 2" },
],
},
false,
);
const finalText = assembler.finalize(
"run-5b",
{
role: "assistant",
content: [{ type: "text", text: "Draft line 1" }],
},
false,
);
expect(finalText).toBe("Draft line 1");
});
it("prefers final text when non-text blocks appear only in final payload", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta(
"run-5c",
{
role: "assistant",
content: [
{ type: "text", text: "Draft line 1" },
{ type: "text", text: "Draft line 2" },
],
},
false,
);
const finalText = assembler.finalize(
"run-5c",
{
role: "assistant",
content: [
{ type: "tool_use", name: "search" },
{ type: "text", text: "Draft line 2" },
],
},
false,
);
expect(finalText).toBe("Draft line 2");
});
it("accepts richer final payload when it extends streamed text", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta(
"run-6",
{
role: "assistant",
content: [{ type: "text", text: "Before tool call" }],
},
false,
);
const finalText = assembler.finalize(
"run-6",
{
role: "assistant",
content: [
{ type: "text", text: "Before tool call" },
{ type: "text", text: "After tool call" },
],
},
false,
);
expect(finalText).toBe("Before tool call\nAfter tool call");
});
it("prefers non-empty final payload when it is not a dropped block regression", () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta(
"run-7",
{
role: "assistant",
content: [{ type: "text", text: "NOT OK" }],
},
false,
);
const finalText = assembler.finalize(
"run-7",
{
role: "assistant",
content: [{ type: "text", text: "OK" }],
},
false,
);
expect(finalText).toBe("OK");
});
for (const testCase of FINALIZE_BOUNDARY_CASES) {
it(testCase.name, () => {
const assembler = new TuiStreamAssembler();
assembler.ingestDelta("run-boundary", messageWithContent(testCase.streamedContent), false);
const finalText = assembler.finalize(
"run-boundary",
messageWithContent(testCase.finalContent),
false,
);
expect(finalText).toBe(testCase.expected);
});
}
});

View File

@@ -13,6 +13,8 @@ type RunStreamState = {
displayText: string;
};
type BoundaryDropMode = "off" | "streamed-only" | "streamed-or-incoming";
function extractTextBlocksAndSignals(message: unknown): {
textBlocks: string[];
sawNonTextContentBlocks: boolean;
@@ -75,6 +77,29 @@ function isDroppedBoundaryTextBlockSubset(params: {
return finalTextBlocks.every((block, index) => streamedTextBlocks[suffixStart + index] === block);
}
function shouldPreserveBoundaryDroppedText(params: {
boundaryDropMode: BoundaryDropMode;
streamedSawNonTextContentBlocks: boolean;
incomingSawNonTextContentBlocks: boolean;
streamedTextBlocks: string[];
nextContentBlocks: string[];
}) {
if (params.boundaryDropMode === "off") {
return false;
}
const sawEligibleNonTextContent =
params.boundaryDropMode === "streamed-or-incoming"
? params.streamedSawNonTextContentBlocks || params.incomingSawNonTextContentBlocks
: params.streamedSawNonTextContentBlocks;
if (!sawEligibleNonTextContent) {
return false;
}
return isDroppedBoundaryTextBlockSubset({
streamedTextBlocks: params.streamedTextBlocks,
finalTextBlocks: params.nextContentBlocks,
});
}
export class TuiStreamAssembler {
private runs = new Map<string, RunStreamState>();
@@ -97,10 +122,7 @@ export class TuiStreamAssembler {
state: RunStreamState,
message: unknown,
showThinking: boolean,
opts?: {
protectBoundaryDrops?: boolean;
useIncomingNonTextForBoundaryDrops?: boolean;
},
opts?: { boundaryDropMode?: BoundaryDropMode },
) {
const thinkingText = extractThinkingFromMessage(message);
const contentText = extractContentFromMessage(message);
@@ -111,17 +133,16 @@ export class TuiStreamAssembler {
}
if (contentText) {
const nextContentBlocks = textBlocks.length > 0 ? textBlocks : [contentText];
const useIncomingNonTextForBoundaryDrops = opts?.useIncomingNonTextForBoundaryDrops !== false;
const shouldPreserveBoundaryDroppedText =
opts?.protectBoundaryDrops === true &&
(state.sawNonTextContentBlocks ||
(useIncomingNonTextForBoundaryDrops && sawNonTextContentBlocks)) &&
isDroppedBoundaryTextBlockSubset({
streamedTextBlocks: state.contentBlocks,
finalTextBlocks: nextContentBlocks,
});
const boundaryDropMode = opts?.boundaryDropMode ?? "off";
const shouldKeepStreamedBoundaryText = shouldPreserveBoundaryDroppedText({
boundaryDropMode,
streamedSawNonTextContentBlocks: state.sawNonTextContentBlocks,
incomingSawNonTextContentBlocks: sawNonTextContentBlocks,
streamedTextBlocks: state.contentBlocks,
nextContentBlocks,
});
if (!shouldPreserveBoundaryDroppedText) {
if (!shouldKeepStreamedBoundaryText) {
state.contentText = contentText;
state.contentBlocks = nextContentBlocks;
}
@@ -142,7 +163,9 @@ export class TuiStreamAssembler {
ingestDelta(runId: string, message: unknown, showThinking: boolean): string | null {
const state = this.getOrCreateRun(runId);
const previousDisplayText = state.displayText;
this.updateRunState(state, message, showThinking, { protectBoundaryDrops: true });
this.updateRunState(state, message, showThinking, {
boundaryDropMode: "streamed-or-incoming",
});
if (!state.displayText || state.displayText === previousDisplayText) {
return null;
@@ -157,8 +180,7 @@ export class TuiStreamAssembler {
const streamedTextBlocks = [...state.contentBlocks];
const streamedSawNonTextContentBlocks = state.sawNonTextContentBlocks;
this.updateRunState(state, message, showThinking, {
protectBoundaryDrops: true,
useIncomingNonTextForBoundaryDrops: false,
boundaryDropMode: "streamed-only",
});
const finalComposed = state.displayText;
const shouldKeepStreamedText =