From 949bd980f93003dfafdf156a0465ea45297366fe Mon Sep 17 00:00:00 2001 From: Ion Mudreac Date: Tue, 17 Feb 2026 16:22:03 +0800 Subject: [PATCH] fix(telegram): prevent silent message loss across all streamMode settings --- src/telegram/bot-message-dispatch.test.ts | 192 +++++++++++++++++++++- src/telegram/bot-message-dispatch.ts | 67 ++++++-- src/telegram/bot/delivery.ts | 2 + src/telegram/draft-stream.ts | 1 + 4 files changed, 245 insertions(+), 17 deletions(-) diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 8893628fd17..6e6536cd7f9 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -1,5 +1,5 @@ -import path from "node:path"; import type { Bot } from "grammy"; +import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { STATE_DIR } from "../config/paths.js"; @@ -479,4 +479,194 @@ describe("dispatchTelegramMessage draft streaming", () => { }), ); }); + + it("clears preview for error-only finals", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "tool failed", isError: true }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "another error", isError: true }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + // Error payloads skip preview finalization — preview must be cleaned up + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("clears preview after media final delivery", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("clears stale preview when response is NO_REPLY", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({ + queuedFinal: false, + }); + + await dispatchWithContext({ context: createContext() }); + + // Preview contains stale partial text — must be cleaned up + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("falls back when all finals are skipped and clears preview", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + dispatcherOptions.onSkip?.({ text: "" }, { reason: "no_reply", kind: "final" }); + return { queuedFinal: false }; + }); + deliverReplies.mockResolvedValueOnce({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("sends fallback and clears preview when deliver throws (dispatcher swallows error)", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + try { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + } catch (err) { + dispatcherOptions.onError(err, { kind: "final" }); + } + return { queuedFinal: false }; + }); + deliverReplies + .mockRejectedValueOnce(new Error("network down")) + .mockResolvedValueOnce({ delivered: true }); + + await expect(dispatchWithContext({ context: createContext() })).resolves.toBeUndefined(); + // Fallback should be sent because failedDeliveries > 0 + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies).toHaveBeenLastCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + }); + + it("sends fallback in off mode when deliver throws", async () => { + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + try { + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + } catch (err) { + dispatcherOptions.onError(err, { kind: "final" }); + } + return { queuedFinal: false }; + }); + deliverReplies + .mockRejectedValueOnce(new Error("403 bot blocked")) + .mockResolvedValueOnce({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "off" }); + + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledTimes(2); + expect(deliverReplies).toHaveBeenLastCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("No response"), + }), + ], + }), + ); + }); + + it("handles error block + response final — error delivered, response finalizes preview", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + editMessageTelegram.mockResolvedValue({ ok: true }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + replyOptions?.onPartialReply?.({ text: "Processing..." }); + await dispatcherOptions.deliver( + { text: "⚠️ exec failed", isError: true }, + { kind: "block" }, + ); + await dispatcherOptions.deliver( + { text: "The command timed out. Here's what I found..." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext() }); + + // Block error went through deliverReplies + expect(deliverReplies).toHaveBeenCalledTimes(1); + // Final was finalized via preview edit + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "The command timed out. Here's what I found...", + expect.any(Object), + ); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + + it("supports concurrent dispatches with independent previews", async () => { + const draftA = createDraftStream(11); + const draftB = createDraftStream(22); + createTelegramDraftStream.mockReturnValueOnce(draftA).mockReturnValueOnce(draftB); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "partial" }); + await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await Promise.all([ + dispatchWithContext({ + context: createContext({ + chatId: 1, + msg: { chat: { id: 1, type: "private" }, message_id: 1 } as never, + }), + }), + dispatchWithContext({ + context: createContext({ + chatId: 2, + msg: { chat: { id: 2, type: "private" }, message_id: 2 } as never, + }), + }), + ]); + + expect(draftA.clear).toHaveBeenCalledTimes(1); + expect(draftB.clear).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7cfd0778790..a33b65019b9 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -1,4 +1,10 @@ import type { Bot } from "grammy"; +import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; +import type { RuntimeEnv } from "../runtime.js"; +import type { TelegramMessageContext } from "./bot-message-context.js"; +import type { TelegramBotOptions } from "./bot.js"; +import type { TelegramStreamMode } from "./bot/types.js"; +import type { TelegramInlineButtons } from "./button-types.js"; import { resolveAgentDir } from "../agents/agent-scope.js"; import { findModelInCatalog, @@ -15,15 +21,9 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; -import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; import { danger, logVerbose } from "../globals.js"; import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js"; -import type { RuntimeEnv } from "../runtime.js"; -import type { TelegramMessageContext } from "./bot-message-context.js"; -import type { TelegramBotOptions } from "./bot.js"; import { deliverReplies } from "./bot/delivery.js"; -import type { TelegramStreamMode } from "./bot/types.js"; -import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { editMessageTelegram } from "./send.js"; @@ -189,11 +189,13 @@ export const dispatchTelegramMessage = async ({ }; const disableBlockStreaming = - typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : draftStream || streamMode === "off" - ? true - : undefined; + streamMode === "off" + ? true // off mode must always disable block streaming + : typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : draftStream + ? true + : undefined; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, @@ -269,8 +271,26 @@ export const dispatchTelegramMessage = async ({ const deliveryState = { delivered: false, skippedNonSilent: 0, + failedDeliveries: 0, }; let finalizedViaPreviewMessage = false; + + /** + * Clean up the draft preview message. The preview must be removed in every + * case EXCEPT when it was successfully finalized as the actual response via + * an in-place edit (`finalizedViaPreviewMessage === true`). + */ + const clearDraftPreviewIfNeeded = async () => { + if (finalizedViaPreviewMessage) { + return; + } + try { + await draftStream?.clear(); + } catch (err) { + logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`); + } + }; + const clearGroupHistory = () => { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); @@ -340,6 +360,9 @@ export const dispatchTelegramMessage = async ({ }); finalizedViaPreviewMessage = true; deliveryState.delivered = true; + logVerbose( + `telegram: finalized response via preview edit (messageId=${previewMessageId})`, + ); return; } catch (err) { logVerbose( @@ -382,6 +405,9 @@ export const dispatchTelegramMessage = async ({ }); finalizedViaPreviewMessage = true; deliveryState.delivered = true; + logVerbose( + `telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`, + ); return; } catch (err) { logVerbose( @@ -397,6 +423,13 @@ export const dispatchTelegramMessage = async ({ }); if (result.delivered) { deliveryState.delivered = true; + logVerbose( + `telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? " (error payload)" : ""}`, + ); + } else { + logVerbose( + `telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`, + ); } }, onSkip: (_payload, info) => { @@ -405,6 +438,7 @@ export const dispatchTelegramMessage = async ({ } }, onError: (err, info) => { + deliveryState.failedDeliveries += 1; runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: createTypingCallbacks({ @@ -454,14 +488,13 @@ export const dispatchTelegramMessage = async ({ }, })); } finally { - // Must stop() first to flush debounced content before clear() wipes state await draftStream?.stop(); - if (!finalizedViaPreviewMessage) { - await draftStream?.clear(); - } } let sentFallback = false; - if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { + if ( + !deliveryState.delivered && + (deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0) + ) { const result = await deliverReplies({ replies: [{ text: EMPTY_RESPONSE_FALLBACK }], ...deliveryBaseOptions, @@ -469,6 +502,8 @@ export const dispatchTelegramMessage = async ({ sentFallback = result.delivered; } + await clearDraftPreviewIfNeeded(); + const hasFinalResponse = queuedFinal || sentFallback; if (!hasFinalResponse) { clearGroupHistory(); diff --git a/src/telegram/bot/delivery.ts b/src/telegram/bot/delivery.ts index b5fbfc434e5..5e0efa652c3 100644 --- a/src/telegram/bot/delivery.ts +++ b/src/telegram/bot/delivery.ts @@ -558,6 +558,7 @@ async function sendTelegramText( ...baseParams, }), }); + runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id}`); return res.message_id; } catch (err) { const errText = formatErrorMessage(err); @@ -574,6 +575,7 @@ async function sendTelegramText( ...baseParams, }), }); + runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id} (plain)`); return res.message_id; } throw err; diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 1d8d8e81f04..9d87358671d 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -127,6 +127,7 @@ export function createTelegramDraftStream(params: { } try { await params.api.deleteMessage(chatId, messageId); + params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); } catch (err) { params.warn?.( `telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,