diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index f8873aa9da2..bd1a7bfa4bd 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -1,4 +1,5 @@ import type { Message } from "@grammyjs/types"; +import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; import type { TelegramMediaRef } from "./bot-message-context.js"; import type { TelegramContext } from "./bot/types.js"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; @@ -21,7 +22,11 @@ import { readChannelAllowFromStore } from "../pairing/pairing-store.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; -import { firstDefined, isSenderAllowed, normalizeAllowFromWithStore } from "./bot-access.js"; +import { + isSenderAllowed, + normalizeAllowFromWithStore, + type NormalizedAllowFrom, +} from "./bot-access.js"; import { RegisterTelegramHandlerParams } from "./bot-native-commands.js"; import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js"; import { resolveMedia } from "./bot/delivery.js"; @@ -31,6 +36,10 @@ import { resolveTelegramForumThreadId, resolveTelegramGroupAllowFromContext, } from "./bot/helpers.js"; +import { + evaluateTelegramGroupBaseAccess, + evaluateTelegramGroupPolicyAccess, +} from "./group-access.js"; import { migrateTelegramGroupConfig } from "./group-migration.js"; import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js"; import { @@ -94,6 +103,30 @@ export const registerTelegramHandlers = ({ debounceKey: string | null; botUsername?: string; }; + const buildSyntheticTextMessage = (params: { + base: Message; + text: string; + date?: number; + from?: Message["from"]; + }): Message => ({ + ...params.base, + ...(params.from ? { from: params.from } : {}), + text: params.text, + caption: undefined, + caption_entities: undefined, + entities: undefined, + ...(params.date != null ? { date: params.date } : {}), + }); + const buildSyntheticContext = ( + ctx: Pick & { getFile?: unknown }, + message: Message, + ): TelegramContext => { + const getFile = + typeof ctx.getFile === "function" + ? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object) + : async () => ({}); + return { message, me: ctx.me, getFile }; + }; const inboundDebouncer = createInboundDebouncer({ debounceMs, buildKey: (entry) => entry.debounceKey, @@ -125,19 +158,14 @@ export const registerTelegramHandlers = ({ } const first = entries[0]; const baseCtx = first.ctx; - const getFile = - typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); - const syntheticMessage: Message = { - ...first.msg, + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, text: combinedText, - caption: undefined, - caption_entities: undefined, - entities: undefined, date: last.msg.date ?? first.msg.date, - }; + }); const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined; await processMessage( - { message: syntheticMessage, me: baseCtx.me, getFile }, + buildSyntheticContext(baseCtx, syntheticMessage), [], first.storeAllowFrom, messageIdOverride ? { messageIdOverride } : undefined, @@ -227,11 +255,7 @@ export const registerTelegramHandlers = ({ } } - const storeAllowFrom = await readChannelAllowFromStore( - "telegram", - process.env, - accountId, - ).catch(() => []); + const storeAllowFrom = await loadStoreAllowFrom(); await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom); } catch (err) { runtime.error?.(danger(`media group handler failed: ${String(err)}`)); @@ -253,48 +277,187 @@ export const registerTelegramHandlers = ({ return; } - const syntheticMessage: Message = { - ...first.msg, + const syntheticMessage = buildSyntheticTextMessage({ + base: first.msg, text: combinedText, - caption: undefined, - caption_entities: undefined, - entities: undefined, date: last.msg.date ?? first.msg.date, - }; + }); - const storeAllowFrom = await readChannelAllowFromStore( - "telegram", - process.env, - accountId, - ).catch(() => []); + const storeAllowFrom = await loadStoreAllowFrom(); const baseCtx = first.ctx; - const getFile = - typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({}); - await processMessage( - { message: syntheticMessage, me: baseCtx.me, getFile }, - [], - storeAllowFrom, - { messageIdOverride: String(last.msg.message_id) }, - ); + await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, { + messageIdOverride: String(last.msg.message_id), + }); } catch (err) { runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); } }; + const queueTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentProcessing = textFragmentProcessing + .then(async () => { + await flushTextFragments(entry); + }) + .catch(() => undefined); + await textFragmentProcessing; + }; + + const runTextFragmentFlush = async (entry: TextFragmentEntry) => { + textFragmentBuffer.delete(entry.key); + await queueTextFragmentFlush(entry); + }; + const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => { clearTimeout(entry.timer); entry.timer = setTimeout(async () => { - textFragmentBuffer.delete(entry.key); - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(entry); - }) - .catch(() => undefined); - await textFragmentProcessing; + await runTextFragmentFlush(entry); }, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS); }; + const enqueueMediaGroupFlush = async (mediaGroupId: string, entry: MediaGroupEntry) => { + mediaGroupBuffer.delete(mediaGroupId); + mediaGroupProcessing = mediaGroupProcessing + .then(async () => { + await processMediaGroup(entry); + }) + .catch(() => undefined); + await mediaGroupProcessing; + }; + + const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => { + clearTimeout(entry.timer); + entry.timer = setTimeout(async () => { + await enqueueMediaGroupFlush(mediaGroupId, entry); + }, mediaGroupTimeoutMs); + }; + + const getOrCreateMediaGroupEntry = (mediaGroupId: string) => { + const existing = mediaGroupBuffer.get(mediaGroupId); + if (existing) { + return existing; + } + const entry: MediaGroupEntry = { + messages: [], + timer: setTimeout(() => undefined, mediaGroupTimeoutMs), + }; + mediaGroupBuffer.set(mediaGroupId, entry); + return entry; + }; + + const loadStoreAllowFrom = async () => + readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []); + + const isAllowlistAuthorized = ( + allow: NormalizedAllowFrom, + senderId: string, + senderUsername: string, + ) => + allow.hasWildcard || + (allow.hasEntries && + isSenderAllowed({ + allow, + senderId, + senderUsername, + })); + + const shouldSkipGroupMessage = (params: { + isGroup: boolean; + chatId: string | number; + chatTitle?: string; + resolvedThreadId?: number; + senderId: string; + senderUsername: string; + effectiveGroupAllow: NormalizedAllowFrom; + hasGroupAllowOverride: boolean; + groupConfig?: TelegramGroupConfig; + topicConfig?: TelegramTopicConfig; + }) => { + const { + isGroup, + chatId, + chatTitle, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + } = params; + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: true, + requireSenderForAllowOverride: true, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return true; + } + if (baseAccess.reason === "topic-disabled") { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return true; + } + logVerbose( + `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, + ); + return true; + } + if (!isGroup) { + return false; + } + const policyAccess = evaluateTelegramGroupPolicyAccess({ + isGroup, + chatId, + cfg, + telegramCfg, + topicConfig, + groupConfig, + effectiveGroupAllow, + senderId, + senderUsername, + resolveGroupPolicy, + enforcePolicy: true, + useTopicAndGroupOverrides: true, + enforceAllowlistAuthorization: true, + allowEmptyAllowlistEntries: false, + requireSenderForAllowlistAuthorization: true, + checkChatAllowlist: true, + }); + if (!policyAccess.allowed) { + if (policyAccess.reason === "group-policy-disabled") { + logVerbose("Blocked telegram group message (groupPolicy: disabled)"); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-no-sender") { + logVerbose("Blocked telegram group message (no sender ID, groupPolicy: allowlist)"); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-empty") { + logVerbose( + "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", + ); + return true; + } + if (policyAccess.reason === "group-policy-allowlist-unauthorized") { + logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); + return true; + } + logger.info({ chatId, title: chatTitle, reason: "not-allowed" }, "skipping group message"); + return true; + } + return false; + }; + bot.on("callback_query", async (ctx) => { const callback = ctx.callbackQuery; if (!callback) { @@ -303,11 +466,15 @@ export const registerTelegramHandlers = ({ if (shouldSkipUpdate(ctx)) { return; } + const answerCallbackQuery = + typeof (ctx as { answerCallbackQuery?: unknown }).answerCallbackQuery === "function" + ? () => ctx.answerCallbackQuery() + : () => bot.api.answerCallbackQuery(callback.id); // Answer immediately to prevent Telegram from retrying while we process await withTelegramApiErrorLogging({ operation: "answerCallbackQuery", runtime, - fn: () => bot.api.answerCallbackQuery(callback.id), + fn: answerCallbackQuery, }).catch(() => {}); try { const data = (callback.data ?? "").trim(); @@ -315,6 +482,38 @@ export const registerTelegramHandlers = ({ if (!data || !callbackMessage) { return; } + const editCallbackMessage = async ( + text: string, + params?: Parameters[3], + ) => { + const editTextFn = (ctx as { editMessageText?: unknown }).editMessageText; + if (typeof editTextFn === "function") { + return await ctx.editMessageText(text, params); + } + return await bot.api.editMessageText( + callbackMessage.chat.id, + callbackMessage.message_id, + text, + params, + ); + }; + const deleteCallbackMessage = async () => { + const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage; + if (typeof deleteFn === "function") { + return await ctx.deleteMessage(); + } + return await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id); + }; + const replyToCallbackChat = async ( + text: string, + params?: Parameters[2], + ) => { + const replyFn = (ctx as { reply?: unknown }).reply; + if (typeof replyFn === "function") { + return await ctx.reply(text, params); + } + return await bot.api.sendMessage(callbackMessage.chat.id, text, params); + }; const inlineButtonsScope = resolveTelegramInlineButtonsScope({ cfg, @@ -344,8 +543,14 @@ export const registerTelegramHandlers = ({ groupAllowFrom, resolveTelegramGroupConfig, }); - const { resolvedThreadId, storeAllowFrom, groupConfig, topicConfig, effectiveGroupAllow } = - groupAllowContext; + const { + resolvedThreadId, + storeAllowFrom, + groupConfig, + topicConfig, + effectiveGroupAllow, + hasGroupAllowOverride, + } = groupAllowContext; const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom: telegramCfg.allowFrom, storeAllowFrom, @@ -353,75 +558,21 @@ export const registerTelegramHandlers = ({ const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const senderId = callback.from?.id ? String(callback.from.id) : ""; const senderUsername = callback.from?.username ?? ""; - - if (isGroup) { - if (groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - if (groupAllowContext.hasGroupAllowOverride) { - const allowed = - senderId && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, - ); - return; - } - } - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = firstDefined( - topicConfig?.groupPolicy, - groupConfig?.groupPolicy, - telegramCfg.groupPolicy, - defaultGroupPolicy, - "open", - ); - if (groupPolicy === "disabled") { - logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); - return; - } - if (groupPolicy === "allowlist") { - if (!senderId) { - logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); - return; - } - if (!effectiveGroupAllow.hasEntries) { - logVerbose( - "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", - ); - return; - } - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }) - ) { - logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); - return; - } - } - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - logger.info( - { chatId, title: callbackMessage.chat.title, reason: "not-allowed" }, - "skipping group message", - ); - return; - } + if ( + shouldSkipGroupMessage({ + isGroup, + chatId, + chatTitle: callbackMessage.chat.title, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + }) + ) { + return; } if (inlineButtonsScope === "allowlist") { @@ -430,27 +581,13 @@ export const registerTelegramHandlers = ({ return; } if (dmPolicy !== "open") { - const allowed = - effectiveDmAllow.hasWildcard || - (effectiveDmAllow.hasEntries && - isSenderAllowed({ - allow: effectiveDmAllow, - senderId, - senderUsername, - })); + const allowed = isAllowlistAuthorized(effectiveDmAllow, senderId, senderUsername); if (!allowed) { return; } } } else { - const allowed = - effectiveGroupAllow.hasWildcard || - (effectiveGroupAllow.hasEntries && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - })); + const allowed = isAllowlistAuthorized(effectiveGroupAllow, senderId, senderUsername); if (!allowed) { return; } @@ -487,12 +624,7 @@ export const registerTelegramHandlers = ({ : undefined; try { - await bot.api.editMessageText( - callbackMessage.chat.id, - callbackMessage.message_id, - result.text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await editCallbackMessage(result.text, keyboard ? { reply_markup: keyboard } : undefined); } catch (editErr) { const errStr = String(editErr); if (!errStr.includes("message is not modified")) { @@ -514,23 +646,14 @@ export const registerTelegramHandlers = ({ ) => { const keyboard = buildInlineKeyboard(buttons); try { - await bot.api.editMessageText( - callbackMessage.chat.id, - callbackMessage.message_id, - text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await editCallbackMessage(text, keyboard ? { reply_markup: keyboard } : undefined); } catch (editErr) { const errStr = String(editErr); if (errStr.includes("no text in the message")) { try { - await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id); + await deleteCallbackMessage(); } catch {} - await bot.api.sendMessage( - callbackMessage.chat.id, - text, - keyboard ? { reply_markup: keyboard } : undefined, - ); + await replyToCallbackChat(text, keyboard ? { reply_markup: keyboard } : undefined); } else if (!errStr.includes("message is not modified")) { throw editErr; } @@ -597,41 +720,27 @@ export const registerTelegramHandlers = ({ if (modelCallback.type === "select") { const { provider, model } = modelCallback; // Process model selection as a synthetic message with /model command - const syntheticMessage: Message = { - ...callbackMessage, + const syntheticMessage = buildSyntheticTextMessage({ + base: callbackMessage, from: callback.from, text: `/model ${provider}/${model}`, - caption: undefined, - caption_entities: undefined, - entities: undefined, - }; - const getFile = - typeof ctx.getFile === "function" ? ctx.getFile.bind(ctx) : async () => ({}); - await processMessage( - { message: syntheticMessage, me: ctx.me, getFile }, - [], - storeAllowFrom, - { - forceWasMentioned: true, - messageIdOverride: callback.id, - }, - ); + }); + await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, { + forceWasMentioned: true, + messageIdOverride: callback.id, + }); return; } return; } - const syntheticMessage: Message = { - ...callbackMessage, + const syntheticMessage = buildSyntheticTextMessage({ + base: callbackMessage, from: callback.from, text: data, - caption: undefined, - caption_entities: undefined, - entities: undefined, - }; - const getFile = typeof ctx.getFile === "function" ? ctx.getFile.bind(ctx) : async () => ({}); - await processMessage({ message: syntheticMessage, me: ctx.me, getFile }, [], storeAllowFrom, { + }); + await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, { forceWasMentioned: true, messageIdOverride: callback.id, }); @@ -723,85 +832,23 @@ export const registerTelegramHandlers = ({ hasGroupAllowOverride, } = groupAllowContext; - if (isGroup) { - if (groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return; - } - if (topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); - return; - } - if (hasGroupAllowOverride) { - const senderId = msg.from?.id; - const senderUsername = msg.from?.username ?? ""; - const allowed = - senderId != null && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`, - ); - return; - } - } - // Group policy filtering: controls how group messages are handled - // - "open": groups bypass allowFrom, only mention-gating applies - // - "disabled": block all group messages entirely - // - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = firstDefined( - topicConfig?.groupPolicy, - groupConfig?.groupPolicy, - telegramCfg.groupPolicy, - defaultGroupPolicy, - "open", - ); - if (groupPolicy === "disabled") { - logVerbose(`Blocked telegram group message (groupPolicy: disabled)`); - return; - } - if (groupPolicy === "allowlist") { - // For allowlist mode, the sender (msg.from.id) must be in allowFrom - const senderId = msg.from?.id; - if (senderId == null) { - logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`); - return; - } - if (!effectiveGroupAllow.hasEntries) { - logVerbose( - "Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)", - ); - return; - } - const senderUsername = msg.from?.username ?? ""; - if ( - !isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderId), - senderUsername, - }) - ) { - logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`); - return; - } - } - - // Group allowlist based on configured group IDs. - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - logger.info( - { chatId, title: msg.chat.title, reason: "not-allowed" }, - "skipping group message", - ); - return; - } + const senderId = msg.from?.id != null ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + if ( + shouldSkipGroupMessage({ + isGroup, + chatId, + chatTitle: msg.chat.title, + resolvedThreadId, + senderId, + senderUsername, + effectiveGroupAllow, + hasGroupAllowOverride, + groupConfig, + topicConfig, + }) + ) { + return; } // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). @@ -844,13 +891,7 @@ export const registerTelegramHandlers = ({ // Not appendable (or limits exceeded): flush buffered entry first, then continue normally. clearTimeout(existing.timer); - textFragmentBuffer.delete(key); - textFragmentProcessing = textFragmentProcessing - .then(async () => { - await flushTextFragments(existing); - }) - .catch(() => undefined); - await textFragmentProcessing; + await runTextFragmentFlush(existing); } const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS; @@ -869,34 +910,9 @@ export const registerTelegramHandlers = ({ // Media group handling - buffer multi-image messages const mediaGroupId = msg.media_group_id; if (mediaGroupId) { - const existing = mediaGroupBuffer.get(mediaGroupId); - if (existing) { - clearTimeout(existing.timer); - existing.messages.push({ msg, ctx }); - existing.timer = setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(existing); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, mediaGroupTimeoutMs); - } else { - const entry: MediaGroupEntry = { - messages: [{ msg, ctx }], - timer: setTimeout(async () => { - mediaGroupBuffer.delete(mediaGroupId); - mediaGroupProcessing = mediaGroupProcessing - .then(async () => { - await processMediaGroup(entry); - }) - .catch(() => undefined); - await mediaGroupProcessing; - }, mediaGroupTimeoutMs), - }; - mediaGroupBuffer.set(mediaGroupId, entry); - } + const entry = getOrCreateMediaGroupEntry(mediaGroupId); + entry.messages.push({ msg, ctx }); + scheduleMediaGroupFlush(mediaGroupId, entry); return; } @@ -938,7 +954,6 @@ export const registerTelegramHandlers = ({ }, ] : []; - const senderId = msg.from?.id ? String(msg.from.id) : ""; const conversationKey = resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); const debounceKey = senderId diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index a8ae50db178..7961fb469ed 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -49,6 +49,7 @@ import { buildTelegramGroupPeerId, buildTelegramParentPeer, buildTypingThreadParams, + resolveTelegramMediaPlaceholder, expandTextLinks, normalizeForwardedContext, describeReplyTarget, @@ -56,6 +57,7 @@ import { hasBotMention, resolveTelegramThreadSpec, } from "./bot/helpers.js"; +import { evaluateTelegramGroupBaseAccess } from "./group-access.js"; export type TelegramMediaRef = { path: string; @@ -192,15 +194,31 @@ export const buildTelegramMessageContext = async ({ storeAllowFrom, }); const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; - - if (isGroup && groupConfig?.enabled === false) { - logVerbose(`Blocked telegram group ${chatId} (group disabled)`); - return null; - } - if (isGroup && topicConfig?.enabled === false) { - logVerbose( - `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, - ); + const senderId = msg.from?.id ? String(msg.from.id) : ""; + const senderUsername = msg.from?.username ?? ""; + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: true, + requireSenderForAllowOverride: false, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + logVerbose(`Blocked telegram group ${chatId} (group disabled)`); + return null; + } + if (baseAccess.reason === "topic-disabled") { + logVerbose( + `Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`, + ); + return null; + } + logVerbose(`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`); return null; } @@ -320,21 +338,6 @@ export const buildTelegramMessageContext = async ({ } const botUsername = primaryCtx.me?.username?.toLowerCase(); - const senderId = msg.from?.id ? String(msg.from.id) : ""; - const senderUsername = msg.from?.username ?? ""; - if (isGroup && hasGroupAllowOverride) { - const allowed = isSenderAllowed({ - allow: effectiveGroupAllow, - senderId, - senderUsername, - }); - if (!allowed) { - logVerbose( - `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`, - ); - return null; - } - } const allowForCommands = isGroup ? effectiveGroupAllow : effectiveDmAllow; const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, @@ -354,20 +357,7 @@ export const buildTelegramMessageContext = async ({ const commandAuthorized = commandGate.commandAuthorized; const historyKey = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : undefined; - let placeholder = ""; - if (msg.photo) { - placeholder = ""; - } else if (msg.video) { - placeholder = ""; - } else if (msg.video_note) { - placeholder = ""; - } else if (msg.audio || msg.voice) { - placeholder = ""; - } else if (msg.document) { - placeholder = ""; - } else if (msg.sticker) { - placeholder = ""; - } + let placeholder = resolveTelegramMediaPlaceholder(msg) ?? ""; // Check if sticker has a cached description - if so, use it instead of sending the image const cachedStickerDescription = allMedia[0]?.stickerMetadata?.cachedDescription; diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index eac21eb82fb..361a897185c 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -249,6 +249,25 @@ export const dispatchTelegramMessage = async ({ skippedNonSilent: 0, }; let finalizedViaPreviewMessage = false; + const clearGroupHistory = () => { + if (isGroup && historyKey) { + clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); + } + }; + const deliveryBaseOptions = { + chatId: String(chatId), + token: opts.token, + runtime, + bot, + mediaLocalRoots, + replyToMode, + textLimit, + thread: threadSpec, + tableMode, + chunkMode, + linkPreview: telegramCfg.linkPreview, + replyQuoteText, + }; let queuedFinal = false; try { @@ -300,20 +319,9 @@ export const dispatchTelegramMessage = async ({ } } const result = await deliverReplies({ + ...deliveryBaseOptions, replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - mediaLocalRoots, - replyToMode, - textLimit, - thread: threadSpec, - tableMode, - chunkMode, onVoiceRecording: sendRecordVoice, - linkPreview: telegramCfg.linkPreview, - replyQuoteText, }); if (result.delivered) { deliveryState.delivered = true; @@ -356,27 +364,14 @@ export const dispatchTelegramMessage = async ({ if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { const result = await deliverReplies({ replies: [{ text: EMPTY_RESPONSE_FALLBACK }], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - mediaLocalRoots, - replyToMode, - textLimit, - thread: threadSpec, - tableMode, - chunkMode, - linkPreview: telegramCfg.linkPreview, - replyQuoteText, + ...deliveryBaseOptions, }); sentFallback = result.delivered; } const hasFinalResponse = queuedFinal || sentFallback; if (!hasFinalResponse) { - if (isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); - } + clearGroupHistory(); return; } removeAckReactionAfterReply({ @@ -396,7 +391,5 @@ export const dispatchTelegramMessage = async ({ }); }, }); - if (isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); - } + clearGroupHistory(); }; diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index 468726e2bb8..f22d44c3b77 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -55,6 +55,10 @@ import { resolveTelegramGroupAllowFromContext, resolveTelegramThreadSpec, } from "./bot/helpers.js"; +import { + evaluateTelegramGroupBaseAccess, + evaluateTelegramGroupPolicyAccess, +} from "./group-access.js"; import { buildInlineKeyboard } from "./send.js"; const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; @@ -172,68 +176,71 @@ async function resolveTelegramCommandAuth(params: { effectiveGroupAllow, hasGroupAllowOverride, } = groupAllowContext; - const senderIdRaw = msg.from?.id; - const senderId = senderIdRaw ? String(senderIdRaw) : ""; + const senderId = msg.from?.id ? String(msg.from.id) : ""; const senderUsername = msg.from?.username ?? ""; - const isGroupSenderAllowed = () => - senderIdRaw != null && - isSenderAllowed({ - allow: effectiveGroupAllow, - senderId: String(senderIdRaw), - senderUsername, - }); - - const rejectNotAuthorized = async () => { + const sendAuthMessage = async (text: string) => { await withTelegramApiErrorLogging({ operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "You are not authorized to use this command."), + fn: () => bot.api.sendMessage(chatId, text), }); return null; }; + const rejectNotAuthorized = async () => { + return await sendAuthMessage("You are not authorized to use this command."); + }; - if (isGroup && groupConfig?.enabled === false) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "This group is disabled."), - }); - return null; + const baseAccess = evaluateTelegramGroupBaseAccess({ + isGroup, + groupConfig, + topicConfig, + hasGroupAllowOverride, + effectiveGroupAllow, + senderId, + senderUsername, + enforceAllowOverride: requireAuth, + requireSenderForAllowOverride: true, + }); + if (!baseAccess.allowed) { + if (baseAccess.reason === "group-disabled") { + return await sendAuthMessage("This group is disabled."); + } + if (baseAccess.reason === "topic-disabled") { + return await sendAuthMessage("This topic is disabled."); + } + return await rejectNotAuthorized(); } - if (isGroup && topicConfig?.enabled === false) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "This topic is disabled."), - }); - return null; - } - if (requireAuth && isGroup && hasGroupAllowOverride) { - if (!isGroupSenderAllowed()) { + + const policyAccess = evaluateTelegramGroupPolicyAccess({ + isGroup, + chatId, + cfg, + telegramCfg, + topicConfig, + groupConfig, + effectiveGroupAllow, + senderId, + senderUsername, + resolveGroupPolicy, + enforcePolicy: useAccessGroups, + useTopicAndGroupOverrides: false, + enforceAllowlistAuthorization: requireAuth, + allowEmptyAllowlistEntries: true, + requireSenderForAllowlistAuthorization: true, + checkChatAllowlist: useAccessGroups, + }); + if (!policyAccess.allowed) { + if (policyAccess.reason === "group-policy-disabled") { + return await sendAuthMessage("Telegram group commands are disabled."); + } + if ( + policyAccess.reason === "group-policy-allowlist-no-sender" || + policyAccess.reason === "group-policy-allowlist-unauthorized" + ) { return await rejectNotAuthorized(); } - } - - if (isGroup && useAccessGroups) { - const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; - const groupPolicy = telegramCfg.groupPolicy ?? defaultGroupPolicy ?? "open"; - if (groupPolicy === "disabled") { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "Telegram group commands are disabled."), - }); - return null; - } - if (groupPolicy === "allowlist" && requireAuth) { - if (!isGroupSenderAllowed()) { - return await rejectNotAuthorized(); - } - } - const groupAllowlist = resolveGroupPolicy(chatId); - if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "This group is not allowed."), - }); - return null; + if (policyAccess.reason === "group-chat-not-allowed") { + return await sendAuthMessage("This group is not allowed."); } } @@ -252,11 +259,7 @@ async function resolveTelegramCommandAuth(params: { modeWhenAccessGroupsOff: "configured", }); if (requireAuth && !commandAuthorized) { - await withTelegramApiErrorLogging({ - operation: "sendMessage", - fn: () => bot.api.sendMessage(chatId, "You are not authorized to use this command."), - }); - return null; + return await rejectNotAuthorized(); } return { @@ -357,6 +360,60 @@ export const registerTelegramNativeCommands = ({ // Keep hidden commands callable by registering handlers for the full catalog. syncTelegramMenuCommands({ bot, runtime, commandsToRegister }); + const resolveCommandRuntimeContext = (params: { + msg: NonNullable; + isGroup: boolean; + isForum: boolean; + resolvedThreadId?: number; + }) => { + const { msg, isGroup, isForum, resolvedThreadId } = params; + const chatId = msg.chat.id; + const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; + const threadSpec = resolveTelegramThreadSpec({ + isGroup, + isForum, + messageThreadId, + }); + const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); + const route = resolveAgentRoute({ + cfg, + channel: "telegram", + accountId, + peer: { + kind: isGroup ? "group" : "direct", + id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), + }, + parentPeer, + }); + const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); + const tableMode = resolveMarkdownTableMode({ + cfg, + channel: "telegram", + accountId: route.accountId, + }); + const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); + return { chatId, threadSpec, route, mediaLocalRoots, tableMode, chunkMode }; + }; + const buildCommandDeliveryBaseOptions = (params: { + chatId: string | number; + mediaLocalRoots?: readonly string[]; + threadSpec: ReturnType; + tableMode: ReturnType; + chunkMode: ReturnType; + }) => ({ + chatId: String(params.chatId), + token: opts.token, + runtime, + bot, + mediaLocalRoots: params.mediaLocalRoots, + replyToMode, + textLimit, + thread: params.threadSpec, + tableMode: params.tableMode, + chunkMode: params.chunkMode, + linkPreview: telegramCfg.linkPreview, + }); + if (commandsToRegister.length > 0 || pluginCatalog.commands.length > 0) { if (typeof (bot as unknown as { command?: unknown }).command !== "function") { logVerbose("telegram: bot.command unavailable; skipping native handlers"); @@ -397,11 +454,19 @@ export const registerTelegramNativeCommands = ({ topicConfig, commandAuthorized, } = auth; - const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; - const threadSpec = resolveTelegramThreadSpec({ - isGroup, - isForum, - messageThreadId, + const { threadSpec, route, mediaLocalRoots, tableMode, chunkMode } = + resolveCommandRuntimeContext({ + msg, + isGroup, + isForum, + resolvedThreadId, + }); + const deliveryBaseOptions = buildCommandDeliveryBaseOptions({ + chatId, + mediaLocalRoots, + threadSpec, + tableMode, + chunkMode, }); const threadParams = buildTelegramThreadParams(threadSpec) ?? {}; @@ -455,18 +520,6 @@ export const registerTelegramNativeCommands = ({ }); return; } - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), - }, - parentPeer, - }); - const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); const baseSessionKey = route.sessionKey; // DMs: use raw messageThreadId for thread sessions (not resolvedThreadId which is for forums) const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; @@ -478,11 +531,6 @@ export const registerTelegramNativeCommands = ({ }) : null; const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; - const tableMode = resolveMarkdownTableMode({ - cfg, - channel: "telegram", - accountId: route.accountId, - }); const skillFilter = firstDefined(topicConfig?.skills, groupConfig?.skills); const systemPromptParts = [ groupConfig?.systemPrompt?.trim() || null, @@ -530,7 +578,6 @@ export const registerTelegramNativeCommands = ({ typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming : undefined; - const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); const deliveryState = { delivered: false, @@ -552,17 +599,7 @@ export const registerTelegramNativeCommands = ({ deliver: async (payload, _info) => { const result = await deliverReplies({ replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - mediaLocalRoots, - replyToMode, - textLimit, - thread: threadSpec, - tableMode, - chunkMode, - linkPreview: telegramCfg.linkPreview, + ...deliveryBaseOptions, }); if (result.delivered) { deliveryState.delivered = true; @@ -586,17 +623,7 @@ export const registerTelegramNativeCommands = ({ if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { await deliverReplies({ replies: [{ text: EMPTY_RESPONSE_FALLBACK }], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - mediaLocalRoots, - replyToMode, - textLimit, - thread: threadSpec, - tableMode, - chunkMode, - linkPreview: telegramCfg.linkPreview, + ...deliveryBaseOptions, }); } }); @@ -640,24 +667,20 @@ export const registerTelegramNativeCommands = ({ return; } const { senderId, commandAuthorized, isGroup, isForum, resolvedThreadId } = auth; - const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; - const threadSpec = resolveTelegramThreadSpec({ - isGroup, - isForum, - messageThreadId, + const { threadSpec, mediaLocalRoots, tableMode, chunkMode } = + resolveCommandRuntimeContext({ + msg, + isGroup, + isForum, + resolvedThreadId, + }); + const deliveryBaseOptions = buildCommandDeliveryBaseOptions({ + chatId, + mediaLocalRoots, + threadSpec, + tableMode, + chunkMode, }); - const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); - const route = resolveAgentRoute({ - cfg, - channel: "telegram", - accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), - }, - parentPeer, - }); - const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); const from = isGroup ? buildTelegramGroupFrom(chatId, threadSpec.id) : `telegram:${chatId}`; @@ -676,26 +699,10 @@ export const registerTelegramNativeCommands = ({ accountId, messageThreadId: threadSpec.id, }); - const tableMode = resolveMarkdownTableMode({ - cfg, - channel: "telegram", - accountId: route.accountId, - }); - const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); await deliverReplies({ replies: [result], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - mediaLocalRoots, - replyToMode, - textLimit, - thread: threadSpec, - tableMode, - chunkMode, - linkPreview: telegramCfg.linkPreview, + ...deliveryBaseOptions, }); }); } diff --git a/src/telegram/bot/delivery.ts b/src/telegram/bot/delivery.ts index 4e4fe670601..0379e205abf 100644 --- a/src/telegram/bot/delivery.ts +++ b/src/telegram/bot/delivery.ts @@ -26,6 +26,7 @@ import { cacheSticker, getCachedSticker } from "../sticker-cache.js"; import { resolveTelegramVoiceSend } from "../voice.js"; import { buildTelegramThreadParams, + resolveTelegramMediaPlaceholder, resolveTelegramReplyId, type TelegramThreadSpec, } from "./helpers.js"; @@ -429,16 +430,7 @@ export async function resolveMedia( throw new Error("fetch is not available; set channels.telegram.proxy in config"); } const saved = await downloadAndSaveTelegramFile(file.file_path, fetchImpl); - let placeholder = ""; - if (msg.photo) { - placeholder = ""; - } else if (msg.video) { - placeholder = ""; - } else if (msg.video_note) { - placeholder = ""; - } else if (msg.audio || msg.voice) { - placeholder = ""; - } + const placeholder = resolveTelegramMediaPlaceholder(msg) ?? ""; return { path: saved.path, contentType: saved.contentType, placeholder }; } diff --git a/src/telegram/bot/helpers.ts b/src/telegram/bot/helpers.ts index b5b33667d70..a39d5f15d6a 100644 --- a/src/telegram/bot/helpers.ts +++ b/src/telegram/bot/helpers.ts @@ -197,6 +197,33 @@ export function buildSenderName(msg: Message) { return name || undefined; } +export function resolveTelegramMediaPlaceholder( + msg: + | Pick + | undefined + | null, +): string | undefined { + if (!msg) { + return undefined; + } + if (msg.photo) { + return ""; + } + if (msg.video || msg.video_note) { + return ""; + } + if (msg.audio || msg.voice) { + return ""; + } + if (msg.document) { + return ""; + } + if (msg.sticker) { + return ""; + } + return undefined; +} + export function buildSenderLabel(msg: Message, senderId?: number | string) { const name = buildSenderName(msg); const username = msg.from?.username ? `@${msg.from.username}` : undefined; @@ -318,15 +345,8 @@ export function describeReplyTarget(msg: Message): TelegramReplyTarget | null { const replyBody = (replyLike.text ?? replyLike.caption ?? "").trim(); body = replyBody; if (!body) { - if (replyLike.photo) { - body = ""; - } else if (replyLike.video) { - body = ""; - } else if (replyLike.audio || replyLike.voice) { - body = ""; - } else if (replyLike.document) { - body = ""; - } else { + body = resolveTelegramMediaPlaceholder(replyLike) ?? ""; + if (!body) { const locationData = extractTelegramLocation(replyLike); if (locationData) { body = formatLocationText(locationData); diff --git a/src/telegram/download.ts b/src/telegram/download.ts deleted file mode 100644 index 8da41eab312..00000000000 --- a/src/telegram/download.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { detectMime } from "../media/mime.js"; -import { type SavedMedia, saveMediaBuffer } from "../media/store.js"; - -export type TelegramFileInfo = { - file_id: string; - file_unique_id?: string; - file_size?: number; - file_path?: string; -}; - -export async function getTelegramFile( - token: string, - fileId: string, - timeoutMs = 30_000, -): Promise { - const res = await fetch( - `https://api.telegram.org/bot${token}/getFile?file_id=${encodeURIComponent(fileId)}`, - { signal: AbortSignal.timeout(timeoutMs) }, - ); - if (!res.ok) { - throw new Error(`getFile failed: ${res.status} ${res.statusText}`); - } - const json = (await res.json()) as { ok: boolean; result?: TelegramFileInfo }; - if (!json.ok || !json.result?.file_path) { - throw new Error("getFile returned no file_path"); - } - return json.result; -} - -export async function downloadTelegramFile( - token: string, - info: TelegramFileInfo, - maxBytes?: number, - timeoutMs = 60_000, -): Promise { - if (!info.file_path) { - throw new Error("file_path missing"); - } - const url = `https://api.telegram.org/file/bot${token}/${info.file_path}`; - const res = await fetch(url, { signal: AbortSignal.timeout(timeoutMs) }); - if (!res.ok || !res.body) { - throw new Error(`Failed to download telegram file: HTTP ${res.status}`); - } - const array = Buffer.from(await res.arrayBuffer()); - const mime = await detectMime({ - buffer: array, - headerMime: res.headers.get("content-type"), - filePath: info.file_path, - }); - // save with inbound subdir - const saved = await saveMediaBuffer(array, mime, "inbound", maxBytes, info.file_path); - // Ensure extension matches mime if possible - if (!saved.contentType && mime) { - saved.contentType = mime; - } - return saved; -} diff --git a/src/telegram/fetch.test.ts b/src/telegram/fetch.test.ts index a4f0d88c297..c20d3ad5e9a 100644 --- a/src/telegram/fetch.test.ts +++ b/src/telegram/fetch.test.ts @@ -1,5 +1,4 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { downloadTelegramFile, getTelegramFile, type TelegramFileInfo } from "./download.js"; import { resetTelegramFetchStateForTests, resolveTelegramFetch } from "./fetch.js"; const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn()); @@ -61,36 +60,3 @@ describe("resolveTelegramFetch", () => { expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(false); }); }); - -describe("telegram download", () => { - it("fetches file info", async () => { - const json = vi.fn().mockResolvedValue({ ok: true, result: { file_path: "photos/1.jpg" } }); - vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({ - ok: true, - status: 200, - statusText: "OK", - json, - } as Response); - const info = await getTelegramFile("tok", "fid"); - expect(info.file_path).toBe("photos/1.jpg"); - }); - - it("downloads and saves", async () => { - const info: TelegramFileInfo = { - file_id: "fid", - file_path: "photos/1.jpg", - }; - const arrayBuffer = async () => new Uint8Array([1, 2, 3, 4]).buffer; - vi.spyOn(globalThis, "fetch" as never).mockResolvedValueOnce({ - ok: true, - status: 200, - statusText: "OK", - body: true, - arrayBuffer, - headers: { get: () => "image/jpeg" }, - } as Response); - const saved = await downloadTelegramFile("tok", info, 1024 * 1024); - expect(saved.path).toBeTruthy(); - expect(saved.contentType).toBe("image/jpeg"); - }); -}); diff --git a/src/telegram/group-access.ts b/src/telegram/group-access.ts new file mode 100644 index 00000000000..02375218171 --- /dev/null +++ b/src/telegram/group-access.ts @@ -0,0 +1,141 @@ +import type { OpenClawConfig } from "../config/config.js"; +import type { ChannelGroupPolicy } from "../config/group-policy.js"; +import type { + TelegramAccountConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../config/types.js"; +import { isSenderAllowed, type NormalizedAllowFrom } from "./bot-access.js"; +import { firstDefined } from "./bot-access.js"; + +export type TelegramGroupBaseBlockReason = + | "group-disabled" + | "topic-disabled" + | "group-override-unauthorized"; + +export type TelegramGroupBaseAccessResult = + | { allowed: true } + | { allowed: false; reason: TelegramGroupBaseBlockReason }; + +export const evaluateTelegramGroupBaseAccess = (params: { + isGroup: boolean; + groupConfig?: TelegramGroupConfig; + topicConfig?: TelegramTopicConfig; + hasGroupAllowOverride: boolean; + effectiveGroupAllow: NormalizedAllowFrom; + senderId?: string; + senderUsername?: string; + enforceAllowOverride: boolean; + requireSenderForAllowOverride: boolean; +}): TelegramGroupBaseAccessResult => { + if (!params.isGroup) { + return { allowed: true }; + } + if (params.groupConfig?.enabled === false) { + return { allowed: false, reason: "group-disabled" }; + } + if (params.topicConfig?.enabled === false) { + return { allowed: false, reason: "topic-disabled" }; + } + if (!params.enforceAllowOverride || !params.hasGroupAllowOverride) { + return { allowed: true }; + } + + const senderId = params.senderId ?? ""; + if (params.requireSenderForAllowOverride && !senderId) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + + const allowed = isSenderAllowed({ + allow: params.effectiveGroupAllow, + senderId, + senderUsername: params.senderUsername ?? "", + }); + if (!allowed) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + return { allowed: true }; +}; + +export type TelegramGroupPolicyBlockReason = + | "group-policy-disabled" + | "group-policy-allowlist-no-sender" + | "group-policy-allowlist-empty" + | "group-policy-allowlist-unauthorized" + | "group-chat-not-allowed"; + +export type TelegramGroupPolicyAccessResult = + | { allowed: true; groupPolicy: "open" | "disabled" | "allowlist" } + | { + allowed: false; + reason: TelegramGroupPolicyBlockReason; + groupPolicy: "open" | "disabled" | "allowlist"; + }; + +export const evaluateTelegramGroupPolicyAccess = (params: { + isGroup: boolean; + chatId: string | number; + cfg: OpenClawConfig; + telegramCfg: TelegramAccountConfig; + topicConfig?: TelegramTopicConfig; + groupConfig?: TelegramGroupConfig; + effectiveGroupAllow: NormalizedAllowFrom; + senderId?: string; + senderUsername?: string; + resolveGroupPolicy: (chatId: string | number) => ChannelGroupPolicy; + enforcePolicy: boolean; + useTopicAndGroupOverrides: boolean; + enforceAllowlistAuthorization: boolean; + allowEmptyAllowlistEntries: boolean; + requireSenderForAllowlistAuthorization: boolean; + checkChatAllowlist: boolean; +}): TelegramGroupPolicyAccessResult => { + const fallbackPolicy = + firstDefined( + params.telegramCfg.groupPolicy, + params.cfg.channels?.defaults?.groupPolicy, + "open", + ) ?? "open"; + const groupPolicy = params.useTopicAndGroupOverrides + ? (firstDefined( + params.topicConfig?.groupPolicy, + params.groupConfig?.groupPolicy, + params.telegramCfg.groupPolicy, + params.cfg.channels?.defaults?.groupPolicy, + "open", + ) ?? "open") + : fallbackPolicy; + + if (!params.isGroup || !params.enforcePolicy) { + return { allowed: true, groupPolicy }; + } + if (groupPolicy === "disabled") { + return { allowed: false, reason: "group-policy-disabled", groupPolicy }; + } + if (groupPolicy === "allowlist" && params.enforceAllowlistAuthorization) { + const senderId = params.senderId ?? ""; + if (params.requireSenderForAllowlistAuthorization && !senderId) { + return { allowed: false, reason: "group-policy-allowlist-no-sender", groupPolicy }; + } + if (!params.allowEmptyAllowlistEntries && !params.effectiveGroupAllow.hasEntries) { + return { allowed: false, reason: "group-policy-allowlist-empty", groupPolicy }; + } + const senderUsername = params.senderUsername ?? ""; + if ( + !isSenderAllowed({ + allow: params.effectiveGroupAllow, + senderId, + senderUsername, + }) + ) { + return { allowed: false, reason: "group-policy-allowlist-unauthorized", groupPolicy }; + } + } + if (params.checkChatAllowlist) { + const groupAllowlist = params.resolveGroupPolicy(params.chatId); + if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) { + return { allowed: false, reason: "group-chat-not-allowed", groupPolicy }; + } + } + return { allowed: true, groupPolicy }; +}; diff --git a/src/telegram/group-migration.ts b/src/telegram/group-migration.ts index 085aeabaf6f..921e34d5a9b 100644 --- a/src/telegram/group-migration.ts +++ b/src/telegram/group-migration.ts @@ -66,24 +66,19 @@ export function migrateTelegramGroupConfig(params: { let migrated = false; let skippedExisting = false; - const accountGroups = resolveAccountGroups(params.cfg, params.accountId).groups; - if (accountGroups) { - const result = migrateTelegramGroupsInPlace(accountGroups, params.oldChatId, params.newChatId); - if (result.migrated) { - migrated = true; - scopes.push("account"); - } - if (result.skippedExisting) { - skippedExisting = true; - } - } + const migrationTargets: Array<{ + scope: MigrationScope; + groups: TelegramGroups | undefined; + }> = [ + { scope: "account", groups: resolveAccountGroups(params.cfg, params.accountId).groups }, + { scope: "global", groups: params.cfg.channels?.telegram?.groups }, + ]; - const globalGroups = params.cfg.channels?.telegram?.groups; - if (globalGroups) { - const result = migrateTelegramGroupsInPlace(globalGroups, params.oldChatId, params.newChatId); + for (const target of migrationTargets) { + const result = migrateTelegramGroupsInPlace(target.groups, params.oldChatId, params.newChatId); if (result.migrated) { migrated = true; - scopes.push("global"); + scopes.push(target.scope); } if (result.skippedExisting) { skippedExisting = true; diff --git a/src/telegram/index.ts b/src/telegram/index.ts deleted file mode 100644 index 5ffb8dacaf6..00000000000 --- a/src/telegram/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export { createTelegramBot, createTelegramWebhookCallback } from "./bot.js"; -export { monitorTelegramProvider } from "./monitor.js"; -export { reactMessageTelegram, sendMessageTelegram, sendPollTelegram } from "./send.js"; -export { startTelegramWebhook } from "./webhook.js"; diff --git a/src/telegram/inline-buttons.ts b/src/telegram/inline-buttons.ts index 1cf770d0a8f..1137d61d1cd 100644 --- a/src/telegram/inline-buttons.ts +++ b/src/telegram/inline-buttons.ts @@ -1,7 +1,6 @@ import type { OpenClawConfig } from "../config/config.js"; import type { TelegramInlineButtonsScope } from "../config/types.telegram.js"; import { listTelegramAccountIds, resolveTelegramAccount } from "./accounts.js"; -import { parseTelegramTarget } from "./targets.js"; const DEFAULT_INLINE_BUTTONS_SCOPE: TelegramInlineButtonsScope = "allowlist"; @@ -65,17 +64,4 @@ export function isTelegramInlineButtonsEnabled(params: { ); } -export function resolveTelegramTargetChatType(target: string): "direct" | "group" | "unknown" { - if (!target.trim()) { - return "unknown"; - } - const parsed = parseTelegramTarget(target); - const chatId = parsed.chatId.trim(); - if (!chatId) { - return "unknown"; - } - if (/^-?\d+$/.test(chatId)) { - return chatId.startsWith("-") ? "group" : "direct"; - } - return "unknown"; -} +export { resolveTelegramTargetChatType } from "./targets.js"; diff --git a/src/telegram/send.ts b/src/telegram/send.ts index ec451844f65..72cf2eb5037 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -63,6 +63,11 @@ type TelegramSendResult = { chatId: string; }; +type TelegramMessageLike = { + message_id?: number; + chat?: { id?: string | number }; +}; + type TelegramReactionOpts = { token?: string; accountId?: string; @@ -76,6 +81,7 @@ const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; const MESSAGE_NOT_MODIFIED_RE = /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; +const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i; const diagLogger = createSubsystemLogger("telegram/diagnostic"); function createTelegramHttpLogger(cfg: ReturnType) { @@ -213,6 +219,177 @@ function removeMessageThreadIdParam( return Object.keys(next).length > 0 ? next : undefined; } +function isTelegramHtmlParseError(err: unknown): boolean { + return PARSE_ERR_RE.test(formatErrorMessage(err)); +} + +function buildTelegramThreadReplyParams(params: { + targetMessageThreadId?: number; + messageThreadId?: number; + replyToMessageId?: number; + quoteText?: string; +}): Record { + const messageThreadId = + params.messageThreadId != null ? params.messageThreadId : params.targetMessageThreadId; + const threadSpec = + messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined; + const threadIdParams = buildTelegramThreadParams(threadSpec); + const threadParams: Record = threadIdParams ? { ...threadIdParams } : {}; + + if (params.replyToMessageId != null) { + const replyToMessageId = Math.trunc(params.replyToMessageId); + if (params.quoteText?.trim()) { + threadParams.reply_parameters = { + message_id: replyToMessageId, + quote: params.quoteText.trim(), + }; + } else { + threadParams.reply_to_message_id = replyToMessageId; + } + } + return threadParams; +} + +async function withTelegramHtmlParseFallback(params: { + label: string; + verbose?: boolean; + requestHtml: (label: string) => Promise; + requestPlain: (label: string) => Promise; +}): Promise { + try { + return await params.requestHtml(params.label); + } catch (err) { + if (!isTelegramHtmlParseError(err)) { + throw err; + } + if (params.verbose) { + console.warn( + `telegram ${params.label} failed with HTML parse error, retrying as plain text: ${formatErrorMessage( + err, + )}`, + ); + } + return await params.requestPlain(`${params.label}-plain`); + } +} + +type TelegramApiContext = { + cfg: ReturnType; + account: ResolvedTelegramAccount; + api: Bot["api"]; +}; + +function resolveTelegramApiContext(opts: { + token?: string; + accountId?: string; + api?: Bot["api"]; + cfg?: ReturnType; +}): TelegramApiContext { + const cfg = opts.cfg ?? loadConfig(); + const account = resolveTelegramAccount({ + cfg, + accountId: opts.accountId, + }); + const token = resolveToken(opts.token, account); + const client = resolveTelegramClientOptions(account); + const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; + return { cfg, account, api }; +} + +type TelegramRequestWithDiag = ( + fn: () => Promise, + label?: string, + options?: { shouldLog?: (err: unknown) => boolean }, +) => Promise; + +function createTelegramRequestWithDiag(params: { + cfg: ReturnType; + account: ResolvedTelegramAccount; + retry?: RetryConfig; + verbose?: boolean; + shouldRetry?: (err: unknown) => boolean; + useApiErrorLogging?: boolean; +}): TelegramRequestWithDiag { + const request = createTelegramRetryRunner({ + retry: params.retry, + configRetry: params.account.config.retry, + verbose: params.verbose, + ...(params.shouldRetry ? { shouldRetry: params.shouldRetry } : {}), + }); + const logHttpError = createTelegramHttpLogger(params.cfg); + return ( + fn: () => Promise, + label?: string, + options?: { shouldLog?: (err: unknown) => boolean }, + ) => { + const runRequest = () => request(fn, label); + const call = + params.useApiErrorLogging === false + ? runRequest() + : withTelegramApiErrorLogging({ + operation: label ?? "request", + fn: runRequest, + ...(options?.shouldLog ? { shouldLog: options.shouldLog } : {}), + }); + return call.catch((err) => { + logHttpError(label ?? "request", err); + throw err; + }); + }; +} + +function wrapTelegramChatNotFoundError(err: unknown, params: { chatId: string; input: string }) { + if (!CHAT_NOT_FOUND_RE.test(formatErrorMessage(err))) { + return err; + } + return new Error( + [ + `Telegram send failed: chat not found (chat_id=${params.chatId}).`, + "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", + `Input was: ${JSON.stringify(params.input)}.`, + ].join(" "), + ); +} + +async function withTelegramThreadFallback( + params: Record | undefined, + label: string, + verbose: boolean | undefined, + attempt: ( + effectiveParams: Record | undefined, + effectiveLabel: string, + ) => Promise, +): Promise { + try { + return await attempt(params, label); + } catch (err) { + if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { + throw err; + } + if (verbose) { + console.warn( + `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, + ); + } + const retriedParams = removeMessageThreadIdParam(params); + return await attempt(retriedParams, `${label}-threadless`); + } +} + +function createRequestWithChatNotFound(params: { + requestWithDiag: TelegramRequestWithDiag; + chatId: string; + input: string; +}) { + return async (fn: () => Promise, label: string) => + params.requestWithDiag(fn, label).catch((err) => { + throw wrapTelegramChatNotFoundError(err, { + chatId: params.chatId, + input: params.input, + }); + }); +} + export function buildInlineKeyboard( buttons?: TelegramSendOpts["buttons"], ): InlineKeyboardMarkup | undefined { @@ -242,92 +419,31 @@ export async function sendMessageTelegram( text: string, opts: TelegramSendOpts = {}, ): Promise { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - // Use provided api or create a new Bot instance. The nullish coalescing - // operator ensures api is always defined (Bot.api is always non-null). - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; const mediaUrl = opts.mediaUrl?.trim(); const replyMarkup = buildInlineKeyboard(opts.buttons); - // Build optional params for forum topics and reply threading. - // Only include these if actually provided to keep API calls clean. - const messageThreadId = - opts.messageThreadId != null ? opts.messageThreadId : target.messageThreadId; - const threadSpec = - messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined; - const threadIdParams = buildTelegramThreadParams(threadSpec); - const threadParams: Record = threadIdParams ? { ...threadIdParams } : {}; - const quoteText = opts.quoteText?.trim(); - if (opts.replyToMessageId != null) { - if (quoteText) { - threadParams.reply_parameters = { - message_id: Math.trunc(opts.replyToMessageId), - quote: quoteText, - }; - } else { - threadParams.reply_to_message_id = Math.trunc(opts.replyToMessageId); - } - } + const threadParams = buildTelegramThreadReplyParams({ + targetMessageThreadId: target.messageThreadId, + messageThreadId: opts.messageThreadId, + replyToMessageId: opts.replyToMessageId, + quoteText: opts.quoteText, + }); const hasThreadParams = Object.keys(threadParams).length > 0; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params); - return await attempt(retriedParams, `${label}-threadless`); - } - }; + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, + }); const textMode = opts.textMode ?? "markdown"; const tableMode = resolveMarkdownTableMode({ @@ -346,48 +462,50 @@ export async function sendMessageTelegram( params?: Record, fallbackText?: string, ) => { - return await sendWithThreadFallback(params, "message", async (effectiveParams, label) => { - const htmlText = renderHtmlText(rawText); - const baseParams = effectiveParams ? { ...effectiveParams } : {}; - if (linkPreviewOptions) { - baseParams.link_preview_options = linkPreviewOptions; - } - const hasBaseParams = Object.keys(baseParams).length > 0; - const sendParams = { - parse_mode: "HTML" as const, - ...baseParams, - ...(opts.silent === true ? { disable_notification: true } : {}), - }; - const res = await requestWithDiag( - () => - api.sendMessage(chatId, htmlText, sendParams as Parameters[2]), - label, - ).catch(async (err) => { - // Telegram rejects malformed HTML (e.g., unsupported tags or entities). - // When that happens, fall back to plain text so the message still delivers. - const errText = formatErrorMessage(err); - if (PARSE_ERR_RE.test(errText)) { - if (opts.verbose) { - console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); - } - const fallback = fallbackText ?? rawText; - const plainParams = hasBaseParams - ? (baseParams as Parameters[2]) - : undefined; - return await requestWithDiag( - () => - plainParams - ? api.sendMessage(chatId, fallback, plainParams) - : api.sendMessage(chatId, fallback), - `${label}-plain`, - ).catch((err2) => { - throw wrapChatNotFound(err2); - }); + return await withTelegramThreadFallback( + params, + "message", + opts.verbose, + async (effectiveParams, label) => { + const htmlText = renderHtmlText(rawText); + const baseParams = effectiveParams ? { ...effectiveParams } : {}; + if (linkPreviewOptions) { + baseParams.link_preview_options = linkPreviewOptions; } - throw wrapChatNotFound(err); - }); - return res; - }); + const hasBaseParams = Object.keys(baseParams).length > 0; + const sendParams = { + parse_mode: "HTML" as const, + ...baseParams, + ...(opts.silent === true ? { disable_notification: true } : {}), + }; + return await withTelegramHtmlParseFallback({ + label, + verbose: opts.verbose, + requestHtml: (retryLabel) => + requestWithChatNotFound( + () => + api.sendMessage( + chatId, + htmlText, + sendParams as Parameters[2], + ), + retryLabel, + ), + requestPlain: (retryLabel) => { + const plainParams = hasBaseParams + ? (baseParams as Parameters[2]) + : undefined; + return requestWithChatNotFound( + () => + plainParams + ? api.sendMessage(chatId, fallbackText ?? rawText, plainParams) + : api.sendMessage(chatId, fallbackText ?? rawText), + retryLabel, + ); + }, + }); + }, + ); }; if (mediaUrl) { @@ -429,124 +547,105 @@ export async function sendMessageTelegram( ...baseMediaParams, ...(opts.silent === true ? { disable_notification: true } : {}), }; - let result: - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited> - | Awaited>; - if (isGif) { - result = await sendWithThreadFallback( + const sendMedia = async ( + label: string, + sender: ( + effectiveParams: Record | undefined, + ) => Promise, + ) => + await withTelegramThreadFallback( mediaParams, - "animation", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendAnimation( + label, + opts.verbose, + async (effectiveParams, retryLabel) => + requestWithChatNotFound(() => sender(effectiveParams), retryLabel), + ); + + const mediaSender = (() => { + if (isGif) { + return { + label: "animation", + sender: (effectiveParams: Record | undefined) => + api.sendAnimation( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + if (kind === "image") { + return { + label: "photo", + sender: (effectiveParams: Record | undefined) => + api.sendPhoto( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + if (kind === "video") { + if (isVideoNote) { + return { + label: "video_note", + sender: (effectiveParams: Record | undefined) => + api.sendVideoNote( chatId, file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else if (kind === "image") { - result = await sendWithThreadFallback(mediaParams, "photo", async (effectiveParams, label) => - requestWithDiag( - () => api.sendPhoto(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else if (kind === "video") { - if (isVideoNote) { - result = await sendWithThreadFallback( - mediaParams, - "video_note", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVideoNote( - chatId, - file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else { - result = await sendWithThreadFallback( - mediaParams, - "video", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVideo(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "video", + sender: (effectiveParams: Record | undefined) => + api.sendVideo( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; } - } else if (kind === "audio") { - const { useVoice } = resolveTelegramVoiceSend({ - wantsVoice: opts.asVoice === true, // default false (backward compatible) - contentType: media.contentType, - fileName, - logFallback: logVerbose, - }); - if (useVoice) { - result = await sendWithThreadFallback( - mediaParams, - "voice", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendVoice(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } else { - result = await sendWithThreadFallback( - mediaParams, - "audio", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendAudio(chatId, file, effectiveParams as Parameters[2]), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } - } else { - result = await sendWithThreadFallback( - mediaParams, - "document", - async (effectiveParams, label) => - requestWithDiag( - () => - api.sendDocument( + if (kind === "audio") { + const { useVoice } = resolveTelegramVoiceSend({ + wantsVoice: opts.asVoice === true, // default false (backward compatible) + contentType: media.contentType, + fileName, + logFallback: logVerbose, + }); + if (useVoice) { + return { + label: "voice", + sender: (effectiveParams: Record | undefined) => + api.sendVoice( chatId, file, - effectiveParams as Parameters[2], - ), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), - ); - } + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "audio", + sender: (effectiveParams: Record | undefined) => + api.sendAudio( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + } + return { + label: "document", + sender: (effectiveParams: Record | undefined) => + api.sendDocument( + chatId, + file, + effectiveParams as Parameters[2], + ) as Promise, + }; + })(); + + const result = await sendMedia(mediaSender.label, mediaSender.sender); const mediaMessageId = String(result?.message_id ?? "unknown"); const resolvedChatId = String(result?.chat?.id ?? chatId); if (result?.message_id) { @@ -608,31 +707,16 @@ export async function reactMessageTelegram( emoji: string, opts: TelegramReactionOpts = {}, ): Promise<{ ok: true } | { ok: false; warning: string }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); const remove = opts.remove === true; const trimmedEmoji = emoji.trim(); // Build the reaction array. We cast emoji to the grammY union type since @@ -669,31 +753,16 @@ export async function deleteMessageTelegram( messageIdInput: string | number, opts: TelegramDeleteOpts = {}, ): Promise<{ ok: true }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); await requestWithDiag(() => api.deleteMessage(chatId, messageId), "deleteMessage"); logVerbose(`[telegram] Deleted message ${messageId} from chat ${chatId}`); return { ok: true }; @@ -720,35 +789,23 @@ export async function editMessageTelegram( text: string, opts: TelegramEditOpts = {}, ): Promise<{ ok: true; messageId: string; chatId: string }> { - const cfg = opts.cfg ?? loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, + const { cfg, account, api } = resolveTelegramApiContext({ + ...opts, + cfg: opts.cfg, }); - const token = resolveToken(opts.token, account); const chatId = normalizeChatId(String(chatIdInput)); const messageId = normalizeMessageId(messageIdInput); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = ( + const requestWithEditShouldLog = ( fn: () => Promise, label?: string, shouldLog?: (err: unknown) => boolean, - ) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - shouldLog, - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); + ) => requestWithDiag(fn, label, shouldLog ? { shouldLog } : undefined); const textMode = opts.textMode ?? "markdown"; const tableMode = resolveMarkdownTableMode({ @@ -775,45 +832,41 @@ export async function editMessageTelegram( if (replyMarkup !== undefined) { editParams.reply_markup = replyMarkup; } + const plainParams: Record = {}; + if (opts.linkPreview === false) { + plainParams.link_preview_options = { is_disabled: true }; + } + if (replyMarkup !== undefined) { + plainParams.reply_markup = replyMarkup; + } - await requestWithDiag( - () => api.editMessageText(chatId, messageId, htmlText, editParams), - "editMessage", - (err) => !isTelegramMessageNotModifiedError(err), - ).catch(async (err) => { + try { + await withTelegramHtmlParseFallback({ + label: "editMessage", + verbose: opts.verbose, + requestHtml: (retryLabel) => + requestWithEditShouldLog( + () => api.editMessageText(chatId, messageId, htmlText, editParams), + retryLabel, + (err) => !isTelegramMessageNotModifiedError(err), + ), + requestPlain: (retryLabel) => + requestWithEditShouldLog( + () => + Object.keys(plainParams).length > 0 + ? api.editMessageText(chatId, messageId, text, plainParams) + : api.editMessageText(chatId, messageId, text), + retryLabel, + (plainErr) => !isTelegramMessageNotModifiedError(plainErr), + ), + }); + } catch (err) { if (isTelegramMessageNotModifiedError(err)) { - return; + // no-op: Telegram reports message content unchanged, treat as success + } else { + throw err; } - - // Telegram rejects malformed HTML. Fall back to plain text. - const errText = formatErrorMessage(err); - if (PARSE_ERR_RE.test(errText)) { - if (opts.verbose) { - console.warn(`telegram HTML parse failed, retrying as plain text: ${errText}`); - } - const plainParams: Record = {}; - if (opts.linkPreview === false) { - plainParams.link_preview_options = { is_disabled: true }; - } - if (replyMarkup !== undefined) { - plainParams.reply_markup = replyMarkup; - } - return await requestWithDiag( - () => - Object.keys(plainParams).length > 0 - ? api.editMessageText(chatId, messageId, text, plainParams) - : api.editMessageText(chatId, messageId, text), - "editMessage-plain", - (plainErr) => !isTelegramMessageNotModifiedError(plainErr), - ).catch((plainErr) => { - if (isTelegramMessageNotModifiedError(plainErr)) { - return; - } - throw plainErr; - }); - } - throw err; - }); + } logVerbose(`[telegram] Edited message ${messageId} in chat ${chatId}`); return { ok: true, messageId: String(messageId), chatId }; @@ -859,90 +912,38 @@ export async function sendStickerTelegram( throw new Error("Telegram sticker file_id is required"); } - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; - const messageThreadId = - opts.messageThreadId != null ? opts.messageThreadId : target.messageThreadId; - const threadSpec = - messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined; - const threadIdParams = buildTelegramThreadParams(threadSpec); - const threadParams: Record = threadIdParams ? { ...threadIdParams } : {}; - if (opts.replyToMessageId != null) { - threadParams.reply_to_message_id = Math.trunc(opts.replyToMessageId); - } + const threadParams = buildTelegramThreadReplyParams({ + targetMessageThreadId: target.messageThreadId, + messageThreadId: opts.messageThreadId, + replyToMessageId: opts.replyToMessageId, + }); const hasThreadParams = Object.keys(threadParams).length > 0; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, + useApiErrorLogging: false, + }); + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - request(fn, label).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params) as - | Record - | undefined; - return await attempt(retriedParams, `${label}-threadless`); - } - }; const stickerParams = hasThreadParams ? threadParams : undefined; - const result = await sendWithThreadFallback( + const result = await withTelegramThreadFallback( stickerParams, "sticker", + opts.verbose, async (effectiveParams, label) => - requestWithDiag(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label).catch( - (err) => { - throw wrapChatNotFound(err); - }, - ), + requestWithChatNotFound(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label), ); const messageId = String(result?.message_id ?? "unknown"); @@ -986,81 +987,34 @@ export async function sendPollTelegram( poll: PollInput, opts: TelegramPollOpts = {}, ): Promise<{ messageId: string; chatId: string; pollId?: string }> { - const cfg = loadConfig(); - const account = resolveTelegramAccount({ - cfg, - accountId: opts.accountId, - }); - const token = resolveToken(opts.token, account); + const { cfg, account, api } = resolveTelegramApiContext(opts); const target = parseTelegramTarget(to); const chatId = normalizeChatId(target.chatId); - const client = resolveTelegramClientOptions(account); - const api = opts.api ?? new Bot(token, client ? { client } : undefined).api; // Normalize the poll input (validates question, options, maxSelections) const normalizedPoll = normalizePollInput(poll, { maxOptions: 10 }); - const messageThreadId = - opts.messageThreadId != null ? opts.messageThreadId : target.messageThreadId; - const threadSpec = - messageThreadId != null ? { id: messageThreadId, scope: "forum" as const } : undefined; - const threadIdParams = buildTelegramThreadParams(threadSpec); + const threadParams = buildTelegramThreadReplyParams({ + targetMessageThreadId: target.messageThreadId, + messageThreadId: opts.messageThreadId, + replyToMessageId: opts.replyToMessageId, + }); // Build poll options as simple strings (Grammy accepts string[] or InputPollOption[]) const pollOptions = normalizedPoll.options; - const request = createTelegramRetryRunner({ + const requestWithDiag = createTelegramRequestWithDiag({ + cfg, + account, retry: opts.retry, - configRetry: account.config.retry, verbose: opts.verbose, shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), }); - const logHttpError = createTelegramHttpLogger(cfg); - const requestWithDiag = (fn: () => Promise, label?: string) => - withTelegramApiErrorLogging({ - operation: label ?? "request", - fn: () => request(fn, label), - }).catch((err) => { - logHttpError(label ?? "request", err); - throw err; - }); - - const wrapChatNotFound = (err: unknown) => { - if (!/400: Bad Request: chat not found/i.test(formatErrorMessage(err))) { - return err; - } - return new Error( - [ - `Telegram send failed: chat not found (chat_id=${chatId}).`, - "Likely: bot not started in DM, bot removed from group/channel, group migrated (new -100… id), or wrong bot token.", - `Input was: ${JSON.stringify(to)}.`, - ].join(" "), - ); - }; - - const sendWithThreadFallback = async ( - params: Record | undefined, - label: string, - attempt: ( - effectiveParams: Record | undefined, - effectiveLabel: string, - ) => Promise, - ): Promise => { - try { - return await attempt(params, label); - } catch (err) { - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { - throw err; - } - if (opts.verbose) { - console.warn( - `telegram ${label} failed with message_thread_id, retrying without thread: ${formatErrorMessage(err)}`, - ); - } - const retriedParams = removeMessageThreadIdParam(params); - return await attempt(retriedParams, `${label}-threadless`); - } - }; + const requestWithChatNotFound = createRequestWithChatNotFound({ + requestWithDiag, + chatId, + input: to, + }); const durationSeconds = normalizedPoll.durationSeconds; if (durationSeconds === undefined && normalizedPoll.durationHours !== undefined) { @@ -1078,20 +1032,19 @@ export async function sendPollTelegram( allows_multiple_answers: normalizedPoll.maxSelections > 1, is_anonymous: opts.isAnonymous ?? true, ...(durationSeconds !== undefined ? { open_period: durationSeconds } : {}), - ...(threadIdParams ? threadIdParams : {}), - ...(opts.replyToMessageId != null - ? { reply_to_message_id: Math.trunc(opts.replyToMessageId) } - : {}), + ...(Object.keys(threadParams).length > 0 ? threadParams : {}), ...(opts.silent === true ? { disable_notification: true } : {}), }; - const result = await sendWithThreadFallback(pollParams, "poll", async (effectiveParams, label) => - requestWithDiag( - () => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams), - label, - ).catch((err) => { - throw wrapChatNotFound(err); - }), + const result = await withTelegramThreadFallback( + pollParams, + "poll", + opts.verbose, + async (effectiveParams, label) => + requestWithChatNotFound( + () => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams), + label, + ), ); const messageId = String(result?.message_id ?? "unknown"); diff --git a/src/telegram/targets.test.ts b/src/telegram/targets.test.ts index e25e38b2c3c..51d34206c6d 100644 --- a/src/telegram/targets.test.ts +++ b/src/telegram/targets.test.ts @@ -23,12 +23,14 @@ describe("parseTelegramTarget", () => { it("parses plain chatId", () => { expect(parseTelegramTarget("-1001234567890")).toEqual({ chatId: "-1001234567890", + chatType: "group", }); }); it("parses @username", () => { expect(parseTelegramTarget("@mychannel")).toEqual({ chatId: "@mychannel", + chatType: "unknown", }); }); @@ -36,6 +38,7 @@ describe("parseTelegramTarget", () => { expect(parseTelegramTarget("-1001234567890:123")).toEqual({ chatId: "-1001234567890", messageThreadId: 123, + chatType: "group", }); }); @@ -43,6 +46,7 @@ describe("parseTelegramTarget", () => { expect(parseTelegramTarget("-1001234567890:topic:456")).toEqual({ chatId: "-1001234567890", messageThreadId: 456, + chatType: "group", }); }); @@ -50,12 +54,14 @@ describe("parseTelegramTarget", () => { expect(parseTelegramTarget(" -1001234567890:99 ")).toEqual({ chatId: "-1001234567890", messageThreadId: 99, + chatType: "group", }); }); it("does not treat non-numeric suffix as topicId", () => { expect(parseTelegramTarget("-1001234567890:abc")).toEqual({ chatId: "-1001234567890:abc", + chatType: "unknown", }); }); @@ -63,6 +69,7 @@ describe("parseTelegramTarget", () => { expect(parseTelegramTarget("telegram:group:-1001234567890:topic:456")).toEqual({ chatId: "-1001234567890", messageThreadId: 456, + chatType: "group", }); }); }); diff --git a/src/telegram/targets.ts b/src/telegram/targets.ts index cb26c0d06b3..346bb3e35c5 100644 --- a/src/telegram/targets.ts +++ b/src/telegram/targets.ts @@ -1,6 +1,7 @@ export type TelegramTarget = { chatId: string; messageThreadId?: number; + chatType: "direct" | "group" | "unknown"; }; export function stripTelegramInternalPrefixes(to: string): string { @@ -33,6 +34,17 @@ export function stripTelegramInternalPrefixes(to: string): string { * - `chatId:topicId` (numeric topic/thread ID) * - `chatId:topic:topicId` (explicit topic marker; preferred) */ +function resolveTelegramChatType(chatId: string): "direct" | "group" | "unknown" { + const trimmed = chatId.trim(); + if (!trimmed) { + return "unknown"; + } + if (/^-?\d+$/.test(trimmed)) { + return trimmed.startsWith("-") ? "group" : "direct"; + } + return "unknown"; +} + export function parseTelegramTarget(to: string): TelegramTarget { const normalized = stripTelegramInternalPrefixes(to); @@ -41,6 +53,7 @@ export function parseTelegramTarget(to: string): TelegramTarget { return { chatId: topicMatch[1], messageThreadId: Number.parseInt(topicMatch[2], 10), + chatType: resolveTelegramChatType(topicMatch[1]), }; } @@ -49,8 +62,16 @@ export function parseTelegramTarget(to: string): TelegramTarget { return { chatId: colonMatch[1], messageThreadId: Number.parseInt(colonMatch[2], 10), + chatType: resolveTelegramChatType(colonMatch[1]), }; } - return { chatId: normalized }; + return { + chatId: normalized, + chatType: resolveTelegramChatType(normalized), + }; +} + +export function resolveTelegramTargetChatType(target: string): "direct" | "group" | "unknown" { + return parseTelegramTarget(target).chatType; } diff --git a/src/telegram/webhook-set.ts b/src/telegram/webhook-set.ts deleted file mode 100644 index 1bee5248526..00000000000 --- a/src/telegram/webhook-set.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { type ApiClientOptions, Bot } from "grammy"; -import type { TelegramNetworkConfig } from "../config/types.telegram.js"; -import { withTelegramApiErrorLogging } from "./api-logging.js"; -import { resolveTelegramFetch } from "./fetch.js"; - -export async function setTelegramWebhook(opts: { - token: string; - url: string; - secret?: string; - dropPendingUpdates?: boolean; - network?: TelegramNetworkConfig; -}) { - const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network }); - const client: ApiClientOptions | undefined = fetchImpl - ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } - : undefined; - const bot = new Bot(opts.token, client ? { client } : undefined); - await withTelegramApiErrorLogging({ - operation: "setWebhook", - fn: () => - bot.api.setWebhook(opts.url, { - secret_token: opts.secret, - drop_pending_updates: opts.dropPendingUpdates ?? false, - }), - }); -} - -export async function deleteTelegramWebhook(opts: { - token: string; - network?: TelegramNetworkConfig; -}) { - const fetchImpl = resolveTelegramFetch(undefined, { network: opts.network }); - const client: ApiClientOptions | undefined = fetchImpl - ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } - : undefined; - const bot = new Bot(opts.token, client ? { client } : undefined); - await withTelegramApiErrorLogging({ - operation: "deleteWebhook", - fn: () => bot.api.deleteWebhook(), - }); -}