From e5ff60607d8cffb22d81964e1e6281dcc3f25160 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Thu, 19 Feb 2026 20:17:14 +0530 Subject: [PATCH] refactor(telegram): unify reasoning and answer lane delivery --- src/telegram/bot-message-dispatch.ts | 429 ++++++++++++--------------- 1 file changed, 186 insertions(+), 243 deletions(-) diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 79e9109e050..e9a5fb0a4d3 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -149,47 +149,8 @@ export const dispatchTelegramMessage = async ({ replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; const draftMinInitialChars = streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS; - const answerDraftStream = canStreamAnswerDraft - ? createTelegramDraftStream({ - api: bot.api, - chatId, - maxChars: draftMaxChars, - thread: threadSpec, - replyToMessageId: draftReplyToMessageId, - minInitialChars: draftMinInitialChars, - renderText: renderDraftPreview, - log: logVerbose, - warn: logVerbose, - }) - : undefined; - const reasoningDraftStream = canStreamReasoningDraft - ? createTelegramDraftStream({ - api: bot.api, - chatId, - maxChars: draftMaxChars, - thread: threadSpec, - replyToMessageId: draftReplyToMessageId, - minInitialChars: draftMinInitialChars, - renderText: renderDraftPreview, - log: logVerbose, - warn: logVerbose, - }) - : undefined; - const answerDraftChunking = - answerDraftStream && streamMode === "block" - ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) - : undefined; - const answerDraftChunker = answerDraftChunking - ? new EmbeddedBlockChunker(answerDraftChunking) - : undefined; - const reasoningDraftChunking = - reasoningDraftStream && streamMode === "block" - ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) - : undefined; - const reasoningDraftChunker = reasoningDraftChunking - ? new EmbeddedBlockChunker(reasoningDraftChunking) - : undefined; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); + type LaneName = "answer" | "reasoning"; type DraftLaneState = { stream: ReturnType | undefined; lastPartialText: string; @@ -197,22 +158,52 @@ export const dispatchTelegramMessage = async ({ hasStreamedMessage: boolean; chunker: EmbeddedBlockChunker | undefined; }; - const answerLane: DraftLaneState = { - stream: answerDraftStream, - lastPartialText: "", - draftText: "", - hasStreamedMessage: false, - chunker: answerDraftChunker, + const createDraftLane = (enabled: boolean): DraftLaneState => { + const stream = enabled + ? createTelegramDraftStream({ + api: bot.api, + chatId, + maxChars: draftMaxChars, + thread: threadSpec, + replyToMessageId: draftReplyToMessageId, + minInitialChars: draftMinInitialChars, + renderText: renderDraftPreview, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const chunker = + stream && streamMode === "block" + ? new EmbeddedBlockChunker(resolveTelegramDraftStreamingChunking(cfg, route.accountId)) + : undefined; + return { + stream, + lastPartialText: "", + draftText: "", + hasStreamedMessage: false, + chunker, + }; }; - const reasoningLane: DraftLaneState = { - stream: reasoningDraftStream, - lastPartialText: "", - draftText: "", - hasStreamedMessage: false, - chunker: reasoningDraftChunker, + const lanes: Record = { + answer: createDraftLane(canStreamAnswerDraft), + reasoning: createDraftLane(canStreamReasoningDraft), }; + const answerLane = lanes.answer; + const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; const reasoningStepState = createTelegramReasoningStepState(); + type SplitLaneSegment = { lane: LaneName; text: string }; + const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => { + const split = splitTelegramReasoningText(text); + const segments: SplitLaneSegment[] = []; + if (split.reasoningText) { + segments.push({ lane: "reasoning", text: split.reasoningText }); + } + if (split.answerText) { + segments.push({ lane: "answer", text: split.answerText }); + } + return segments; + }; const resetDraftLaneState = (lane: DraftLaneState) => { lane.lastPartialText = ""; lane.draftText = ""; @@ -271,17 +262,12 @@ export const dispatchTelegramMessage = async ({ }); }; const updateDraftLanesFromPartial = (text: string | undefined) => { - if (!text) { - return; - } - const split = splitTelegramReasoningText(text); - if (split.reasoningText) { - reasoningStepState.noteReasoningHint(); - reasoningStepState.noteReasoningDelivered(); - updateDraftFromPartial(reasoningLane, split.reasoningText); - } - if (split.answerText) { - updateDraftFromPartial(answerLane, split.answerText); + for (const segment of splitTextIntoLaneSegments(text)) { + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + reasoningStepState.noteReasoningDelivered(); + } + updateDraftFromPartial(lanes[segment.lane], segment.text); } }; const flushDraftLane = async (lane: DraftLaneState) => { @@ -381,8 +367,10 @@ export const dispatchTelegramMessage = async ({ delivered: false, skippedNonSilent: 0, }; - let finalizedViaPreviewMessage = false; - let finalizedReasoningViaPreviewMessage = false; + const finalizedPreviewByLane: Record = { + answer: false, + reasoning: false, + }; const clearGroupHistory = () => { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); @@ -402,89 +390,67 @@ export const dispatchTelegramMessage = async ({ linkPreview: telegramCfg.linkPreview, replyQuoteText, }; - const tryFinalizePreviewForLane = async (params: { + const getLanePreviewText = (lane: DraftLaneState) => + streamMode === "block" ? lane.draftText : lane.lastPartialText; + const tryUpdatePreviewForLane = async (params: { lane: DraftLaneState; - laneName: "answer" | "reasoning"; - finalText: string; + laneName: LaneName; + text: string; previewButtons?: TelegramInlineButtons; + stopBeforeEdit?: boolean; + updateLaneSnapshot?: boolean; + skipRegressive: "always" | "existingOnly"; + context: "final" | "update"; }): Promise => { - const { lane, laneName, finalText, previewButtons } = params; + const { + lane, + laneName, + text, + previewButtons, + stopBeforeEdit = false, + updateLaneSnapshot = false, + skipRegressive, + context, + } = params; if (!lane.stream) { return false; } const hadPreviewMessage = typeof lane.stream.messageId() === "number"; - const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText; - await lane.stream.stop(); + if (stopBeforeEdit) { + await lane.stream.stop(); + } const previewMessageId = lane.stream.messageId(); if (typeof previewMessageId !== "number") { return false; } - if ( - hadPreviewMessage && - currentPreviewText && - currentPreviewText.startsWith(finalText) && - finalText.length < currentPreviewText.length - ) { + const currentPreviewText = getLanePreviewText(lane); + const shouldSkipRegressive = + Boolean(currentPreviewText) && + currentPreviewText.startsWith(text) && + text.length < currentPreviewText.length && + (skipRegressive === "always" || hadPreviewMessage); + if (shouldSkipRegressive) { // Avoid regressive punctuation/wording flicker from occasional shorter finals. deliveryState.delivered = true; return true; } try { - await editMessageTelegram(chatId, previewMessageId, finalText, { + await editMessageTelegram(chatId, previewMessageId, text, { api: bot.api, cfg, accountId: route.accountId, linkPreview: telegramCfg.linkPreview, buttons: previewButtons, }); + if (updateLaneSnapshot) { + lane.lastPartialText = text; + lane.draftText = text; + } deliveryState.delivered = true; return true; } catch (err) { logVerbose( - `telegram: ${laneName} preview final edit failed; falling back to standard send (${String(err)})`, - ); - return false; - } - }; - const tryEditExistingPreviewForLane = async (params: { - lane: DraftLaneState; - laneName: "answer" | "reasoning"; - finalText: string; - previewButtons?: TelegramInlineButtons; - }): Promise => { - const { lane, laneName, finalText, previewButtons } = params; - if (!lane.stream) { - return false; - } - const previewMessageId = lane.stream.messageId(); - if (typeof previewMessageId !== "number") { - return false; - } - const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText; - if ( - currentPreviewText && - currentPreviewText.startsWith(finalText) && - finalText.length < currentPreviewText.length - ) { - // Avoid regressive punctuation/wording flicker from occasional shorter finals. - deliveryState.delivered = true; - return true; - } - try { - await editMessageTelegram(chatId, previewMessageId, finalText, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - lane.lastPartialText = finalText; - lane.draftText = finalText; - deliveryState.delivered = true; - return true; - } catch (err) { - logVerbose( - `telegram: ${laneName} preview update failed; falling back to standard send (${String(err)})`, + `telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`, ); return false; } @@ -506,33 +472,72 @@ export const dispatchTelegramMessage = async ({ } return result.delivered; }; - const tryFinalizeLaneText = async (params: { - lane: DraftLaneState; - laneName: "answer" | "reasoning"; + type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; + const deliverLaneText = async (params: { + laneName: LaneName; text: string; - previewButtons?: TelegramInlineButtons; - alreadyFinalized?: boolean; payload: ReplyPayload; - }): Promise => { - const { lane, laneName, text, previewButtons, alreadyFinalized, payload } = params; + infoKind: string; + previewButtons?: TelegramInlineButtons; + allowPreviewUpdateForNonFinal?: boolean; + }): Promise => { + const { + laneName, + text, + payload, + infoKind, + previewButtons, + allowPreviewUpdateForNonFinal = false, + } = params; + const lane = lanes[laneName]; const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const canFinalizeViaPreviewEdit = + const canEditViaPreview = !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; - if (!canFinalizeViaPreviewEdit || alreadyFinalized) { - if (!hasMedia && !payload.isError && text.length > draftMaxChars) { + + if (infoKind === "final") { + if (canEditViaPreview && !finalizedPreviewByLane[laneName]) { + await flushDraftLane(lane); + const finalized = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: true, + skipRegressive: "existingOnly", + context: "final", + }); + if (finalized) { + finalizedPreviewByLane[laneName] = true; + return "preview-finalized"; + } + } else if (!hasMedia && !payload.isError && text.length > draftMaxChars) { logVerbose( `telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`, ); } - return false; + await lane.stream?.stop(); + const delivered = await sendPayload(applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; } - await flushDraftLane(lane); - return tryFinalizePreviewForLane({ - lane, - laneName, - finalText: text, - previewButtons, - }); + + if (allowPreviewUpdateForNonFinal && canEditViaPreview) { + const updated = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: false, + updateLaneSnapshot: true, + skipRegressive: "always", + context: "update", + }); + if (updated) { + return "preview-updated"; + } + } + + const delivered = await sendPayload(applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; }; let queuedFinal = false; @@ -546,7 +551,7 @@ export const dispatchTelegramMessage = async ({ const previewButtons = ( payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined )?.buttons; - const split = splitTelegramReasoningText(payload.text); + const segments = splitTextIntoLaneSegments(payload.text); const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; const flushBufferedFinalAnswer = async () => { @@ -559,112 +564,51 @@ export const dispatchTelegramMessage = async ({ | { buttons?: TelegramInlineButtons } | undefined )?.buttons; - const finalizedBufferedAnswer = await tryFinalizeLaneText({ - lane: answerLane, + await deliverLaneText({ laneName: "answer", text: buffered.text, - previewButtons: bufferedButtons, - alreadyFinalized: finalizedViaPreviewMessage, payload: buffered.payload, + infoKind: "final", + previewButtons: bufferedButtons, }); - if (finalizedBufferedAnswer) { - finalizedViaPreviewMessage = true; - reasoningStepState.resetForNextStep(); - return; - } - await answerLane.stream?.stop(); - await sendPayload(applyTextToPayload(buffered.payload, buffered.text)); reasoningStepState.resetForNextStep(); }; - const deliverReasoningText = async (text: string) => { - reasoningStepState.noteReasoningHint(); - if (info.kind === "final") { - const finalizedReasoning = await tryFinalizeLaneText({ - lane: reasoningLane, - laneName: "reasoning", - text, - previewButtons, - payload, - }); - if (finalizedReasoning) { - finalizedReasoningViaPreviewMessage = true; - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - return; - } - await reasoningLane.stream?.stop(); - const delivered = await sendPayload(applyTextToPayload(payload, text)); - if (delivered) { + for (const segment of segments) { + if ( + segment.lane === "answer" && + info.kind === "final" && + reasoningStepState.shouldBufferFinalAnswer() + ) { + reasoningStepState.bufferFinalAnswer({ payload, text: segment.text }); + continue; + } + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + } + const result = await deliverLaneText({ + laneName: segment.lane, + text: segment.text, + payload, + infoKind: info.kind, + previewButtons, + allowPreviewUpdateForNonFinal: segment.lane === "reasoning", + }); + if (segment.lane === "reasoning") { + if (result !== "skipped") { reasoningStepState.noteReasoningDelivered(); await flushBufferedFinalAnswer(); } - return; + continue; } - - const canEditReasoningPreview = - !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; - if (canEditReasoningPreview) { - const updatedReasoning = await tryEditExistingPreviewForLane({ - lane: reasoningLane, - laneName: "reasoning", - finalText: text, - previewButtons, - }); - if (updatedReasoning) { - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - return; - } - } - const delivered = await sendPayload(applyTextToPayload(payload, text)); - if (delivered) { - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - } - }; - - const deliverAnswerText = async (text: string) => { - if (info.kind === "final" && reasoningStepState.shouldBufferFinalAnswer()) { - reasoningStepState.bufferFinalAnswer({ payload, text }); - return; - } - if (info.kind === "final") { - const finalizedAnswer = await tryFinalizeLaneText({ - lane: answerLane, - laneName: "answer", - text, - previewButtons, - alreadyFinalized: finalizedViaPreviewMessage, - payload, - }); - if (finalizedAnswer && !finalizedViaPreviewMessage) { - finalizedViaPreviewMessage = true; - if (reasoningLane.hasStreamedMessage) { - finalizedReasoningViaPreviewMessage = true; - } - reasoningStepState.resetForNextStep(); - return; - } - await answerLane.stream?.stop(); - } - await sendPayload(applyTextToPayload(payload, text)); if (info.kind === "final") { if (reasoningLane.hasStreamedMessage) { - finalizedReasoningViaPreviewMessage = true; + finalizedPreviewByLane.reasoning = true; } reasoningStepState.resetForNextStep(); } - }; - - if (split.reasoningText) { - await deliverReasoningText(split.reasoningText); } - if (split.answerText) { - await deliverAnswerText(split.answerText); - return; - } - if (split.reasoningText) { + if (segments.length > 0) { return; } @@ -709,9 +653,10 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, - onPartialReply: answerLane.stream - ? (payload) => updateDraftLanesFromPartial(payload.text) - : undefined, + onPartialReply: + answerLane.stream || reasoningLane.stream + ? (payload) => updateDraftLanesFromPartial(payload.text) + : undefined, onReasoningStream: reasoningLane.stream ? (payload) => { // Split between reasoning blocks only when the next reasoning @@ -722,14 +667,12 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; } - const split = splitTelegramReasoningText(payload.text); - if (split.reasoningText) { - reasoningStepState.noteReasoningHint(); - reasoningStepState.noteReasoningDelivered(); - updateDraftFromPartial(reasoningLane, split.reasoningText); - } - if (split.answerText) { - updateDraftFromPartial(answerLane, split.answerText); + for (const segment of splitTextIntoLaneSegments(payload.text)) { + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + reasoningStepState.noteReasoningDelivered(); + } + updateDraftFromPartial(lanes[segment.lane], segment.text); } } : undefined, @@ -759,12 +702,12 @@ export const dispatchTelegramMessage = async ({ Boolean(reasoningLane.stream) && answerLane.stream === reasoningLane.stream; await answerLane.stream?.stop(); - if (!finalizedViaPreviewMessage) { + if (!finalizedPreviewByLane.answer) { await answerLane.stream?.clear(); } if (!streamsShareHandle) { await reasoningLane.stream?.stop(); - if (!finalizedReasoningViaPreviewMessage) { + if (!finalizedPreviewByLane.reasoning) { await reasoningLane.stream?.clear(); } }