fix: align telegram channel_post handling

This commit is contained in:
Ayaan Zaidi
2026-02-17 14:03:08 +05:30
parent d3d168e563
commit 97b9f7feae
3 changed files with 352 additions and 148 deletions

View File

@@ -93,6 +93,7 @@ Docs: https://docs.openclaw.ai
- Plugins: expose `llm_input` and `llm_output` hook payloads so extensions can observe prompt/input context and model output usage details. (#16724) Thanks @SecondThread.
- Subagents: nested sub-agents (sub-sub-agents) with configurable depth. Set `agents.defaults.subagents.maxSpawnDepth: 2` to allow sub-agents to spawn their own children. Includes `maxChildrenPerAgent` limit (default 5), depth-aware tool policy, and proper announce chain routing. (#14447) Thanks @tyler6204.
- Slack/Discord/Telegram: add per-channel ack reaction overrides (account/channel-level) to support platform-specific emoji formats. (#17092) Thanks @zerone0x.
- Telegram: add `channel_post` inbound support for channel-based bot-to-bot wake/trigger flows, with channel allowlist gating and message/media batching parity.
- Cron/Gateway: add finished-run webhook delivery toggle (`notify`) and dedicated webhook auth token support (`cron.webhookToken`) for outbound cron webhook posts. (#14535) Thanks @advaitpaliwal.
- Channels: deduplicate probe/token resolution base types across core + extensions while preserving per-channel error typing. (#16986) Thanks @iyoda and @thewilloftheshadow.
- Memory: add MMR (Maximal Marginal Relevance) re-ranking for hybrid search diversity. Configurable via `memorySearch.query.hybrid.mmr`. Thanks @rodrigouroz.

View File

@@ -22,6 +22,7 @@ import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import {
firstDefined,
isSenderAllowed,
normalizeAllowFromWithStore,
type NormalizedAllowFrom,
@@ -551,7 +552,176 @@ export const registerTelegramHandlers = ({
runtime.error?.(danger(`telegram reaction handler failed: ${String(err)}`));
}
});
const processInboundMessage = async (params: {
ctx: TelegramContext;
msg: Message;
chatId: number;
resolvedThreadId?: number;
storeAllowFrom: string[];
sendOversizeWarning: boolean;
oversizeLogMessage: string;
}) => {
const {
ctx,
msg,
chatId,
resolvedThreadId,
storeAllowFrom,
sendOversizeWarning,
oversizeLogMessage,
} = params;
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
// We buffer “near-limit” messages and append immediately-following parts.
const text = typeof msg.text === "string" ? msg.text : undefined;
const isCommandLike = (text ?? "").trim().startsWith("/");
if (text && !isCommandLike) {
const nowMs = Date.now();
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
const existing = textFragmentBuffer.get(key);
if (existing) {
const last = existing.messages.at(-1);
const lastMsgId = last?.msg.message_id;
const lastReceivedAtMs = last?.receivedAtMs ?? nowMs;
const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity;
const timeGapMs = nowMs - lastReceivedAtMs;
const canAppend =
idGap > 0 &&
idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP &&
timeGapMs >= 0 &&
timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS;
if (canAppend) {
const currentTotalChars = existing.messages.reduce(
(sum, m) => sum + (m.msg.text?.length ?? 0),
0,
);
const nextTotalChars = currentTotalChars + text.length;
if (
existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS &&
nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS
) {
existing.messages.push({ msg, ctx, receivedAtMs: nowMs });
scheduleTextFragmentFlush(existing);
return;
}
}
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
clearTimeout(existing.timer);
textFragmentBuffer.delete(key);
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined);
await textFragmentProcessing;
}
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
if (shouldStart) {
const entry: TextFragmentEntry = {
key,
messages: [{ msg, ctx, receivedAtMs: nowMs }],
timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS),
};
textFragmentBuffer.set(key, entry);
scheduleTextFragmentFlush(entry);
return;
}
}
// Media group handling - buffer multi-image messages
const mediaGroupId = msg.media_group_id;
if (mediaGroupId) {
const existing = mediaGroupBuffer.get(mediaGroupId);
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs);
} else {
const entry: MediaGroupEntry = {
messages: [{ msg, ctx }],
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs),
};
mediaGroupBuffer.set(mediaGroupId, entry);
}
return;
}
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
if (sendOversizeWarning) {
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
await withTelegramApiErrorLogging({
operation: "sendMessage",
runtime,
fn: () =>
bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, {
reply_to_message_id: msg.message_id,
}),
}).catch(() => {});
}
logger.warn({ chatId, error: errMsg }, oversizeLogMessage);
return;
}
throw mediaErr;
}
// Skip sticker-only messages where the sticker was skipped (animated/video)
// These have no media and no text content to process.
const hasText = Boolean((msg.text ?? msg.caption ?? "").trim());
if (msg.sticker && !media && !hasText) {
logVerbose("telegram: skipping sticker-only message (unsupported sticker type)");
return;
}
const allMedia = media
? [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
]
: [];
const senderId = msg.from?.id ? String(msg.from.id) : "";
const conversationKey =
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
const debounceKey = senderId
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}`
: null;
await inboundDebouncer.enqueue({
ctx,
msg,
allMedia,
storeAllowFrom,
debounceKey,
botUsername: ctx.me?.username,
});
};
bot.on("callback_query", async (ctx) => {
const callback = ctx.callbackQuery;
if (!callback) {
@@ -945,121 +1115,14 @@ export const registerTelegramHandlers = ({
return;
}
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
// We buffer “near-limit” messages and append immediately-following parts.
const text = typeof msg.text === "string" ? msg.text : undefined;
const isCommandLike = (text ?? "").trim().startsWith("/");
if (text && !isCommandLike) {
const nowMs = Date.now();
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
const existing = textFragmentBuffer.get(key);
if (existing) {
const last = existing.messages.at(-1);
const lastMsgId = last?.msg.message_id;
const lastReceivedAtMs = last?.receivedAtMs ?? nowMs;
const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity;
const timeGapMs = nowMs - lastReceivedAtMs;
const canAppend =
idGap > 0 &&
idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP &&
timeGapMs >= 0 &&
timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS;
if (canAppend) {
const currentTotalChars = existing.messages.reduce(
(sum, m) => sum + (m.msg.text?.length ?? 0),
0,
);
const nextTotalChars = currentTotalChars + text.length;
if (
existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS &&
nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS
) {
existing.messages.push({ msg, ctx, receivedAtMs: nowMs });
scheduleTextFragmentFlush(existing);
return;
}
}
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
clearTimeout(existing.timer);
await runTextFragmentFlush(existing);
}
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
if (shouldStart) {
const entry: TextFragmentEntry = {
key,
messages: [{ msg, ctx, receivedAtMs: nowMs }],
timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS),
};
textFragmentBuffer.set(key, entry);
scheduleTextFragmentFlush(entry);
return;
}
}
// Media group handling - buffer multi-image messages
const mediaGroupId = msg.media_group_id;
if (mediaGroupId) {
const entry = getOrCreateMediaGroupEntry(mediaGroupId);
entry.messages.push({ msg, ctx });
scheduleMediaGroupFlush(mediaGroupId, entry);
return;
}
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
try {
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
await withTelegramApiErrorLogging({
operation: "sendMessage",
runtime,
fn: () =>
bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, {
reply_to_message_id: msg.message_id,
}),
}).catch(() => {});
logger.warn({ chatId, error: errMsg }, "media exceeds size limit");
return;
}
throw mediaErr;
}
// Skip sticker-only messages where the sticker was skipped (animated/video)
// These have no media and no text content to process.
const hasText = Boolean((msg.text ?? msg.caption ?? "").trim());
if (msg.sticker && !media && !hasText) {
logVerbose("telegram: skipping sticker-only message (unsupported sticker type)");
return;
}
const allMedia = media
? [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
]
: [];
const conversationKey =
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
const debounceKey = senderId
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}`
: null;
await inboundDebouncer.enqueue({
await processInboundMessage({
ctx,
msg,
allMedia,
chatId,
resolvedThreadId,
storeAllowFrom,
debounceKey,
botUsername: ctx.me?.username,
sendOversizeWarning: true,
oversizeLogMessage: "media exceeds size limit",
});
} catch (err) {
runtime.error?.(danger(`handler failed: ${String(err)}`));
@@ -1191,44 +1254,14 @@ export const registerTelegramHandlers = ({
message: { value: syntheticMsg, writable: true, enumerable: true },
});
// Resolve media (same as message handler)
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
try {
media = await resolveMedia(syntheticCtx, mediaMaxBytes, opts.token, opts.proxyFetch);
} catch (mediaErr) {
const errMsg = String(mediaErr);
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
logger.warn({ chatId, error: errMsg }, "channel post media exceeds size limit");
} else {
throw mediaErr;
}
}
const allMedia = media
? [
{
path: media.path,
contentType: media.contentType,
stickerMetadata: media.stickerMetadata,
},
]
: [];
// Compute debounce key (same pattern as message handler)
const senderId = post.sender_chat?.id ?? post.from?.id;
const senderIdStr = senderId ? String(senderId) : "";
const conversationKey = String(chatId);
const debounceKey = senderIdStr
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderIdStr}`
: null;
await inboundDebouncer.enqueue({
ctx: syntheticCtx,
await processInboundMessage({
ctx: syntheticCtx as TelegramContext,
msg: syntheticMsg,
allMedia,
chatId,
resolvedThreadId: undefined,
storeAllowFrom,
debounceKey,
botUsername: ctx.me?.username,
sendOversizeWarning: false,
oversizeLogMessage: "channel post media exceeds size limit",
});
} catch (err) {
runtime.error?.(danger(`channel_post handler failed: ${String(err)}`));

View File

@@ -45,6 +45,14 @@ const mockMessage = (message: Pick<Message, "chat"> & Partial<Message>): Message
date: 0,
...message,
}) as Message;
const TELEGRAM_TEST_TIMINGS = {
mediaGroupFlushMs: 20,
textFragmentGapMs: 30,
} as const;
const sleep = async (ms: number) => {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
};
describe("createTelegramBot", () => {
beforeEach(() => {
@@ -1864,6 +1872,168 @@ describe("createTelegramBot", () => {
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
expect(sendMessageSpy.mock.calls[0]?.[1]).toContain("final reply");
});
it("buffers channel_post media groups and processes them together", async () => {
onSpy.mockReset();
replySpy.mockReset();
loadConfig.mockReturnValue({
channels: {
telegram: {
groupPolicy: "open",
groups: {
"-100777111222": {
enabled: true,
requireMention: false,
},
},
},
},
});
const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/png" },
arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer,
} as Response);
createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
const handler = getOnHandler("channel_post") as (ctx: Record<string, unknown>) => Promise<void>;
const first = handler({
channelPost: {
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
message_id: 201,
caption: "album caption",
date: 1736380800,
media_group_id: "channel-album-1",
photo: [{ file_id: "p1" }],
},
me: { username: "openclaw_bot" },
getFile: async () => ({ file_path: "photos/p1.jpg" }),
});
const second = handler({
channelPost: {
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
message_id: 202,
date: 1736380801,
media_group_id: "channel-album-1",
photo: [{ file_id: "p2" }],
},
me: { username: "openclaw_bot" },
getFile: async () => ({ file_path: "photos/p2.jpg" }),
});
await Promise.all([first, second]);
expect(replySpy).not.toHaveBeenCalled();
await sleep(TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs + 80);
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0]?.[0] as { Body?: string; MediaPaths?: string[] };
expect(payload.Body).toContain("album caption");
expect(payload.MediaPaths).toHaveLength(2);
fetchSpy.mockRestore();
});
it("coalesces channel_post near-limit text fragments into one message", async () => {
onSpy.mockReset();
replySpy.mockReset();
loadConfig.mockReturnValue({
channels: {
telegram: {
groupPolicy: "open",
groups: {
"-100777111222": {
enabled: true,
requireMention: false,
},
},
},
},
});
createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
const handler = getOnHandler("channel_post") as (ctx: Record<string, unknown>) => Promise<void>;
const part1 = "A".repeat(4050);
const part2 = "B".repeat(50);
await handler({
channelPost: {
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
message_id: 301,
date: 1736380800,
text: part1,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
await handler({
channelPost: {
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
message_id: 302,
date: 1736380801,
text: part2,
},
me: { username: "openclaw_bot" },
getFile: async () => ({}),
});
expect(replySpy).not.toHaveBeenCalled();
await sleep(TELEGRAM_TEST_TIMINGS.textFragmentGapMs + 100);
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0]?.[0] as { RawBody?: string };
expect(payload.RawBody).toContain(part1.slice(0, 32));
expect(payload.RawBody).toContain(part2.slice(0, 32));
});
it("drops oversized channel_post media instead of dispatching a placeholder message", async () => {
onSpy.mockReset();
replySpy.mockReset();
loadConfig.mockReturnValue({
channels: {
telegram: {
groupPolicy: "open",
groups: {
"-100777111222": {
enabled: true,
requireMention: false,
},
},
},
},
});
const fetchSpy = vi.spyOn(globalThis, "fetch" as never).mockResolvedValue({
ok: true,
status: 200,
statusText: "OK",
headers: { get: () => "image/jpeg" },
arrayBuffer: async () => new Uint8Array([0xff, 0xd8, 0xff, 0x00]).buffer,
} as Response);
createTelegramBot({ token: "tok", mediaMaxMb: 0 });
const handler = getOnHandler("channel_post") as (ctx: Record<string, unknown>) => Promise<void>;
await handler({
channelPost: {
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
message_id: 401,
date: 1736380800,
photo: [{ file_id: "oversized" }],
},
me: { username: "openclaw_bot" },
getFile: async () => ({ file_path: "photos/oversized.jpg" }),
});
expect(replySpy).not.toHaveBeenCalled();
fetchSpy.mockRestore();
});
it("dedupes duplicate message updates by update_id", async () => {
onSpy.mockReset();
replySpy.mockReset();