refactor: simplify Telegram preview streaming to single boolean (#22012)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: a4017d3b94
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Ayaan Zaidi
2026-02-21 15:19:13 +05:30
committed by GitHub
parent e1cb73cdeb
commit 677384c519
13 changed files with 116 additions and 137 deletions

View File

@@ -193,7 +193,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("keeps a higher initial debounce threshold in block stream mode", async () => {
it("uses immediate preview updates for legacy block stream mode", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -209,7 +209,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
minInitialChars: 30,
minInitialChars: 1,
}),
);
});
@@ -445,7 +445,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("forces new message when new assistant message starts after previous output", async () => {
it("does not force new message for legacy block stream mode", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -464,8 +464,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext(), streamMode: "block" });
// Should force new message when assistant message starts after previous output
expect(draftStream.forceNewMessage).toHaveBeenCalled();
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("does not force new message in partial mode when assistant message restarts", async () => {

View File

@@ -6,7 +6,6 @@ import {
modelSupportsVision,
} from "../agents/model-catalog.js";
import { resolveDefaultModelForAgent } from "../agents/model-selection.js";
import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import { resolveChunkMode } from "../auto-reply/chunk.js";
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
@@ -26,7 +25,6 @@ import type { TelegramBotOptions } from "./bot.js";
import { deliverReplies } from "./bot/delivery.js";
import type { TelegramStreamMode } from "./bot/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import { renderTelegramHtmlText } from "./format.js";
import {
@@ -143,21 +141,20 @@ export const dispatchTelegramMessage = async ({
});
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
const streamReasoningDraft = resolvedReasoningLevel === "stream";
const previewStreamingEnabled = streamMode !== "off";
const canStreamAnswerDraft =
streamMode !== "off" && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning;
previewStreamingEnabled && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning;
const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft;
const draftReplyToMessageId =
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftMinInitialChars =
streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
previewStreamingEnabled || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
type LaneName = "answer" | "reasoning";
type DraftLaneState = {
stream: ReturnType<typeof createTelegramDraftStream> | undefined;
lastPartialText: string;
draftText: string;
hasStreamedMessage: boolean;
chunker: EmbeddedBlockChunker | undefined;
};
const createDraftLane = (enabled: boolean): DraftLaneState => {
const stream = enabled
@@ -173,16 +170,10 @@ export const dispatchTelegramMessage = async ({
warn: logVerbose,
})
: undefined;
const chunker =
stream && streamMode === "block"
? new EmbeddedBlockChunker(resolveTelegramDraftStreamingChunking(cfg, route.accountId))
: undefined;
return {
stream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker,
};
};
const lanes: Record<LaneName, DraftLaneState> = {
@@ -207,9 +198,7 @@ export const dispatchTelegramMessage = async ({
};
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.draftText = "";
lane.hasStreamedMessage = false;
lane.chunker?.reset();
};
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
const laneStream = lane.stream;
@@ -221,46 +210,18 @@ export const dispatchTelegramMessage = async ({
}
// Mark that we've received streaming content (for forceNewMessage decision).
lane.hasStreamedMessage = true;
if (streamMode === "partial") {
// Some providers briefly emit a shorter prefix snapshot (for example
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
// visible punctuation flicker.
if (
lane.lastPartialText &&
lane.lastPartialText.startsWith(text) &&
text.length < lane.lastPartialText.length
) {
return;
}
lane.lastPartialText = text;
laneStream.update(text);
// Some providers briefly emit a shorter prefix snapshot (for example
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
// visible punctuation flicker.
if (
lane.lastPartialText &&
lane.lastPartialText.startsWith(text) &&
text.length < lane.lastPartialText.length
) {
return;
}
let delta = text;
if (text.startsWith(lane.lastPartialText)) {
delta = text.slice(lane.lastPartialText.length);
} else {
// Streaming buffer reset (or non-monotonic stream). Start fresh.
lane.chunker?.reset();
lane.draftText = "";
}
lane.lastPartialText = text;
if (!delta) {
return;
}
if (!lane.chunker) {
lane.draftText = text;
laneStream.update(lane.draftText);
return;
}
lane.chunker.append(delta);
lane.chunker.drain({
force: false,
emit: (chunk) => {
lane.draftText += chunk;
laneStream.update(lane.draftText);
},
});
laneStream.update(text);
};
const ingestDraftLaneSegments = (text: string | undefined) => {
for (const segment of splitTextIntoLaneSegments(text)) {
@@ -275,31 +236,18 @@ export const dispatchTelegramMessage = async ({
if (!lane.stream) {
return;
}
if (lane.chunker?.hasBuffered()) {
lane.chunker.drain({
force: true,
emit: (chunk) => {
lane.draftText += chunk;
},
});
lane.chunker.reset();
if (lane.draftText) {
lane.stream.update(lane.draftText);
}
}
await lane.stream.flush();
};
const disableBlockStreaming =
streamMode === "off"
? true
: forceBlockStreamingForReasoning
? false
: typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: canStreamAnswerDraft
? true
: undefined;
const disableBlockStreaming = !previewStreamingEnabled
? true
: forceBlockStreamingForReasoning
? false
: typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: canStreamAnswerDraft
? true
: undefined;
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg,
@@ -395,8 +343,7 @@ export const dispatchTelegramMessage = async ({
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
};
const getLanePreviewText = (lane: DraftLaneState) =>
streamMode === "block" ? lane.draftText : lane.lastPartialText;
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
const tryUpdatePreviewForLane = async (params: {
lane: DraftLaneState;
laneName: LaneName;
@@ -449,7 +396,6 @@ export const dispatchTelegramMessage = async ({
});
if (updateLaneSnapshot) {
lane.lastPartialText = text;
lane.draftText = text;
}
deliveryState.delivered = true;
return true;
@@ -684,10 +630,6 @@ export const dispatchTelegramMessage = async ({
onAssistantMessageStart: answerLane.stream
? () => {
reasoningStepState.resetForNextStep();
// Keep answer blocks separated in block mode; partial mode keeps one answer lane.
if (streamMode === "block" && answerLane.hasStreamedMessage) {
answerLane.stream?.forceNewMessage();
}
resetDraftLaneState(answerLane);
}
: undefined,

View File

@@ -154,11 +154,18 @@ export function buildTypingThreadParams(messageThreadId?: number) {
}
export function resolveTelegramStreamMode(telegramCfg?: {
streaming?: boolean;
streamMode?: TelegramStreamMode;
}): TelegramStreamMode {
if (typeof telegramCfg?.streaming === "boolean") {
return telegramCfg.streaming ? "partial" : "off";
}
const raw = telegramCfg?.streamMode?.trim().toLowerCase();
if (raw === "off" || raw === "partial" || raw === "block") {
return raw;
if (raw === "off") {
return "off";
}
if (raw === "partial" || raw === "block") {
return "partial";
}
return "partial";
}