refactor(telegram): simplify send/dispatch/target handling (#17819)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: fcb7aeeca3
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Ayaan Zaidi
2026-02-16 14:00:34 +05:30
committed by GitHub
parent 1f607bec49
commit b6a9741ba4
16 changed files with 1125 additions and 1141 deletions

View File

@@ -1,4 +1,5 @@
import type { Message } from "@grammyjs/types";
import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
import type { TelegramMediaRef } from "./bot-message-context.js";
import type { TelegramContext } from "./bot/types.js";
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
@@ -21,7 +22,11 @@ import { readChannelAllowFromStore } from "../pairing/pairing-store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { firstDefined, isSenderAllowed, normalizeAllowFromWithStore } from "./bot-access.js";
import {
isSenderAllowed,
normalizeAllowFromWithStore,
type NormalizedAllowFrom,
} from "./bot-access.js";
import { RegisterTelegramHandlerParams } from "./bot-native-commands.js";
import { MEDIA_GROUP_TIMEOUT_MS, type MediaGroupEntry } from "./bot-updates.js";
import { resolveMedia } from "./bot/delivery.js";
@@ -31,6 +36,10 @@ import {
resolveTelegramForumThreadId,
resolveTelegramGroupAllowFromContext,
} from "./bot/helpers.js";
import {
evaluateTelegramGroupBaseAccess,
evaluateTelegramGroupPolicyAccess,
} from "./group-access.js";
import { migrateTelegramGroupConfig } from "./group-migration.js";
import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js";
import {
@@ -94,6 +103,30 @@ export const registerTelegramHandlers = ({
debounceKey: string | null;
botUsername?: string;
};
const buildSyntheticTextMessage = (params: {
base: Message;
text: string;
date?: number;
from?: Message["from"];
}): Message => ({
...params.base,
...(params.from ? { from: params.from } : {}),
text: params.text,
caption: undefined,
caption_entities: undefined,
entities: undefined,
...(params.date != null ? { date: params.date } : {}),
});
const buildSyntheticContext = (
ctx: Pick<TelegramContext, "me"> & { getFile?: unknown },
message: Message,
): TelegramContext => {
const getFile =
typeof ctx.getFile === "function"
? (ctx.getFile as TelegramContext["getFile"]).bind(ctx as object)
: async () => ({});
return { message, me: ctx.me, getFile };
};
const inboundDebouncer = createInboundDebouncer<TelegramDebounceEntry>({
debounceMs,
buildKey: (entry) => entry.debounceKey,
@@ -125,19 +158,14 @@ export const registerTelegramHandlers = ({
}
const first = entries[0];
const baseCtx = first.ctx;
const getFile =
typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({});
const syntheticMessage: Message = {
...first.msg,
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
caption: undefined,
caption_entities: undefined,
entities: undefined,
date: last.msg.date ?? first.msg.date,
};
});
const messageIdOverride = last.msg.message_id ? String(last.msg.message_id) : undefined;
await processMessage(
{ message: syntheticMessage, me: baseCtx.me, getFile },
buildSyntheticContext(baseCtx, syntheticMessage),
[],
first.storeAllowFrom,
messageIdOverride ? { messageIdOverride } : undefined,
@@ -227,11 +255,7 @@ export const registerTelegramHandlers = ({
}
}
const storeAllowFrom = await readChannelAllowFromStore(
"telegram",
process.env,
accountId,
).catch(() => []);
const storeAllowFrom = await loadStoreAllowFrom();
await processMessage(primaryEntry.ctx, allMedia, storeAllowFrom);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
@@ -253,48 +277,187 @@ export const registerTelegramHandlers = ({
return;
}
const syntheticMessage: Message = {
...first.msg,
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
text: combinedText,
caption: undefined,
caption_entities: undefined,
entities: undefined,
date: last.msg.date ?? first.msg.date,
};
});
const storeAllowFrom = await readChannelAllowFromStore(
"telegram",
process.env,
accountId,
).catch(() => []);
const storeAllowFrom = await loadStoreAllowFrom();
const baseCtx = first.ctx;
const getFile =
typeof baseCtx.getFile === "function" ? baseCtx.getFile.bind(baseCtx) : async () => ({});
await processMessage(
{ message: syntheticMessage, me: baseCtx.me, getFile },
[],
storeAllowFrom,
{ messageIdOverride: String(last.msg.message_id) },
);
await processMessage(buildSyntheticContext(baseCtx, syntheticMessage), [], storeAllowFrom, {
messageIdOverride: String(last.msg.message_id),
});
} catch (err) {
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
}
};
const queueTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(entry);
})
.catch(() => undefined);
await textFragmentProcessing;
};
const runTextFragmentFlush = async (entry: TextFragmentEntry) => {
textFragmentBuffer.delete(entry.key);
await queueTextFragmentFlush(entry);
};
const scheduleTextFragmentFlush = (entry: TextFragmentEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
textFragmentBuffer.delete(entry.key);
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(entry);
})
.catch(() => undefined);
await textFragmentProcessing;
await runTextFragmentFlush(entry);
}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS);
};
const enqueueMediaGroupFlush = async (mediaGroupId: string, entry: MediaGroupEntry) => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined);
await mediaGroupProcessing;
};
const scheduleMediaGroupFlush = (mediaGroupId: string, entry: MediaGroupEntry) => {
clearTimeout(entry.timer);
entry.timer = setTimeout(async () => {
await enqueueMediaGroupFlush(mediaGroupId, entry);
}, mediaGroupTimeoutMs);
};
const getOrCreateMediaGroupEntry = (mediaGroupId: string) => {
const existing = mediaGroupBuffer.get(mediaGroupId);
if (existing) {
return existing;
}
const entry: MediaGroupEntry = {
messages: [],
timer: setTimeout(() => undefined, mediaGroupTimeoutMs),
};
mediaGroupBuffer.set(mediaGroupId, entry);
return entry;
};
const loadStoreAllowFrom = async () =>
readChannelAllowFromStore("telegram", process.env, accountId).catch(() => []);
const isAllowlistAuthorized = (
allow: NormalizedAllowFrom,
senderId: string,
senderUsername: string,
) =>
allow.hasWildcard ||
(allow.hasEntries &&
isSenderAllowed({
allow,
senderId,
senderUsername,
}));
const shouldSkipGroupMessage = (params: {
isGroup: boolean;
chatId: string | number;
chatTitle?: string;
resolvedThreadId?: number;
senderId: string;
senderUsername: string;
effectiveGroupAllow: NormalizedAllowFrom;
hasGroupAllowOverride: boolean;
groupConfig?: TelegramGroupConfig;
topicConfig?: TelegramTopicConfig;
}) => {
const {
isGroup,
chatId,
chatTitle,
resolvedThreadId,
senderId,
senderUsername,
effectiveGroupAllow,
hasGroupAllowOverride,
groupConfig,
topicConfig,
} = params;
const baseAccess = evaluateTelegramGroupBaseAccess({
isGroup,
groupConfig,
topicConfig,
hasGroupAllowOverride,
effectiveGroupAllow,
senderId,
senderUsername,
enforceAllowOverride: true,
requireSenderForAllowOverride: true,
});
if (!baseAccess.allowed) {
if (baseAccess.reason === "group-disabled") {
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
return true;
}
if (baseAccess.reason === "topic-disabled") {
logVerbose(
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
);
return true;
}
logVerbose(
`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`,
);
return true;
}
if (!isGroup) {
return false;
}
const policyAccess = evaluateTelegramGroupPolicyAccess({
isGroup,
chatId,
cfg,
telegramCfg,
topicConfig,
groupConfig,
effectiveGroupAllow,
senderId,
senderUsername,
resolveGroupPolicy,
enforcePolicy: true,
useTopicAndGroupOverrides: true,
enforceAllowlistAuthorization: true,
allowEmptyAllowlistEntries: false,
requireSenderForAllowlistAuthorization: true,
checkChatAllowlist: true,
});
if (!policyAccess.allowed) {
if (policyAccess.reason === "group-policy-disabled") {
logVerbose("Blocked telegram group message (groupPolicy: disabled)");
return true;
}
if (policyAccess.reason === "group-policy-allowlist-no-sender") {
logVerbose("Blocked telegram group message (no sender ID, groupPolicy: allowlist)");
return true;
}
if (policyAccess.reason === "group-policy-allowlist-empty") {
logVerbose(
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
);
return true;
}
if (policyAccess.reason === "group-policy-allowlist-unauthorized") {
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
return true;
}
logger.info({ chatId, title: chatTitle, reason: "not-allowed" }, "skipping group message");
return true;
}
return false;
};
bot.on("callback_query", async (ctx) => {
const callback = ctx.callbackQuery;
if (!callback) {
@@ -303,11 +466,15 @@ export const registerTelegramHandlers = ({
if (shouldSkipUpdate(ctx)) {
return;
}
const answerCallbackQuery =
typeof (ctx as { answerCallbackQuery?: unknown }).answerCallbackQuery === "function"
? () => ctx.answerCallbackQuery()
: () => bot.api.answerCallbackQuery(callback.id);
// Answer immediately to prevent Telegram from retrying while we process
await withTelegramApiErrorLogging({
operation: "answerCallbackQuery",
runtime,
fn: () => bot.api.answerCallbackQuery(callback.id),
fn: answerCallbackQuery,
}).catch(() => {});
try {
const data = (callback.data ?? "").trim();
@@ -315,6 +482,38 @@ export const registerTelegramHandlers = ({
if (!data || !callbackMessage) {
return;
}
const editCallbackMessage = async (
text: string,
params?: Parameters<typeof bot.api.editMessageText>[3],
) => {
const editTextFn = (ctx as { editMessageText?: unknown }).editMessageText;
if (typeof editTextFn === "function") {
return await ctx.editMessageText(text, params);
}
return await bot.api.editMessageText(
callbackMessage.chat.id,
callbackMessage.message_id,
text,
params,
);
};
const deleteCallbackMessage = async () => {
const deleteFn = (ctx as { deleteMessage?: unknown }).deleteMessage;
if (typeof deleteFn === "function") {
return await ctx.deleteMessage();
}
return await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id);
};
const replyToCallbackChat = async (
text: string,
params?: Parameters<typeof bot.api.sendMessage>[2],
) => {
const replyFn = (ctx as { reply?: unknown }).reply;
if (typeof replyFn === "function") {
return await ctx.reply(text, params);
}
return await bot.api.sendMessage(callbackMessage.chat.id, text, params);
};
const inlineButtonsScope = resolveTelegramInlineButtonsScope({
cfg,
@@ -344,8 +543,14 @@ export const registerTelegramHandlers = ({
groupAllowFrom,
resolveTelegramGroupConfig,
});
const { resolvedThreadId, storeAllowFrom, groupConfig, topicConfig, effectiveGroupAllow } =
groupAllowContext;
const {
resolvedThreadId,
storeAllowFrom,
groupConfig,
topicConfig,
effectiveGroupAllow,
hasGroupAllowOverride,
} = groupAllowContext;
const effectiveDmAllow = normalizeAllowFromWithStore({
allowFrom: telegramCfg.allowFrom,
storeAllowFrom,
@@ -353,75 +558,21 @@ export const registerTelegramHandlers = ({
const dmPolicy = telegramCfg.dmPolicy ?? "pairing";
const senderId = callback.from?.id ? String(callback.from.id) : "";
const senderUsername = callback.from?.username ?? "";
if (isGroup) {
if (groupConfig?.enabled === false) {
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
return;
}
if (topicConfig?.enabled === false) {
logVerbose(
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
);
return;
}
if (groupAllowContext.hasGroupAllowOverride) {
const allowed =
senderId &&
isSenderAllowed({
allow: effectiveGroupAllow,
senderId,
senderUsername,
});
if (!allowed) {
logVerbose(
`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`,
);
return;
}
}
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
const groupPolicy = firstDefined(
topicConfig?.groupPolicy,
groupConfig?.groupPolicy,
telegramCfg.groupPolicy,
defaultGroupPolicy,
"open",
);
if (groupPolicy === "disabled") {
logVerbose(`Blocked telegram group message (groupPolicy: disabled)`);
return;
}
if (groupPolicy === "allowlist") {
if (!senderId) {
logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`);
return;
}
if (!effectiveGroupAllow.hasEntries) {
logVerbose(
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
);
return;
}
if (
!isSenderAllowed({
allow: effectiveGroupAllow,
senderId,
senderUsername,
})
) {
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
return;
}
}
const groupAllowlist = resolveGroupPolicy(chatId);
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
logger.info(
{ chatId, title: callbackMessage.chat.title, reason: "not-allowed" },
"skipping group message",
);
return;
}
if (
shouldSkipGroupMessage({
isGroup,
chatId,
chatTitle: callbackMessage.chat.title,
resolvedThreadId,
senderId,
senderUsername,
effectiveGroupAllow,
hasGroupAllowOverride,
groupConfig,
topicConfig,
})
) {
return;
}
if (inlineButtonsScope === "allowlist") {
@@ -430,27 +581,13 @@ export const registerTelegramHandlers = ({
return;
}
if (dmPolicy !== "open") {
const allowed =
effectiveDmAllow.hasWildcard ||
(effectiveDmAllow.hasEntries &&
isSenderAllowed({
allow: effectiveDmAllow,
senderId,
senderUsername,
}));
const allowed = isAllowlistAuthorized(effectiveDmAllow, senderId, senderUsername);
if (!allowed) {
return;
}
}
} else {
const allowed =
effectiveGroupAllow.hasWildcard ||
(effectiveGroupAllow.hasEntries &&
isSenderAllowed({
allow: effectiveGroupAllow,
senderId,
senderUsername,
}));
const allowed = isAllowlistAuthorized(effectiveGroupAllow, senderId, senderUsername);
if (!allowed) {
return;
}
@@ -487,12 +624,7 @@ export const registerTelegramHandlers = ({
: undefined;
try {
await bot.api.editMessageText(
callbackMessage.chat.id,
callbackMessage.message_id,
result.text,
keyboard ? { reply_markup: keyboard } : undefined,
);
await editCallbackMessage(result.text, keyboard ? { reply_markup: keyboard } : undefined);
} catch (editErr) {
const errStr = String(editErr);
if (!errStr.includes("message is not modified")) {
@@ -514,23 +646,14 @@ export const registerTelegramHandlers = ({
) => {
const keyboard = buildInlineKeyboard(buttons);
try {
await bot.api.editMessageText(
callbackMessage.chat.id,
callbackMessage.message_id,
text,
keyboard ? { reply_markup: keyboard } : undefined,
);
await editCallbackMessage(text, keyboard ? { reply_markup: keyboard } : undefined);
} catch (editErr) {
const errStr = String(editErr);
if (errStr.includes("no text in the message")) {
try {
await bot.api.deleteMessage(callbackMessage.chat.id, callbackMessage.message_id);
await deleteCallbackMessage();
} catch {}
await bot.api.sendMessage(
callbackMessage.chat.id,
text,
keyboard ? { reply_markup: keyboard } : undefined,
);
await replyToCallbackChat(text, keyboard ? { reply_markup: keyboard } : undefined);
} else if (!errStr.includes("message is not modified")) {
throw editErr;
}
@@ -597,41 +720,27 @@ export const registerTelegramHandlers = ({
if (modelCallback.type === "select") {
const { provider, model } = modelCallback;
// Process model selection as a synthetic message with /model command
const syntheticMessage: Message = {
...callbackMessage,
const syntheticMessage = buildSyntheticTextMessage({
base: callbackMessage,
from: callback.from,
text: `/model ${provider}/${model}`,
caption: undefined,
caption_entities: undefined,
entities: undefined,
};
const getFile =
typeof ctx.getFile === "function" ? ctx.getFile.bind(ctx) : async () => ({});
await processMessage(
{ message: syntheticMessage, me: ctx.me, getFile },
[],
storeAllowFrom,
{
forceWasMentioned: true,
messageIdOverride: callback.id,
},
);
});
await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, {
forceWasMentioned: true,
messageIdOverride: callback.id,
});
return;
}
return;
}
const syntheticMessage: Message = {
...callbackMessage,
const syntheticMessage = buildSyntheticTextMessage({
base: callbackMessage,
from: callback.from,
text: data,
caption: undefined,
caption_entities: undefined,
entities: undefined,
};
const getFile = typeof ctx.getFile === "function" ? ctx.getFile.bind(ctx) : async () => ({});
await processMessage({ message: syntheticMessage, me: ctx.me, getFile }, [], storeAllowFrom, {
});
await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, {
forceWasMentioned: true,
messageIdOverride: callback.id,
});
@@ -723,85 +832,23 @@ export const registerTelegramHandlers = ({
hasGroupAllowOverride,
} = groupAllowContext;
if (isGroup) {
if (groupConfig?.enabled === false) {
logVerbose(`Blocked telegram group ${chatId} (group disabled)`);
return;
}
if (topicConfig?.enabled === false) {
logVerbose(
`Blocked telegram topic ${chatId} (${resolvedThreadId ?? "unknown"}) (topic disabled)`,
);
return;
}
if (hasGroupAllowOverride) {
const senderId = msg.from?.id;
const senderUsername = msg.from?.username ?? "";
const allowed =
senderId != null &&
isSenderAllowed({
allow: effectiveGroupAllow,
senderId: String(senderId),
senderUsername,
});
if (!allowed) {
logVerbose(
`Blocked telegram group sender ${senderId ?? "unknown"} (group allowFrom override)`,
);
return;
}
}
// Group policy filtering: controls how group messages are handled
// - "open": groups bypass allowFrom, only mention-gating applies
// - "disabled": block all group messages entirely
// - "allowlist": only allow group messages from senders in groupAllowFrom/allowFrom
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
const groupPolicy = firstDefined(
topicConfig?.groupPolicy,
groupConfig?.groupPolicy,
telegramCfg.groupPolicy,
defaultGroupPolicy,
"open",
);
if (groupPolicy === "disabled") {
logVerbose(`Blocked telegram group message (groupPolicy: disabled)`);
return;
}
if (groupPolicy === "allowlist") {
// For allowlist mode, the sender (msg.from.id) must be in allowFrom
const senderId = msg.from?.id;
if (senderId == null) {
logVerbose(`Blocked telegram group message (no sender ID, groupPolicy: allowlist)`);
return;
}
if (!effectiveGroupAllow.hasEntries) {
logVerbose(
"Blocked telegram group message (groupPolicy: allowlist, no group allowlist entries)",
);
return;
}
const senderUsername = msg.from?.username ?? "";
if (
!isSenderAllowed({
allow: effectiveGroupAllow,
senderId: String(senderId),
senderUsername,
})
) {
logVerbose(`Blocked telegram group message from ${senderId} (groupPolicy: allowlist)`);
return;
}
}
// Group allowlist based on configured group IDs.
const groupAllowlist = resolveGroupPolicy(chatId);
if (groupAllowlist.allowlistEnabled && !groupAllowlist.allowed) {
logger.info(
{ chatId, title: msg.chat.title, reason: "not-allowed" },
"skipping group message",
);
return;
}
const senderId = msg.from?.id != null ? String(msg.from.id) : "";
const senderUsername = msg.from?.username ?? "";
if (
shouldSkipGroupMessage({
isGroup,
chatId,
chatTitle: msg.chat.title,
resolvedThreadId,
senderId,
senderUsername,
effectiveGroupAllow,
hasGroupAllowOverride,
groupConfig,
topicConfig,
})
) {
return;
}
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
@@ -844,13 +891,7 @@ export const registerTelegramHandlers = ({
// Not appendable (or limits exceeded): flush buffered entry first, then continue normally.
clearTimeout(existing.timer);
textFragmentBuffer.delete(key);
textFragmentProcessing = textFragmentProcessing
.then(async () => {
await flushTextFragments(existing);
})
.catch(() => undefined);
await textFragmentProcessing;
await runTextFragmentFlush(existing);
}
const shouldStart = text.length >= TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS;
@@ -869,34 +910,9 @@ export const registerTelegramHandlers = ({
// Media group handling - buffer multi-image messages
const mediaGroupId = msg.media_group_id;
if (mediaGroupId) {
const existing = mediaGroupBuffer.get(mediaGroupId);
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(existing);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs);
} else {
const entry: MediaGroupEntry = {
messages: [{ msg, ctx }],
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
.then(async () => {
await processMediaGroup(entry);
})
.catch(() => undefined);
await mediaGroupProcessing;
}, mediaGroupTimeoutMs),
};
mediaGroupBuffer.set(mediaGroupId, entry);
}
const entry = getOrCreateMediaGroupEntry(mediaGroupId);
entry.messages.push({ msg, ctx });
scheduleMediaGroupFlush(mediaGroupId, entry);
return;
}
@@ -938,7 +954,6 @@ export const registerTelegramHandlers = ({
},
]
: [];
const senderId = msg.from?.id ? String(msg.from.id) : "";
const conversationKey =
resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId);
const debounceKey = senderId