fix(telegram): stream replies in-place without duplicate final sends

This commit is contained in:
Ayaan Zaidi
2026-02-15 20:09:10 +05:30
committed by Ayaan Zaidi
parent 8b2a5672be
commit a69e82765f
17 changed files with 575 additions and 210 deletions

View File

@@ -21,7 +21,7 @@ title: grammY
- **Webhook support:** `webhook-set.ts` wraps `setWebhook/deleteWebhook`; `webhook.ts` hosts the callback with health + graceful shutdown. Gateway enables webhook mode when `channels.telegram.webhookUrl` + `channels.telegram.webhookSecret` are set (otherwise it long-polls).
- **Sessions:** direct chats collapse into the agent main session (`agent:<agentId>:<mainKey>`); groups use `agent:<agentId>:telegram:group:<chatId>`; replies route back to the same channel.
- **Config knobs:** `channels.telegram.botToken`, `channels.telegram.dmPolicy`, `channels.telegram.groups` (allowlist + mention defaults), `channels.telegram.allowFrom`, `channels.telegram.groupAllowFrom`, `channels.telegram.groupPolicy`, `channels.telegram.mediaMaxMb`, `channels.telegram.linkPreview`, `channels.telegram.proxy`, `channels.telegram.webhookSecret`, `channels.telegram.webhookUrl`, `channels.telegram.webhookHost`.
- **Draft streaming:** optional `channels.telegram.streamMode` uses `sendMessageDraft` in private topic chats (Bot API 9.3+). This is separate from channel block streaming.
- **Live stream preview:** optional `channels.telegram.streamMode` sends a temporary message and updates it with `editMessageText`. This is separate from channel block streaming.
- **Tests:** grammy mocks cover DM + group mention gating and outbound send; more media/webhook fixtures still welcome.
Open questions

View File

@@ -221,23 +221,20 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
## Feature reference
<AccordionGroup>
<Accordion title="Draft streaming in Telegram DMs">
OpenClaw can stream partial replies with Telegram draft bubbles (`sendMessageDraft`).
<Accordion title="Live stream preview (message edits)">
OpenClaw can stream partial replies by sending a temporary Telegram message and editing it as text arrives.
Requirements:
Requirement:
- `channels.telegram.streamMode` is not `"off"` (default: `"partial"`)
- private chat
- inbound update includes `message_thread_id`
- bot topics are enabled (`getMe().has_topics_enabled`)
Modes:
- `off`: no draft streaming
- `partial`: frequent draft updates from partial text
- `block`: chunked draft updates using `channels.telegram.draftChunk`
- `off`: no live preview
- `partial`: frequent preview updates from partial text
- `block`: chunked preview updates using `channels.telegram.draftChunk`
`draftChunk` defaults for block mode:
`draftChunk` defaults for `streamMode: "block"`:
- `minChars: 200`
- `maxChars: 800`
@@ -245,13 +242,17 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
`maxChars` is clamped by `channels.telegram.textChunkLimit`.
Draft streaming is DM-only; groups/channels do not use draft bubbles.
This works in direct chats and groups/topics.
If you want early real Telegram messages instead of draft updates, use block streaming (`channels.telegram.blockStreaming: true`).
For text-only replies, OpenClaw keeps the same preview message and performs a final edit in place (no second message).
For complex replies (for example media payloads), OpenClaw falls back to normal final delivery and then cleans up the preview message.
`streamMode` is separate from block streaming. When block streaming is explicitly enabled for Telegram, OpenClaw skips the preview stream to avoid double-streaming.
Telegram-only reasoning stream:
- `/reasoning stream` sends reasoning to the draft bubble while generating
- `/reasoning stream` sends reasoning to the live preview while generating
- final answer is sent without reasoning text
</Accordion>
@@ -703,7 +704,7 @@ Primary reference:
- `channels.telegram.textChunkLimit`: outbound chunk size (chars).
- `channels.telegram.chunkMode`: `length` (default) or `newline` to split on blank lines (paragraph boundaries) before length chunking.
- `channels.telegram.linkPreview`: toggle link previews for outbound messages (default: true).
- `channels.telegram.streamMode`: `off | partial | block` (draft streaming).
- `channels.telegram.streamMode`: `off | partial | block` (live stream preview).
- `channels.telegram.mediaMaxMb`: inbound/outbound media cap (MB).
- `channels.telegram.retry`: retry policy for outbound Telegram API calls (attempts, minDelayMs, maxDelayMs, jitter).
- `channels.telegram.network.autoSelectFamily`: override Node autoSelectFamily (true=enable, false=disable). Defaults to disabled on Node 22 to avoid Happy Eyeballs timeouts.
@@ -727,7 +728,7 @@ Telegram-specific high-signal fields:
- access control: `dmPolicy`, `allowFrom`, `groupPolicy`, `groupAllowFrom`, `groups`, `groups.*.topics.*`
- command/menu: `commands.native`, `customCommands`
- threading/replies: `replyToMode`
- streaming: `streamMode`, `draftChunk`, `blockStreaming`
- streaming: `streamMode` (preview), `draftChunk`, `blockStreaming`
- formatting/delivery: `textChunkLimit`, `chunkMode`, `linkPreview`, `responsePrefix`
- media/network: `mediaMaxMb`, `timeoutSeconds`, `retry`, `network.autoSelectFamily`, `proxy`
- webhook: `webhookUrl`, `webhookSecret`, `webhookPath`, `webhookHost`

