From e1fa57cee7ab62b8cf187d84438b18c6eb91bbaa Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Thu, 19 Feb 2026 15:44:50 +0530 Subject: [PATCH] fix(telegram): prevent reasoning duplicates in draft lanes --- src/telegram/bot-message-dispatch.test.ts | 94 ++++++++++++++++------- src/telegram/bot-message-dispatch.ts | 18 ++--- src/telegram/draft-stream.test.ts | 22 ++++++ src/telegram/draft-stream.ts | 18 ++--- 4 files changed, 103 insertions(+), 49 deletions(-) diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 7845b3d2f4e..7e6fc85e8bd 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -438,18 +438,17 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); }); - it("forces new message when reasoning ends after previous output", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); + it("defers reasoning split until next reasoning block in block mode", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // First partial: text before thinking - await replyOptions?.onPartialReply?.({ text: "Let me check" }); - // Reasoning stream (thinking block) - await replyOptions?.onReasoningStream?.({ text: "Analyzing..." }); - // Reasoning ends + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); await replyOptions?.onReasoningEnd?.(); - // Second partial: text after thinking + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" }); await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); return { queuedFinal: true }; @@ -459,16 +458,17 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "block" }); - // Should force new message when reasoning ends - expect(draftStream.forceNewMessage).toHaveBeenCalled(); + expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); }); - it("does not force new message in partial mode when reasoning ends", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); + it("does not split reasoning lane on reasoning end when no next reasoning block arrives", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Let me check" }); + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); await replyOptions?.onReasoningEnd?.(); await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); @@ -477,21 +477,20 @@ describe("dispatchTelegramMessage draft streaming", () => { ); deliverReplies.mockResolvedValue({ delivered: true }); - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + await dispatchWithContext({ context: createContext(), streamMode: "block" }); - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); }); - it("forces new message on reasoning end after streamed reasoning output", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); + it("does not force new reasoning split in partial mode when no next block arrives", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // Reasoning starts immediately (no assistant-answer output yet) - await replyOptions?.onReasoningStream?.({ text: "Thinking..." }); - // Reasoning ends + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); await replyOptions?.onReasoningEnd?.(); - // First actual text output await replyOptions?.onPartialReply?.({ text: "Here's my answer" }); await dispatcherOptions.deliver({ text: "Here's my answer" }, { kind: "final" }); return { queuedFinal: true }; @@ -499,10 +498,9 @@ describe("dispatchTelegramMessage draft streaming", () => { ); deliverReplies.mockResolvedValue({ delivered: true }); - await dispatchWithContext({ context: createContext(), streamMode: "block" }); + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - // Reasoning stream produced preview output, so split for final answer. - expect(draftStream.forceNewMessage).toHaveBeenCalled(); + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); }); it("does not finalize preview with reasoning payloads before answer payloads", async () => { @@ -647,6 +645,48 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliverReplies).not.toHaveBeenCalled(); }); + it("does not duplicate reasoning final after reasoning end in block mode", async () => { + let reasoningMessageId: number | undefined = 111; + const reasoningDraftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => reasoningMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + reasoningMessageId = undefined; + }), + }; + const answerDraftStream = createDraftStream(999); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step one_" }); + await replyOptions?.onReasoningEnd?.(); + await dispatcherOptions.deliver( + { text: "Reasoning:\n_step one expanded_" }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "111" }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 111, + "Reasoning:\n_step one expanded_", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it("splits reasoning preview only when next reasoning block starts in partial mode", async () => { const { reasoningDraftStream } = setupDraftStreams({ answerMessageId: 999, diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index d4740a5f5ed..0c02c4e16d4 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -494,10 +494,10 @@ export const dispatchTelegramMessage = async ({ : undefined, onReasoningStream: reasoningLane.stream ? (payload) => { - // In partial mode, split between reasoning blocks only when the - // next reasoning stream starts. Splitting at reasoning-end can - // orphan the active preview and cause duplicate reasoning sends. - if (streamMode === "partial" && splitReasoningOnNextStream) { + // Split between reasoning blocks only when the next reasoning + // stream starts. Splitting at reasoning-end can orphan the active + // preview and cause duplicate reasoning sends on reasoning final. + if (splitReasoningOnNextStream) { reasoningLane.stream?.forceNewMessage(); resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; @@ -516,15 +516,7 @@ export const dispatchTelegramMessage = async ({ : undefined, onReasoningEnd: reasoningLane.stream ? () => { - // Block mode keeps hard message boundaries at reasoning-end. - if (streamMode === "block") { - if (reasoningLane.hasStreamedMessage) { - reasoningLane.stream?.forceNewMessage(); - } - resetDraftLaneState(reasoningLane); - return; - } - // Partial mode splits when/if a later reasoning block begins. + // Split when/if a later reasoning block begins. splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; } : undefined, diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index ab385d2e5a4..7532015a5bb 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -153,6 +153,28 @@ describe("createTelegramDraftStream", () => { parse_mode: "HTML", }); }); + + it("enforces maxChars after renderText expansion", async () => { + const api = createMockDraftApi(); + const warn = vi.fn(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + maxChars: 100, + renderText: () => ({ text: `${"<".repeat(120)}`, parseMode: "HTML" }), + warn, + }); + + stream.update("short raw text"); + await stream.flush(); + + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("telegram stream preview stopped (text length 127 > 100)"), + ); + }); }); describe("draft stream initial message debounce", () => { diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index beb4d63a836..a4a6b2db20c 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -62,21 +62,21 @@ export function createTelegramDraftStream(params: { if (!trimmed) { return false; } - if (trimmed.length > maxChars) { - // Telegram text messages/edits cap at 4096 chars. - // Stop streaming once we exceed the cap to avoid repeated API failures. - stopped = true; - params.warn?.( - `telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`, - ); - return false; - } const rendered = params.renderText?.(trimmed) ?? { text: trimmed }; const renderedText = rendered.text.trimEnd(); const renderedParseMode = rendered.parseMode; if (!renderedText) { return false; } + if (renderedText.length > maxChars) { + // Telegram text messages/edits cap at 4096 chars. + // Stop streaming once we exceed the cap to avoid repeated API failures. + stopped = true; + params.warn?.( + `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, + ); + return false; + } if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) { return true; }