mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 09:42:44 +00:00
feat(telegram): add channel_post support for bot-to-bot communication (#17857)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 27a343cd4d
Co-authored-by: theSamPadilla <35386211+theSamPadilla@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -22,6 +22,7 @@ import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
||||
import {
|
||||
firstDefined,
|
||||
isSenderAllowed,
|
||||
normalizeAllowFromWithStore,
|
||||
type NormalizedAllowFrom,
|
||||
@@ -317,36 +318,6 @@ export const registerTelegramHandlers = ({
|
||||
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
|
||||
};
|
||||
|
||||
const enqueueMediaGroupFlush = async (mediaGroupId: string, entry: MediaGroupEntry) => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
};
|
||||
|
||||
const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => {
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(async () => {
|
||||
await enqueueMediaGroupFlush(mediaGroupId, entry);
|
||||
}, mediaGroupTimeoutMs);
|
||||
};
|
||||
|
||||
const getOrCreateMediaGroupEntry = (mediaGroupId: string) => {
|
||||
const existing = mediaGroupBuffer.get(mediaGroupId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const entry: MediaGroupEntry = {
|
||||
messages: [],
|
||||
timer: setTimeout(() => undefined, mediaGroupTimeoutMs),
|
||||
};
|
||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||
return entry;
|
||||
};
|
||||
|
||||
const loadStoreAllowFrom = async () =>
|
||||
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
|
||||
|
||||
@@ -551,7 +522,176 @@ export const registerTelegramHandlers = ({
|
||||
runtime.error?.(danger(`telegram reaction handler failed: ${String(err)}`));
|
||||
}
|
||||
});
|
||||
const processInboundMessage = async (params: {
|
||||
ctx: TelegramContext;
|
||||
msg: Message;
|
||||
chatId: number;
|
||||
resolvedThreadId?: number;
|
||||
storeAllowFrom: string[];
|
||||
sendOversizeWarning: boolean;
|
||||
oversizeLogMessage: string;
|
||||
}) => {
|
||||
const {
|
||||
ctx,
|
||||
msg,
|
||||
chatId,
|
||||
resolvedThreadId,
|
||||
storeAllowFrom,
|
||||
sendOversizeWarning,
|
||||
oversizeLogMessage,
|
||||
} = params;
|
||||
|
||||
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
|
||||
// We buffer “near-limit” messages and append immediately-following parts.
|
||||
const text = typeof msg.text === "string" ? msg.text : undefined;
|
||||
const isCommandLike = (text ?? "").trim().startsWith("/");
|
||||
if (text && !isCommandLike) {
|
||||
const nowMs = Date.now();
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
|
||||
const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
|
||||
const existing = textFragmentBuffer.get(key);
|
||||
|
||||
if (existing) {
|
||||
const last = existing.messages.at(-1);
|
||||
const lastMsgId = last?.msg.message_id;
|
||||
const lastReceivedAtMs = last?.receivedAtMs ?? nowMs;
|
||||
const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity;
|
||||
const timeGapMs = nowMs - lastReceivedAtMs;
|
||||
const canAppend =
|
||||
idGap > 0 &&
|
||||
idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP &&
|
||||
timeGapMs >= 0 &&
|
||||
timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS;
|
||||
|
||||
if (canAppend) {
|
||||
const currentTotalChars = existing.messages.reduce(
|
||||
(sum, m) => sum + (m.msg.text?.length ?? 0),
|
||||
0,
|
||||
);
|
||||
const nextTotalChars = currentTotalChars + text.length;
|
||||
if (
|
||||
existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS &&
|
||||
nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS
|
||||
) {
|
||||
existing.messages.push({ msg, ctx, receivedAtMs: nowMs });
|
||||
scheduleTextFragmentFlush(existing);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
|
||||
clearTimeout(existing.timer);
|
||||
textFragmentBuffer.delete(key);
|
||||
textFragmentProcessing = textFragmentProcessing
|
||||
.then(async () => {
|
||||
await flushTextFragments(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await textFragmentProcessing;
|
||||
}
|
||||
|
||||
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
|
||||
if (shouldStart) {
|
||||
const entry: TextFragmentEntry = {
|
||||
key,
|
||||
messages: [{ msg, ctx, receivedAtMs: nowMs }],
|
||||
timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS),
|
||||
};
|
||||
textFragmentBuffer.set(key, entry);
|
||||
scheduleTextFragmentFlush(entry);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Media group handling - buffer multi-image messages
|
||||
const mediaGroupId = msg.media_group_id;
|
||||
if (mediaGroupId) {
|
||||
const existing = mediaGroupBuffer.get(mediaGroupId);
|
||||
if (existing) {
|
||||
clearTimeout(existing.timer);
|
||||
existing.messages.push({ msg, ctx });
|
||||
existing.timer = setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(existing);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs);
|
||||
} else {
|
||||
const entry: MediaGroupEntry = {
|
||||
messages: [{ msg, ctx }],
|
||||
timer: setTimeout(async () => {
|
||||
mediaGroupBuffer.delete(mediaGroupId);
|
||||
mediaGroupProcessing = mediaGroupProcessing
|
||||
.then(async () => {
|
||||
await processMediaGroup(entry);
|
||||
})
|
||||
.catch(() => undefined);
|
||||
await mediaGroupProcessing;
|
||||
}, mediaGroupTimeoutMs),
|
||||
};
|
||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
|
||||
} catch (mediaErr) {
|
||||
const errMsg = String(mediaErr);
|
||||
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
|
||||
if (sendOversizeWarning) {
|
||||
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
runtime,
|
||||
fn: () =>
|
||||
bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, {
|
||||
reply_to_message_id: msg.message_id,
|
||||
}),
|
||||
}).catch(() => {});
|
||||
}
|
||||
logger.warn({ chatId, error: errMsg }, oversizeLogMessage);
|
||||
return;
|
||||
}
|
||||
throw mediaErr;
|
||||
}
|
||||
|
||||
// Skip sticker-only messages where the sticker was skipped (animated/video)
|
||||
// These have no media and no text content to process.
|
||||
const hasText = Boolean((msg.text ?? msg.caption ?? "").trim());
|
||||
if (msg.sticker && !media && !hasText) {
|
||||
logVerbose("telegram: skipping sticker-only message (unsupported sticker type)");
|
||||
return;
|
||||
}
|
||||
|
||||
const allMedia = media
|
||||
? [
|
||||
{
|
||||
path: media.path,
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
},
|
||||
]
|
||||
: [];
|
||||
const senderId = msg.from?.id ? String(msg.from.id) : "";
|
||||
const conversationKey =
|
||||
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
|
||||
const debounceKey = senderId
|
||||
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}`
|
||||
: null;
|
||||
await inboundDebouncer.enqueue({
|
||||
ctx,
|
||||
msg,
|
||||
allMedia,
|
||||
storeAllowFrom,
|
||||
debounceKey,
|
||||
botUsername: ctx.me?.username,
|
||||
});
|
||||
};
|
||||
bot.on("callback_query", async (ctx) => {
|
||||
const callback = ctx.callbackQuery;
|
||||
if (!callback) {
|
||||
@@ -945,124 +1085,156 @@ export const registerTelegramHandlers = ({
|
||||
return;
|
||||
}
|
||||
|
||||
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
|
||||
// We buffer “near-limit” messages and append immediately-following parts.
|
||||
const text = typeof msg.text === "string" ? msg.text : undefined;
|
||||
const isCommandLike = (text ?? "").trim().startsWith("/");
|
||||
if (text && !isCommandLike) {
|
||||
const nowMs = Date.now();
|
||||
const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown";
|
||||
const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`;
|
||||
const existing = textFragmentBuffer.get(key);
|
||||
|
||||
if (existing) {
|
||||
const last = existing.messages.at(-1);
|
||||
const lastMsgId = last?.msg.message_id;
|
||||
const lastReceivedAtMs = last?.receivedAtMs ?? nowMs;
|
||||
const idGap = typeof lastMsgId === "number" ? msg.message_id - lastMsgId : Infinity;
|
||||
const timeGapMs = nowMs - lastReceivedAtMs;
|
||||
const canAppend =
|
||||
idGap > 0 &&
|
||||
idGap <= TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP &&
|
||||
timeGapMs >= 0 &&
|
||||
timeGapMs <= TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS;
|
||||
|
||||
if (canAppend) {
|
||||
const currentTotalChars = existing.messages.reduce(
|
||||
(sum, m) => sum + (m.msg.text?.length ?? 0),
|
||||
0,
|
||||
);
|
||||
const nextTotalChars = currentTotalChars + text.length;
|
||||
if (
|
||||
existing.messages.length + 1 <= TELEGRAM_TEXT_FRAGMENT_MAX_PARTS &&
|
||||
nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS
|
||||
) {
|
||||
existing.messages.push({ msg, ctx, receivedAtMs: nowMs });
|
||||
scheduleTextFragmentFlush(existing);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
|
||||
clearTimeout(existing.timer);
|
||||
await runTextFragmentFlush(existing);
|
||||
}
|
||||
|
||||
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
|
||||
if (shouldStart) {
|
||||
const entry: TextFragmentEntry = {
|
||||
key,
|
||||
messages: [{ msg, ctx, receivedAtMs: nowMs }],
|
||||
timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS),
|
||||
};
|
||||
textFragmentBuffer.set(key, entry);
|
||||
scheduleTextFragmentFlush(entry);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Media group handling - buffer multi-image messages
|
||||
const mediaGroupId = msg.media_group_id;
|
||||
if (mediaGroupId) {
|
||||
const entry = getOrCreateMediaGroupEntry(mediaGroupId);
|
||||
entry.messages.push({ msg, ctx });
|
||||
scheduleMediaGroupFlush(mediaGroupId, entry);
|
||||
return;
|
||||
}
|
||||
|
||||
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
|
||||
} catch (mediaErr) {
|
||||
const errMsg = String(mediaErr);
|
||||
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
|
||||
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "sendMessage",
|
||||
runtime,
|
||||
fn: () =>
|
||||
bot.api.sendMessage(chatId, `⚠️ File too large. Maximum size is ${limitMb}MB.`, {
|
||||
reply_to_message_id: msg.message_id,
|
||||
}),
|
||||
}).catch(() => {});
|
||||
logger.warn({ chatId, error: errMsg }, "media exceeds size limit");
|
||||
return;
|
||||
}
|
||||
throw mediaErr;
|
||||
}
|
||||
|
||||
// Skip sticker-only messages where the sticker was skipped (animated/video)
|
||||
// These have no media and no text content to process.
|
||||
const hasText = Boolean((msg.text ?? msg.caption ?? "").trim());
|
||||
if (msg.sticker && !media && !hasText) {
|
||||
logVerbose("telegram: skipping sticker-only message (unsupported sticker type)");
|
||||
return;
|
||||
}
|
||||
|
||||
const allMedia = media
|
||||
? [
|
||||
{
|
||||
path: media.path,
|
||||
contentType: media.contentType,
|
||||
stickerMetadata: media.stickerMetadata,
|
||||
},
|
||||
]
|
||||
: [];
|
||||
const conversationKey =
|
||||
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
|
||||
const debounceKey = senderId
|
||||
? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}`
|
||||
: null;
|
||||
await inboundDebouncer.enqueue({
|
||||
await processInboundMessage({
|
||||
ctx,
|
||||
msg,
|
||||
allMedia,
|
||||
chatId,
|
||||
resolvedThreadId,
|
||||
storeAllowFrom,
|
||||
debounceKey,
|
||||
botUsername: ctx.me?.username,
|
||||
sendOversizeWarning: true,
|
||||
oversizeLogMessage: "media exceeds size limit",
|
||||
});
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`handler failed: ${String(err)}`));
|
||||
}
|
||||
});
|
||||
|
||||
// Handle channel posts — enables bot-to-bot communication via Telegram channels.
|
||||
// Telegram bots cannot see other bot messages in groups, but CAN in channels.
|
||||
// This handler normalizes channel_post updates into the standard message pipeline.
|
||||
bot.on("channel_post", async (ctx) => {
|
||||
try {
|
||||
const post = ctx.channelPost;
|
||||
if (!post) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Deduplication check — same as the regular message handler
|
||||
if (shouldSkipUpdate(ctx)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = post.chat.id;
|
||||
|
||||
// Use the full group allow-from context for access control (same as message handler)
|
||||
const groupAllowContext = await resolveTelegramGroupAllowFromContext({
|
||||
chatId,
|
||||
accountId,
|
||||
isForum: false,
|
||||
messageThreadId: undefined,
|
||||
groupAllowFrom,
|
||||
resolveTelegramGroupConfig,
|
||||
});
|
||||
const { storeAllowFrom, groupConfig, effectiveGroupAllow, hasGroupAllowOverride } =
|
||||
groupAllowContext;
|
||||
|
||||
// Check group allowlist (channels use the same groups config)
|
||||
const groupAllowlist = resolveGroupPolicy(chatId);
|
||||
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!groupConfig || groupConfig.enabled === false) {
|
||||
logVerbose(`Blocked telegram channel ${chatId} (channel disabled)`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Group policy filtering (same as message handler)
|
||||
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
|
||||
const groupPolicy = firstDefined(
|
||||
groupConfig?.groupPolicy,
|
||||
telegramCfg.groupPolicy,
|
||||
defaultGroupPolicy,
|
||||
"open",
|
||||
);
|
||||
if (groupPolicy === "disabled") {
|
||||
logVerbose(`Blocked telegram channel message (groupPolicy: disabled)`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasGroupAllowOverride) {
|
||||
const senderId = post.sender_chat?.id ?? post.from?.id;
|
||||
const senderUsername = post.sender_chat?.username ?? post.from?.username ?? "";
|
||||
const allowed =
|
||||
senderId != null &&
|
||||
isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(
|
||||
`Blocked telegram channel sender ${senderId ?? "unknown"} (group allowFrom override)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (groupPolicy === "allowlist") {
|
||||
const senderId = post.sender_chat?.id ?? post.from?.id;
|
||||
if (senderId == null) {
|
||||
logVerbose(`Blocked telegram channel message (no sender ID, groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
if (!effectiveGroupAllow.hasEntries) {
|
||||
logVerbose(
|
||||
"Blocked telegram channel message (groupPolicy: allowlist, no allowlist entries)",
|
||||
);
|
||||
return;
|
||||
}
|
||||
const senderUsername = post.sender_chat?.username ?? post.from?.username ?? "";
|
||||
if (
|
||||
!isSenderAllowed({
|
||||
allow: effectiveGroupAllow,
|
||||
senderId: String(senderId),
|
||||
senderUsername,
|
||||
})
|
||||
) {
|
||||
logVerbose(`Blocked telegram channel message from ${senderId} (groupPolicy: allowlist)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Build a synthetic `from` field since channel posts may not have one.
|
||||
// Use sender_chat (the bot/user that posted) if available.
|
||||
const syntheticFrom = post.sender_chat
|
||||
? {
|
||||
id: post.sender_chat.id,
|
||||
is_bot: true as const,
|
||||
first_name: post.sender_chat.title || "Channel",
|
||||
username: post.sender_chat.username,
|
||||
}
|
||||
: {
|
||||
id: chatId,
|
||||
is_bot: true as const,
|
||||
first_name: post.chat.title || "Channel",
|
||||
username: post.chat.username,
|
||||
};
|
||||
|
||||
const syntheticMsg: Message = {
|
||||
...post,
|
||||
from: post.from ?? syntheticFrom,
|
||||
chat: {
|
||||
...post.chat,
|
||||
type: "supergroup" as const,
|
||||
},
|
||||
} as Message;
|
||||
|
||||
const syntheticCtx = Object.create(ctx, {
|
||||
message: { value: syntheticMsg, writable: true, enumerable: true },
|
||||
});
|
||||
|
||||
await processInboundMessage({
|
||||
ctx: syntheticCtx as TelegramContext,
|
||||
msg: syntheticMsg,
|
||||
chatId,
|
||||
resolvedThreadId: undefined,
|
||||
storeAllowFrom,
|
||||
sendOversizeWarning: false,
|
||||
oversizeLogMessage: "channel post media exceeds size limit",
|
||||
});
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`channel_post handler failed: ${String(err)}`));
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user