View File

@@ -1,9 +1,9 @@
---
summary: "Streaming + chunking behavior (block replies, draft streaming, limits)"
summary: "Streaming + chunking behavior (block replies, Telegram preview streaming, limits)"
read_when:
- Explaining how streaming or chunking works on channels
- Changing block streaming or channel chunking behavior
- Debugging duplicate/early block replies or draft streaming
- Debugging duplicate/early block replies or Telegram preview streaming
title: "Streaming and Chunking"
---
@@ -12,9 +12,9 @@ title: "Streaming and Chunking"
OpenClaw has two separate “streaming” layers:
- **Block streaming (channels):** emit completed **blocks** as the assistant writes. These are normal channel messages (not token deltas).
- **Token-ish streaming (Telegram only):** update a **draft bubble** with partial text while generating; final message is sent at the end.
- **Token-ish streaming (Telegram only):** update a temporary **preview message** with partial text while generating.
There is **no real token streaming** to external channel messages today. Telegram draft streaming is the only partial-stream surface.
There is **no true token-delta streaming** to channel messages today. Telegram preview streaming is the only partial-stream surface.
## Block streaming (channel messages)
@@ -99,37 +99,38 @@ This maps to:
- **No block streaming:** `blockStreamingDefault: "off"` (only final reply).
**Channel note:** For non-Telegram channels, block streaming is **off unless**
`*.blockStreaming` is explicitly set to `true`. Telegram can stream drafts
`*.blockStreaming` is explicitly set to `true`. Telegram can stream a live preview
(`channels.telegram.streamMode`) without block replies.
Config location reminder: the `blockStreaming*` defaults live under
`agents.defaults`, not the root config.
## Telegram draft streaming (token-ish)
## Telegram preview streaming (token-ish)
Telegram is the only channel with draft streaming:
Telegram is the only channel with live preview streaming:
- Uses Bot API `sendMessageDraft` in **private chats with topics**.
- Uses Bot API `sendMessage` (first update) + `editMessageText` (subsequent updates).
- `channels.telegram.streamMode: "partial" | "block" | "off"`.
- `partial`: draft updates with the latest stream text.
- `block`: draft updates in chunked blocks (same chunker rules).
- `off`: no draft streaming.
- Draft chunk config (only for `streamMode: "block"`): `channels.telegram.draftChunk` (defaults: `minChars: 200`, `maxChars: 800`).
- Draft streaming is separate from block streaming; block replies are off by default and only enabled by `*.blockStreaming: true` on non-Telegram channels.
- Final reply is still a normal message.
- `/reasoning stream` writes reasoning into the draft bubble (Telegram only).
When draft streaming is active, OpenClaw disables block streaming for that reply to avoid double-streaming.
- `partial`: preview updates with latest stream text.
- `block`: preview updates in chunked blocks (same chunker rules).
- `off`: no preview streaming.
- Preview chunk config (only for `streamMode: "block"`): `channels.telegram.draftChunk` (defaults: `minChars: 200`, `maxChars: 800`).
- Preview streaming is separate from block streaming.
- When Telegram block streaming is explicitly enabled, preview streaming is skipped to avoid double-streaming.
- Text-only finals are applied by editing the preview message in place.
- Non-text/complex finals fall back to normal final message delivery.
- `/reasoning stream` writes reasoning into the live preview (Telegram only).
```
Telegram (private + topics)
└─ sendMessageDraft (draft bubble)
├─ streamMode=partial → update latest text
└─ streamMode=block → chunker updates draft
└─ final reply → normal message
Telegram
└─ sendMessage (temporary preview message)
├─ streamMode=partial → edit latest text
└─ streamMode=block → chunker + edit updates
└─ final text-only reply → final edit on same message
└─ fallback: cleanup preview + normal final delivery (media/complex)
```
Legend:
- `sendMessageDraft`: Telegram draft bubble (not a real message).
- `final reply`: normal Telegram message send.
- `preview message`: temporary Telegram message updated during generation.
- `final edit`: in-place edit on the same preview message (text-only).

View File

@@ -155,7 +155,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat
- Bot token: `channels.telegram.botToken` or `channels.telegram.tokenFile`, with `TELEGRAM_BOT_TOKEN` as fallback for the default account.
- `configWrites: false` blocks Telegram-initiated config writes (supergroup ID migrations, `/config set|unset`).
- Draft streaming uses Telegram `sendMessageDraft` (requires private chat topics).
- Telegram stream previews use `sendMessage` + `editMessageText` (works in direct and group chats).
- Retry policy: see [Retry policy](/concepts/retry).
### Discord

View File

@@ -335,11 +335,11 @@ export const FIELD_HELP: Record<string, string> = {
"channels.telegram.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
"channels.telegram.streamMode":
"Draft streaming mode for Telegram replies (off | partial | block). Separate from block streaming; requires private topics + sendMessageDraft.",
"Live stream preview mode for Telegram replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessageText.",
"channels.telegram.draftChunk.minChars":
'Minimum chars before emitting a Telegram draft update when channels.telegram.streamMode="block" (default: 200).',
'Minimum chars before emitting a Telegram stream preview update when channels.telegram.streamMode="block" (default: 200).',
"channels.telegram.draftChunk.maxChars":
'Target max size for a Telegram draft update chunk when channels.telegram.streamMode="block" (default: 800; clamped to channels.telegram.textChunkLimit).',
'Target max size for a Telegram stream preview chunk when channels.telegram.streamMode="block" (default: 800; clamped to channels.telegram.textChunkLimit).',
"channels.telegram.draftChunk.breakPreference":
"Preferred breakpoints for Telegram draft chunks (paragraph | newline | sentence). Default: paragraph.",
"channels.telegram.retry.attempts":

View File

@@ -237,7 +237,7 @@ export const FIELD_LABELS: Record<string, string> = {
...IRC_FIELD_LABELS,
"channels.telegram.botToken": "Telegram Bot Token",
"channels.telegram.dmPolicy": "Telegram DM Policy",
"channels.telegram.streamMode": "Telegram Draft Stream Mode",
"channels.telegram.streamMode": "Telegram Stream Mode",
"channels.telegram.draftChunk.minChars": "Telegram Draft Chunk Min Chars",
"channels.telegram.draftChunk.maxChars": "Telegram Draft Chunk Max Chars",
"channels.telegram.draftChunk.breakPreference": "Telegram Draft Chunk Break Preference",

View File

@@ -93,11 +93,11 @@ export type TelegramAccountConfig = {
chunkMode?: "length" | "newline";
/** Disable block streaming for this account. */
blockStreaming?: boolean;
/** Chunking config for draft streaming in `streamMode: "block"`. */
/** Chunking config for Telegram stream previews in `streamMode: "block"`. */
draftChunk?: BlockStreamingChunkConfig;
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
/** Draft streaming mode for Telegram (off|partial|block). Default: partial. */
/** Telegram stream preview mode (off|partial|block). Default: partial. */
streamMode?: "off" | "partial" | "block";
mediaMaxMb?: number;
/** Telegram API client timeout in seconds (grammY ApiClientOptions). */

View File

@@ -4,6 +4,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const createTelegramDraftStream = vi.hoisted(() => vi.fn());
const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn());
const deliverReplies = vi.hoisted(() => vi.fn());
const editMessageTelegram = vi.hoisted(() => vi.fn());
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
@@ -17,6 +18,10 @@ vi.mock("./bot/delivery.js", () => ({
deliverReplies,
}));
vi.mock("./send.js", () => ({
editMessageTelegram,
}));
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
describeStickerImage: vi.fn(),
@@ -29,12 +34,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
createTelegramDraftStream.mockReset();
dispatchReplyWithBufferedBlockDispatcher.mockReset();
deliverReplies.mockReset();
editMessageTelegram.mockReset();
});
it("streams drafts in private threads and forwards thread id", async () => {
const draftStream = {
update: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockReturnValue(undefined),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
};
createTelegramDraftStream.mockReturnValue(draftStream);
@@ -47,7 +55,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
deliverReplies.mockResolvedValue({ delivered: true });
const resolveBotTopicsEnabled = vi.fn().mockResolvedValue(true);
const context = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
@@ -73,7 +80,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
removeAckAfterReply: false,
};
const bot = { api: { sendMessageDraft: vi.fn() } } as unknown as Bot;
const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot;
const runtime = {
log: vi.fn(),
error: vi.fn(),
@@ -92,10 +99,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
textLimit: 4096,
telegramCfg: {},
opts: { token: "token" },
resolveBotTopicsEnabled,
});
expect(resolveBotTopicsEnabled).toHaveBeenCalledWith(context.primaryCtx);
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
chatId: 123,
@@ -108,5 +113,221 @@ describe("dispatchTelegramMessage draft streaming", () => {
thread: { id: 777, scope: "dm" },
}),
);
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith(
expect.objectContaining({
replyOptions: expect.objectContaining({
disableBlockStreaming: true,
}),
}),
);
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("keeps block streaming enabled when account config enables it", async () => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
const context = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
msg: {
chat: { id: 123, type: "private" },
message_id: 456,
message_thread_id: 777,
},
chatId: 123,
isGroup: false,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
route: { agentId: "default", accountId: "default" },
skillFilter: undefined,
sendTyping: vi.fn(),
sendRecordVoice: vi.fn(),
ackReactionPromise: null,
reactionApi: null,
removeAckAfterReply: false,
};
const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot;
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: () => {
throw new Error("exit");
},
};
await dispatchTelegramMessage({
context,
bot,
cfg: {},
runtime,
replyToMode: "first",
streamMode: "partial",
textLimit: 4096,
telegramCfg: { blockStreaming: true },
opts: { token: "token" },
});
expect(createTelegramDraftStream).not.toHaveBeenCalled();
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith(
expect.objectContaining({
replyOptions: expect.objectContaining({
disableBlockStreaming: false,
onPartialReply: undefined,
}),
}),
);
});
it("finalizes text-only replies by editing the preview message in place", async () => {
const draftStream = {
update: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockReturnValue(999),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
};
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hel" });
await dispatcherOptions.deliver({ text: "Hello final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
const context = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
msg: {
chat: { id: 123, type: "private" },
message_id: 456,
message_thread_id: 777,
},
chatId: 123,
isGroup: false,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
route: { agentId: "default", accountId: "default" },
skillFilter: undefined,
sendTyping: vi.fn(),
sendRecordVoice: vi.fn(),
ackReactionPromise: null,
reactionApi: null,
removeAckAfterReply: false,
};
const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot;
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: () => {
throw new Error("exit");
},
};
await dispatchTelegramMessage({
context,
bot,
cfg: {},
runtime,
replyToMode: "first",
streamMode: "partial",
textLimit: 4096,
telegramCfg: {},
opts: { token: "token" },
});
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Hello final", expect.any(Object));
expect(deliverReplies).not.toHaveBeenCalled();
expect(draftStream.clear).not.toHaveBeenCalled();
expect(draftStream.stop).toHaveBeenCalled();
});
it("falls back to normal delivery when preview final is too long to edit", async () => {
const draftStream = {
update: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockReturnValue(999),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
};
createTelegramDraftStream.mockReturnValue(draftStream);
const longText = "x".repeat(5000);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: longText }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
const context = {
ctxPayload: {},
primaryCtx: { message: { chat: { id: 123, type: "private" } } },
msg: {
chat: { id: 123, type: "private" },
message_id: 456,
message_thread_id: 777,
},
chatId: 123,
isGroup: false,
resolvedThreadId: undefined,
replyThreadId: 777,
threadSpec: { id: 777, scope: "dm" },
historyKey: undefined,
historyLimit: 0,
groupHistories: new Map(),
route: { agentId: "default", accountId: "default" },
skillFilter: undefined,
sendTyping: vi.fn(),
sendRecordVoice: vi.fn(),
ackReactionPromise: null,
reactionApi: null,
removeAckAfterReply: false,
};
const bot = { api: { sendMessage: vi.fn(), editMessageText: vi.fn() } } as unknown as Bot;
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: () => {
throw new Error("exit");
},
};
await dispatchTelegramMessage({
context,
bot,
cfg: {},
runtime,
replyToMode: "first",
streamMode: "partial",
textLimit: 4096,
telegramCfg: {},
opts: { token: "token" },
});
expect(editMessageTelegram).not.toHaveBeenCalled();
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: longText })],
}),
);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(draftStream.stop).toHaveBeenCalled();
});
});

View File

@@ -3,7 +3,7 @@ import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../conf
import type { RuntimeEnv } from "../runtime.js";
import type { TelegramMessageContext } from "./bot-message-context.js";
import type { TelegramBotOptions } from "./bot.js";
import type { TelegramStreamMode, TelegramContext } from "./bot/types.js";
import type { TelegramStreamMode } from "./bot/types.js";
import { resolveAgentDir } from "../agents/agent-scope.js";
import {
findModelInCatalog,
@@ -24,6 +24,7 @@ import { danger, logVerbose } from "../globals.js";
import { deliverReplies } from "./bot/delivery.js";
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import { editMessageTelegram } from "./send.js";
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
@@ -42,8 +43,6 @@ async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string)
}
}
type ResolveBotTopicsEnabled = (ctx: TelegramContext) => boolean | Promise<boolean>;
type DispatchTelegramMessageParams = {
context: TelegramMessageContext;
bot: Bot;
@@ -54,7 +53,6 @@ type DispatchTelegramMessageParams = {
textLimit: number;
telegramCfg: TelegramAccountConfig;
opts: Pick<TelegramBotOptions, "token">;
resolveBotTopicsEnabled: ResolveBotTopicsEnabled;
};
export const dispatchTelegramMessage = async ({
@@ -67,11 +65,9 @@ export const dispatchTelegramMessage = async ({
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
}: DispatchTelegramMessageParams) => {
const {
ctxPayload,
primaryCtx,
msg,
chatId,
isGroup,
@@ -88,19 +84,16 @@ export const dispatchTelegramMessage = async ({
removeAckAfterReply,
} = context;
const isPrivateChat = msg.chat.type === "private";
const draftThreadId = threadSpec.id;
const draftMaxChars = Math.min(textLimit, 4096);
const canStreamDraft =
streamMode !== "off" &&
isPrivateChat &&
typeof draftThreadId === "number" &&
(await resolveBotTopicsEnabled(primaryCtx));
const accountBlockStreamingEnabled =
typeof telegramCfg.blockStreaming === "boolean"
? telegramCfg.blockStreaming
: cfg.agents?.defaults?.blockStreamingDefault === "on";
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
const draftStream = canStreamDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
draftId: msg.message_id || Date.now(),
maxChars: draftMaxChars,
thread: threadSpec,
log: logVerbose,
@@ -172,8 +165,11 @@ export const dispatchTelegramMessage = async ({
};
const disableBlockStreaming =
Boolean(draftStream) ||
(typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming : undefined);
typeof telegramCfg.blockStreaming === "boolean"
? !telegramCfg.blockStreaming
: draftStream
? true
: undefined;
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
cfg,
@@ -250,64 +246,109 @@ export const dispatchTelegramMessage = async ({
delivered: false,
skippedNonSilent: 0,
};
let finalizedViaPreviewMessage = false;
const { queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
...prefixOptions,
deliver: async (payload, info) => {
if (info.kind === "final") {
await flushDraft();
draftStream?.stop();
}
const result = await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
thread: threadSpec,
tableMode,
chunkMode,
onVoiceRecording: sendRecordVoice,
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
});
if (result.delivered) {
deliveryState.delivered = true;
}
},
onSkip: (_payload, info) => {
if (info.reason !== "silent") {
deliveryState.skippedNonSilent += 1;
}
},
onError: (err, info) => {
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
start: sendTyping,
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "telegram",
target: String(chatId),
error: err,
let queuedFinal = false;
try {
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
...prefixOptions,
deliver: async (payload, info) => {
if (info.kind === "final") {
await flushDraft();
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const previewMessageId = draftStream?.messageId();
const previewButtons = (
payload.channelData?.telegram as
| { buttons?: Array<Array<{ text: string; callback_data: string }>> }
| undefined
)?.buttons;
let draftStoppedForPreviewEdit = false;
if (!hasMedia && payload.text && typeof previewMessageId === "number") {
const canFinalizeViaPreviewEdit = payload.text.length <= draftMaxChars;
if (canFinalizeViaPreviewEdit) {
draftStream?.stop();
draftStoppedForPreviewEdit = true;
try {
await editMessageTelegram(chatId, previewMessageId, payload.text, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
return;
} catch (err) {
logVerbose(
`telegram: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
} else {
logVerbose(
`telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`,
);
}
}
if (!draftStoppedForPreviewEdit) {
draftStream?.stop();
}
}
const result = await deliverReplies({
replies: [payload],
chatId: String(chatId),
token: opts.token,
runtime,
bot,
replyToMode,
textLimit,
thread: threadSpec,
tableMode,
chunkMode,
onVoiceRecording: sendRecordVoice,
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
});
if (result.delivered) {
deliveryState.delivered = true;
}
},
}).onReplyStart,
},
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onModelSelected,
},
});
draftStream?.stop();
onSkip: (_payload, info) => {
if (info.reason !== "silent") {
deliveryState.skippedNonSilent += 1;
}
},
onError: (err, info) => {
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
start: sendTyping,
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "telegram",
target: String(chatId),
error: err,
});
},
}).onReplyStart,
},
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onModelSelected,
},
}));
} finally {
if (!finalizedViaPreviewMessage) {
await draftStream?.clear();
}
draftStream?.stop();
}
let sentFallback = false;
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {
const result = await deliverReplies({

View File

@@ -36,10 +36,9 @@ describe("telegram bot message processor", () => {
resolveTelegramGroupConfig: () => ({}),
runtime: {},
replyToMode: "auto",
streamMode: "auto",
streamMode: "partial",
textLimit: 4096,
opts: {},
resolveBotTopicsEnabled: () => false,
};
it("dispatches when context is available", async () => {

View File

@@ -21,7 +21,6 @@ type TelegramMessageProcessorDeps = Omit<
streamMode: TelegramStreamMode;
textLimit: number;
opts: Pick<TelegramBotOptions, "token">;
resolveBotTopicsEnabled: (ctx: TelegramContext) => boolean | Promise<boolean>;
};
export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => {
@@ -45,7 +44,6 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
streamMode,
textLimit,
opts,
resolveBotTopicsEnabled,
} = deps;
return async (
@@ -86,7 +84,6 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
});
};
};

View File

@@ -5,7 +5,6 @@ import { type Message, type UserFromGetMe, ReactionTypeEmoji } from "@grammyjs/t
import { Bot, webhookCallback } from "grammy";
import type { OpenClawConfig, ReplyToMode } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
import type { TelegramContext } from "./bot/types.js";
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
import { resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { isControlCommandMessage } from "../auto-reply/command-detection.js";
@@ -28,7 +27,6 @@ import { getChildLogger } from "../logging.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveTelegramAccount } from "./accounts.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { registerTelegramHandlers } from "./bot-handlers.js";
import { createTelegramMessageProcessor } from "./bot-message.js";
import { registerTelegramNativeCommands } from "./bot-native-commands.js";
@@ -264,32 +262,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const mediaMaxBytes = (opts.mediaMaxMb ?? telegramCfg.mediaMaxMb ?? 5) * 1024 * 1024;
const logger = getChildLogger({ module: "telegram-auto-reply" });
const streamMode = resolveTelegramStreamMode(telegramCfg);
let botHasTopicsEnabled: boolean | undefined;
const resolveBotTopicsEnabled = async (ctx?: TelegramContext) => {
if (typeof ctx?.me?.has_topics_enabled === "boolean") {
botHasTopicsEnabled = ctx.me.has_topics_enabled;
return botHasTopicsEnabled;
}
if (typeof botHasTopicsEnabled === "boolean") {
return botHasTopicsEnabled;
}
if (typeof bot.api.getMe !== "function") {
botHasTopicsEnabled = false;
return botHasTopicsEnabled;
}
try {
const me = await withTelegramApiErrorLogging({
operation: "getMe",
runtime,
fn: () => bot.api.getMe(),
});
botHasTopicsEnabled = Boolean(me?.has_topics_enabled);
} catch (err) {
logVerbose(`telegram getMe failed: ${String(err)}`);
botHasTopicsEnabled = false;
}
return botHasTopicsEnabled;
};
const resolveGroupPolicy = (chatId: string | number) =>
resolveChannelGroupPolicy({
cfg,
@@ -363,7 +335,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
streamMode,
textLimit,
opts,
resolveBotTopicsEnabled,
});
registerTelegramNativeCommands({

View File

@@ -1,6 +1,6 @@
import type { Message, UserFromGetMe } from "@grammyjs/types";
/** App-specific stream mode for Telegram draft streaming. */
/** App-specific stream mode for Telegram stream previews. */
export type TelegramStreamMode = "off" | "partial" | "block";
/**

View File

@@ -2,52 +2,117 @@ import { describe, expect, it, vi } from "vitest";
import { createTelegramDraftStream } from "./draft-stream.js";
describe("createTelegramDraftStream", () => {
it("passes message_thread_id when provided", () => {
const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) };
it("sends stream preview message with message_thread_id when provided", async () => {
const api = {
sendMessage: vi.fn().mockResolvedValue({ message_id: 17 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
draftId: 42,
thread: { id: 99, scope: "forum" },
});
stream.update("Hello");
expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", {
message_thread_id: 99,
});
await vi.waitFor(() =>
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 99 }),
);
});
it("omits message_thread_id for general topic id", () => {
const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) };
it("edits existing stream preview message on subsequent updates", async () => {
const api = {
sendMessage: vi.fn().mockResolvedValue({ message_id: 17 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
thread: { id: 99, scope: "forum" },
});
stream.update("Hello");
await vi.waitFor(() =>
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 99 }),
);
await (api.sendMessage.mock.results[0]?.value as Promise<unknown>);
stream.update("Hello again");
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "Hello again");
});
it("waits for in-flight updates before final flush edit", async () => {
let resolveSend: ((value: { message_id: number }) => void) | undefined;
const firstSend = new Promise<{ message_id: number }>((resolve) => {
resolveSend = resolve;
});
const api = {
sendMessage: vi.fn().mockReturnValue(firstSend),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
thread: { id: 99, scope: "forum" },
});
stream.update("Hello");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1));
stream.update("Hello final");
const flushPromise = stream.flush();
expect(api.editMessageText).not.toHaveBeenCalled();
resolveSend?.({ message_id: 17 });
await flushPromise;
expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "Hello final");
});
it("omits message_thread_id for general topic id", async () => {
const api = {
sendMessage: vi.fn().mockResolvedValue({ message_id: 17 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
draftId: 42,
thread: { id: 1, scope: "forum" },
});
stream.update("Hello");
expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", undefined);
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined));
});
it("keeps message_thread_id for dm threads", () => {
const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) };
it("keeps message_thread_id for dm threads and clears preview on cleanup", async () => {
const api = {
sendMessage: vi.fn().mockResolvedValue({ message_id: 17 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
draftId: 42,
thread: { id: 1, scope: "dm" },
});
stream.update("Hello");
await vi.waitFor(() =>
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 1 }),
);
await stream.clear();
expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", {
message_thread_id: 1,
});
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
});
});

View File

@@ -1,40 +1,43 @@
import type { Bot } from "grammy";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
const TELEGRAM_DRAFT_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 300;
const TELEGRAM_STREAM_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 1000;
export type TelegramDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
messageId: () => number | undefined;
clear: () => Promise<void>;
stop: () => void;
};
export function createTelegramDraftStream(params: {
api: Bot["api"];
chatId: number;
draftId: number;
maxChars?: number;
thread?: TelegramThreadSpec | null;
throttleMs?: number;
log?: (message: string) => void;
warn?: (message: string) => void;
}): TelegramDraftStream {
const maxChars = Math.min(params.maxChars ?? TELEGRAM_DRAFT_MAX_CHARS, TELEGRAM_DRAFT_MAX_CHARS);
const throttleMs = Math.max(50, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const rawDraftId = Number.isFinite(params.draftId) ? Math.trunc(params.draftId) : 1;
const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId);
const maxChars = Math.min(
params.maxChars ?? TELEGRAM_STREAM_MAX_CHARS,
TELEGRAM_STREAM_MAX_CHARS,
);
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const chatId = params.chatId;
const threadParams = buildTelegramThreadParams(params.thread);
let streamMessageId: number | undefined;
let lastSentText = "";
let lastSentAt = 0;
let pendingText = "";
let inFlight = false;
let inFlightPromise: Promise<void> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
let stopped = false;
const sendDraft = async (text: string) => {
const sendOrEditStreamMessage = async (text: string) => {
if (stopped) {
return;
}
@@ -43,10 +46,12 @@ export function createTelegramDraftStream(params: {
return;
}
if (trimmed.length > maxChars) {
// Drafts are capped at 4096 chars. Stop streaming once we exceed the cap
// so we don't keep sending failing updates or a truncated preview.
// Telegram text messages/edits cap at 4096 chars.
// Stop streaming once we exceed the cap to avoid repeated API failures.
stopped = true;
params.warn?.(`telegram draft stream stopped (draft length ${trimmed.length} > ${maxChars})`);
params.warn?.(
`telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`,
);
return;
}
if (trimmed === lastSentText) {
@@ -55,11 +60,22 @@ export function createTelegramDraftStream(params: {
lastSentText = trimmed;
lastSentAt = Date.now();
try {
await params.api.sendMessageDraft(chatId, draftId, trimmed, threadParams);
if (typeof streamMessageId === "number") {
await params.api.editMessageText(chatId, streamMessageId, trimmed);
return;
}
const sent = await params.api.sendMessage(chatId, trimmed, threadParams);
const sentMessageId = sent?.message_id;
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
stopped = true;
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
return;
}
streamMessageId = Math.trunc(sentMessageId);
} catch (err) {
stopped = true;
params.warn?.(
`telegram draft stream failed: ${err instanceof Error ? err.message : String(err)}`,
`telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
};
@@ -69,30 +85,52 @@ export function createTelegramDraftStream(params: {
clearTimeout(timer);
timer = undefined;
}
if (inFlight) {
schedule();
return;
}
const text = pendingText;
const trimmed = text.trim();
if (!trimmed) {
if (pendingText === text) {
while (!stopped) {
if (inFlightPromise) {
await inFlightPromise;
continue;
}
const text = pendingText;
const trimmed = text.trim();
if (!trimmed) {
pendingText = "";
return;
}
if (pendingText) {
schedule();
pendingText = "";
const current = sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
if (!pendingText) {
return;
}
return;
}
};
const clear = async () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
pendingText = "";
inFlight = true;
try {
await sendDraft(text);
} finally {
inFlight = false;
stopped = true;
if (inFlightPromise) {
await inFlightPromise;
}
if (pendingText) {
schedule();
const messageId = streamMessageId;
streamMessageId = undefined;
if (typeof messageId !== "number") {
return;
}
try {
await params.api.deleteMessage(chatId, messageId);
} catch (err) {
params.warn?.(
`telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
};
@@ -111,7 +149,7 @@ export function createTelegramDraftStream(params: {
return;
}
pendingText = text;
if (inFlight) {
if (inFlightPromise) {
schedule();
return;
}
@@ -131,9 +169,13 @@ export function createTelegramDraftStream(params: {
}
};
params.log?.(
`telegram draft stream ready (draftId=${draftId}, maxChars=${maxChars}, throttleMs=${throttleMs})`,
);
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return { update, flush, stop };
return {
update,
flush,
messageId: () => streamMessageId,
clear,
stop,
};
}

View File

@@ -88,4 +88,23 @@ describe("editMessageTelegram", () => {
}),
);
});
it("disables link previews when linkPreview is false", async () => {
botApi.editMessageText.mockResolvedValue({ message_id: 1, chat: { id: "123" } });
await editMessageTelegram("123", 1, "https://example.com", {
token: "tok",
cfg: {},
linkPreview: false,
});
expect(botApi.editMessageText).toHaveBeenCalledTimes(1);
const params = (botApi.editMessageText.mock.calls[0] ?? [])[3] as Record<string, unknown>;
expect(params).toEqual(
expect.objectContaining({
parse_mode: "HTML",
link_preview_options: { is_disabled: true },
}),
);
});
});

View File

@@ -696,6 +696,8 @@ type TelegramEditOpts = {
api?: Bot["api"];
retry?: RetryConfig;
textMode?: "markdown" | "html";
/** Controls whether link previews are shown in the edited message. */
linkPreview?: boolean;
/** Inline keyboard buttons (reply markup). Pass empty array to remove buttons. */
buttons?: Array<Array<{ text: string; callback_data: string }>>;
/** Optional config injection to avoid global loadConfig() (improves testability). */
@@ -752,6 +754,9 @@ export async function editMessageTelegram(
const editParams: Record<string, unknown> = {
parse_mode: "HTML",
};
if (opts.linkPreview === false) {
editParams.link_preview_options = { is_disabled: true };
}
if (replyMarkup !== undefined) {
editParams.reply_markup = replyMarkup;
}
@@ -767,6 +772,9 @@ export async function editMessageTelegram(
console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`);
}
const plainParams: Record<string, unknown> = {};
if (opts.linkPreview === false) {
plainParams.link_preview_options = { is_disabled: true };
}
if (replyMarkup !== undefined) {
plainParams.reply_markup = replyMarkup;
}