mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 11:07:41 +00:00
feat(telegram/acp): Topic Binding, Pin Binding Message, Fix Spawn Param Parsing (#36683)
* fix(acp): normalize unicode flags and Telegram topic binding * feat(telegram/acp): restore topic-bound ACP and session bindings * fix(acpx): clarify permission-denied guidance * feat(telegram/acp): pin spawn bind notice in topics * docs(telegram): document ACP topic thread binding behavior * refactor(reply): share Telegram conversation-id resolver * fix(telegram/acp): preserve bound session routing semantics * fix(telegram): respect binding persistence and expiry reporting * refactor(telegram): simplify binding lifecycle persistence * fix(telegram): bind acp spawns in direct messages * fix: document telegram ACP topic binding changelog (#36683) (thanks @huntharo) --------- Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
116
src/telegram/bot-message-context.thread-binding.test.ts
Normal file
116
src/telegram/bot-message-context.thread-binding.test.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const resolveByConversationMock = vi.fn();
|
||||
const touchMock = vi.fn();
|
||||
return {
|
||||
resolveByConversationMock,
|
||||
touchMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) => {
|
||||
const actual =
|
||||
await importOriginal<typeof import("../infra/outbound/session-binding-service.js")>();
|
||||
return {
|
||||
...actual,
|
||||
getSessionBindingService: () => ({
|
||||
bind: vi.fn(),
|
||||
getCapabilities: vi.fn(),
|
||||
listBySession: vi.fn(),
|
||||
resolveByConversation: (ref: unknown) => hoisted.resolveByConversationMock(ref),
|
||||
touch: (bindingId: string, at?: number) => hoisted.touchMock(bindingId, at),
|
||||
unbind: vi.fn(),
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
const { buildTelegramMessageContextForTest } =
|
||||
await import("./bot-message-context.test-harness.js");
|
||||
|
||||
describe("buildTelegramMessageContext bound conversation override", () => {
|
||||
beforeEach(() => {
|
||||
hoisted.resolveByConversationMock.mockReset().mockReturnValue(null);
|
||||
hoisted.touchMock.mockReset();
|
||||
});
|
||||
|
||||
it("routes forum topic messages to the bound session", async () => {
|
||||
hoisted.resolveByConversationMock.mockReturnValue({
|
||||
bindingId: "default:-100200300:topic:77",
|
||||
targetSessionKey: "agent:codex-acp:session-1",
|
||||
});
|
||||
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
message_id: 1,
|
||||
chat: { id: -100200300, type: "supergroup", is_forum: true },
|
||||
message_thread_id: 77,
|
||||
date: 1_700_000_000,
|
||||
text: "hello",
|
||||
from: { id: 42, first_name: "Alice" },
|
||||
},
|
||||
options: { forceWasMentioned: true },
|
||||
resolveGroupActivation: () => true,
|
||||
});
|
||||
|
||||
expect(hoisted.resolveByConversationMock).toHaveBeenCalledWith({
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-100200300:topic:77",
|
||||
});
|
||||
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:codex-acp:session-1");
|
||||
expect(hoisted.touchMock).toHaveBeenCalledWith("default:-100200300:topic:77", undefined);
|
||||
});
|
||||
|
||||
it("treats named-account bound conversations as explicit route matches", async () => {
|
||||
hoisted.resolveByConversationMock.mockReturnValue({
|
||||
bindingId: "work:-100200300:topic:77",
|
||||
targetSessionKey: "agent:codex-acp:session-2",
|
||||
});
|
||||
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
accountId: "work",
|
||||
message: {
|
||||
message_id: 1,
|
||||
chat: { id: -100200300, type: "supergroup", is_forum: true },
|
||||
message_thread_id: 77,
|
||||
date: 1_700_000_000,
|
||||
text: "hello",
|
||||
from: { id: 42, first_name: "Alice" },
|
||||
},
|
||||
options: { forceWasMentioned: true },
|
||||
resolveGroupActivation: () => true,
|
||||
});
|
||||
|
||||
expect(ctx).not.toBeNull();
|
||||
expect(ctx?.route.accountId).toBe("work");
|
||||
expect(ctx?.route.matchedBy).toBe("binding.channel");
|
||||
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:codex-acp:session-2");
|
||||
expect(hoisted.touchMock).toHaveBeenCalledWith("work:-100200300:topic:77", undefined);
|
||||
});
|
||||
|
||||
it("routes dm messages to the bound session", async () => {
|
||||
hoisted.resolveByConversationMock.mockReturnValue({
|
||||
bindingId: "default:1234",
|
||||
targetSessionKey: "agent:codex-acp:session-dm",
|
||||
});
|
||||
|
||||
const ctx = await buildTelegramMessageContextForTest({
|
||||
message: {
|
||||
message_id: 1,
|
||||
chat: { id: 1234, type: "private" },
|
||||
date: 1_700_000_000,
|
||||
text: "hello",
|
||||
from: { id: 42, first_name: "Alice" },
|
||||
},
|
||||
});
|
||||
|
||||
expect(hoisted.resolveByConversationMock).toHaveBeenCalledWith({
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "1234",
|
||||
});
|
||||
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:codex-acp:session-dm");
|
||||
expect(hoisted.touchMock).toHaveBeenCalledWith("default:1234", undefined);
|
||||
});
|
||||
});
|
||||
@@ -42,6 +42,7 @@ import type {
|
||||
} from "../config/types.js";
|
||||
import { logVerbose, shouldLogVerbose } from "../globals.js";
|
||||
import { recordChannelActivity } from "../infra/channel-activity.js";
|
||||
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
|
||||
import {
|
||||
buildAgentSessionKey,
|
||||
pickFirstExistingAgentId,
|
||||
@@ -51,6 +52,7 @@ import {
|
||||
import {
|
||||
DEFAULT_ACCOUNT_ID,
|
||||
buildAgentMainSessionKey,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveThreadSessionKeys,
|
||||
} from "../routing/session-key.js";
|
||||
import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js";
|
||||
@@ -257,9 +259,37 @@ export const buildTelegramMessageContext = async ({
|
||||
conversationId: peerId,
|
||||
parentConversationId: isGroup ? String(chatId) : undefined,
|
||||
});
|
||||
const configuredBinding = configuredRoute.configuredBinding;
|
||||
const configuredBindingSessionKey = configuredRoute.boundSessionKey ?? "";
|
||||
let configuredBinding = configuredRoute.configuredBinding;
|
||||
let configuredBindingSessionKey = configuredRoute.boundSessionKey ?? "";
|
||||
route = configuredRoute.route;
|
||||
const threadBindingConversationId =
|
||||
replyThreadId != null
|
||||
? `${chatId}:topic:${replyThreadId}`
|
||||
: !isGroup
|
||||
? String(chatId)
|
||||
: undefined;
|
||||
if (threadBindingConversationId) {
|
||||
const threadBinding = getSessionBindingService().resolveByConversation({
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
conversationId: threadBindingConversationId,
|
||||
});
|
||||
const boundSessionKey = threadBinding?.targetSessionKey?.trim();
|
||||
if (threadBinding && boundSessionKey) {
|
||||
route = {
|
||||
...route,
|
||||
sessionKey: boundSessionKey,
|
||||
agentId: resolveAgentIdFromSessionKey(boundSessionKey),
|
||||
matchedBy: "binding.channel",
|
||||
};
|
||||
configuredBinding = null;
|
||||
configuredBindingSessionKey = "";
|
||||
getSessionBindingService().touch(threadBinding.bindingId);
|
||||
logVerbose(
|
||||
`telegram: routed via bound conversation ${threadBindingConversationId} -> ${boundSessionKey}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
const requiresExplicitAccountBinding = (candidate: ResolvedAgentRoute): boolean =>
|
||||
candidate.accountId !== DEFAULT_ACCOUNT_ID && candidate.matchedBy === "default";
|
||||
// Fail closed for named Telegram accounts when route resolution falls back to
|
||||
|
||||
@@ -5,6 +5,11 @@ import { Bot } from "grammy";
|
||||
import { resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { resolveTextChunkLimit } from "../auto-reply/chunk.js";
|
||||
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "../auto-reply/reply/history.js";
|
||||
import {
|
||||
resolveThreadBindingIdleTimeoutMsForChannel,
|
||||
resolveThreadBindingMaxAgeMsForChannel,
|
||||
resolveThreadBindingSpawnPolicy,
|
||||
} from "../channels/thread-bindings-policy.js";
|
||||
import {
|
||||
isNativeCommandsExplicitlyDisabled,
|
||||
resolveNativeCommandsEnabled,
|
||||
@@ -36,6 +41,7 @@ import { buildTelegramGroupPeerId, resolveTelegramStreamMode } from "./bot/helpe
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js";
|
||||
import { getTelegramSequentialKey } from "./sequential-key.js";
|
||||
import { createTelegramThreadBindingManager } from "./thread-bindings.js";
|
||||
|
||||
export type TelegramBotOptions = {
|
||||
token: string;
|
||||
@@ -67,6 +73,27 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const threadBindingPolicy = resolveThreadBindingSpawnPolicy({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
kind: "subagent",
|
||||
});
|
||||
const threadBindingManager = threadBindingPolicy.enabled
|
||||
? createTelegramThreadBindingManager({
|
||||
accountId: account.accountId,
|
||||
idleTimeoutMs: resolveThreadBindingIdleTimeoutMsForChannel({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
}),
|
||||
maxAgeMs: resolveThreadBindingMaxAgeMsForChannel({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
}),
|
||||
})
|
||||
: null;
|
||||
const telegramCfg = account.config;
|
||||
|
||||
const fetchImpl = resolveTelegramFetch(opts.proxyFetch, {
|
||||
@@ -379,5 +406,11 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
logger,
|
||||
});
|
||||
|
||||
const originalStop = bot.stop.bind(bot);
|
||||
bot.stop = ((...args: Parameters<typeof originalStop>) => {
|
||||
threadBindingManager?.stop();
|
||||
return originalStop(...args);
|
||||
}) as typeof bot.stop;
|
||||
|
||||
return bot;
|
||||
}
|
||||
|
||||
@@ -36,6 +36,11 @@ type DeliveryProgress = {
|
||||
deliveredCount: number;
|
||||
};
|
||||
|
||||
type TelegramReplyChannelData = {
|
||||
buttons?: TelegramInlineButtons;
|
||||
pin?: boolean;
|
||||
};
|
||||
|
||||
type ChunkTextFn = (markdown: string) => ReturnType<typeof markdownToTelegramChunks>;
|
||||
|
||||
function buildChunkTextResolver(params: {
|
||||
@@ -102,7 +107,8 @@ async function deliverTextReply(params: {
|
||||
replyToId?: number;
|
||||
replyToMode: ReplyToMode;
|
||||
progress: DeliveryProgress;
|
||||
}): Promise<void> {
|
||||
}): Promise<number | undefined> {
|
||||
let firstDeliveredMessageId: number | undefined;
|
||||
const chunks = params.chunkText(params.replyText);
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
@@ -115,18 +121,28 @@ async function deliverTextReply(params: {
|
||||
replyToMode: params.replyToMode,
|
||||
progress: params.progress,
|
||||
});
|
||||
await sendTelegramText(params.bot, params.chatId, chunk.html, params.runtime, {
|
||||
replyToMessageId: replyToForChunk,
|
||||
replyQuoteText: params.replyQuoteText,
|
||||
thread: params.thread,
|
||||
textMode: "html",
|
||||
plainText: chunk.text,
|
||||
linkPreview: params.linkPreview,
|
||||
replyMarkup: shouldAttachButtons ? params.replyMarkup : undefined,
|
||||
});
|
||||
const messageId = await sendTelegramText(
|
||||
params.bot,
|
||||
params.chatId,
|
||||
chunk.html,
|
||||
params.runtime,
|
||||
{
|
||||
replyToMessageId: replyToForChunk,
|
||||
replyQuoteText: params.replyQuoteText,
|
||||
thread: params.thread,
|
||||
textMode: "html",
|
||||
plainText: chunk.text,
|
||||
linkPreview: params.linkPreview,
|
||||
replyMarkup: shouldAttachButtons ? params.replyMarkup : undefined,
|
||||
},
|
||||
);
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = messageId;
|
||||
}
|
||||
markReplyApplied(params.progress, replyToForChunk);
|
||||
markDelivered(params.progress);
|
||||
}
|
||||
return firstDeliveredMessageId;
|
||||
}
|
||||
|
||||
async function sendPendingFollowUpText(params: {
|
||||
@@ -188,14 +204,15 @@ async function sendTelegramVoiceFallbackText(opts: {
|
||||
linkPreview?: boolean;
|
||||
replyMarkup?: ReturnType<typeof buildInlineKeyboard>;
|
||||
replyQuoteText?: string;
|
||||
}): Promise<void> {
|
||||
}): Promise<number | undefined> {
|
||||
let firstDeliveredMessageId: number | undefined;
|
||||
const chunks = opts.chunkText(opts.text);
|
||||
let appliedReplyTo = false;
|
||||
for (let i = 0; i < chunks.length; i += 1) {
|
||||
const chunk = chunks[i];
|
||||
// Only apply reply reference, quote text, and buttons to the first chunk.
|
||||
const replyToForChunk = !appliedReplyTo ? opts.replyToId : undefined;
|
||||
await sendTelegramText(opts.bot, opts.chatId, chunk.html, opts.runtime, {
|
||||
const messageId = await sendTelegramText(opts.bot, opts.chatId, chunk.html, opts.runtime, {
|
||||
replyToMessageId: replyToForChunk,
|
||||
replyQuoteText: !appliedReplyTo ? opts.replyQuoteText : undefined,
|
||||
thread: opts.thread,
|
||||
@@ -204,10 +221,14 @@ async function sendTelegramVoiceFallbackText(opts: {
|
||||
linkPreview: opts.linkPreview,
|
||||
replyMarkup: !appliedReplyTo ? opts.replyMarkup : undefined,
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = messageId;
|
||||
}
|
||||
if (replyToForChunk) {
|
||||
appliedReplyTo = true;
|
||||
}
|
||||
}
|
||||
return firstDeliveredMessageId;
|
||||
}
|
||||
|
||||
async function deliverMediaReply(params: {
|
||||
@@ -227,7 +248,8 @@ async function deliverMediaReply(params: {
|
||||
replyToId?: number;
|
||||
replyToMode: ReplyToMode;
|
||||
progress: DeliveryProgress;
|
||||
}): Promise<void> {
|
||||
}): Promise<number | undefined> {
|
||||
let firstDeliveredMessageId: number | undefined;
|
||||
let first = true;
|
||||
let pendingFollowUpText: string | undefined;
|
||||
for (const mediaUrl of params.mediaList) {
|
||||
@@ -269,7 +291,7 @@ async function deliverMediaReply(params: {
|
||||
}),
|
||||
};
|
||||
if (isGif) {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendAnimation",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -277,9 +299,12 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendAnimation(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
} else if (kind === "image") {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendPhoto",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -287,9 +312,12 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendPhoto(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
} else if (kind === "video") {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendVideo",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -297,6 +325,9 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendVideo(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
} else if (kind === "audio") {
|
||||
const { useVoice } = resolveTelegramVoiceSend({
|
||||
@@ -308,7 +339,7 @@ async function deliverMediaReply(params: {
|
||||
if (useVoice) {
|
||||
await params.onVoiceRecording?.();
|
||||
try {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendVoice",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -317,6 +348,9 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendVoice(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
} catch (voiceErr) {
|
||||
if (isVoiceMessagesForbidden(voiceErr)) {
|
||||
@@ -332,7 +366,7 @@ async function deliverMediaReply(params: {
|
||||
replyToMode: params.replyToMode,
|
||||
progress: params.progress,
|
||||
});
|
||||
await sendTelegramVoiceFallbackText({
|
||||
const fallbackMessageId = await sendTelegramVoiceFallbackText({
|
||||
bot: params.bot,
|
||||
chatId: params.chatId,
|
||||
runtime: params.runtime,
|
||||
@@ -344,6 +378,9 @@ async function deliverMediaReply(params: {
|
||||
replyMarkup: params.replyMarkup,
|
||||
replyQuoteText: params.replyQuoteText,
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = fallbackMessageId;
|
||||
}
|
||||
markReplyApplied(params.progress, voiceFallbackReplyTo);
|
||||
markDelivered(params.progress);
|
||||
continue;
|
||||
@@ -355,7 +392,7 @@ async function deliverMediaReply(params: {
|
||||
const noCaptionParams = { ...mediaParams };
|
||||
delete noCaptionParams.caption;
|
||||
delete noCaptionParams.parse_mode;
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendVoice",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -363,6 +400,9 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendVoice(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
const fallbackText = params.reply.text;
|
||||
if (fallbackText?.trim()) {
|
||||
@@ -384,7 +424,7 @@ async function deliverMediaReply(params: {
|
||||
throw voiceErr;
|
||||
}
|
||||
} else {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendAudio",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -392,10 +432,13 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendAudio(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
}
|
||||
} else {
|
||||
await sendTelegramWithThreadFallback({
|
||||
const result = await sendTelegramWithThreadFallback({
|
||||
operation: "sendDocument",
|
||||
runtime: params.runtime,
|
||||
thread: params.thread,
|
||||
@@ -403,6 +446,9 @@ async function deliverMediaReply(params: {
|
||||
send: (effectiveParams) =>
|
||||
params.bot.api.sendDocument(params.chatId, file, { ...effectiveParams }),
|
||||
});
|
||||
if (firstDeliveredMessageId == null) {
|
||||
firstDeliveredMessageId = result.message_id;
|
||||
}
|
||||
markDelivered(params.progress);
|
||||
}
|
||||
markReplyApplied(params.progress, replyToMessageId);
|
||||
@@ -423,6 +469,28 @@ async function deliverMediaReply(params: {
|
||||
pendingFollowUpText = undefined;
|
||||
}
|
||||
}
|
||||
return firstDeliveredMessageId;
|
||||
}
|
||||
|
||||
async function maybePinFirstDeliveredMessage(params: {
|
||||
shouldPin: boolean;
|
||||
bot: Bot;
|
||||
chatId: string;
|
||||
runtime: RuntimeEnv;
|
||||
firstDeliveredMessageId?: number;
|
||||
}): Promise<void> {
|
||||
if (!params.shouldPin || typeof params.firstDeliveredMessageId !== "number") {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await params.bot.api.pinChatMessage(params.chatId, params.firstDeliveredMessageId, {
|
||||
disable_notification: true,
|
||||
});
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram pinChatMessage failed chat=${params.chatId} message=${params.firstDeliveredMessageId}: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export async function deliverReplies(params: {
|
||||
@@ -507,12 +575,12 @@ export async function deliverReplies(params: {
|
||||
const deliveredCountBeforeReply = progress.deliveredCount;
|
||||
const replyToId =
|
||||
params.replyToMode === "off" ? undefined : resolveTelegramReplyId(reply.replyToId);
|
||||
const telegramData = reply.channelData?.telegram as
|
||||
| { buttons?: TelegramInlineButtons }
|
||||
| undefined;
|
||||
const telegramData = reply.channelData?.telegram as TelegramReplyChannelData | undefined;
|
||||
const shouldPinFirstMessage = telegramData?.pin === true;
|
||||
const replyMarkup = buildInlineKeyboard(telegramData?.buttons);
|
||||
let firstDeliveredMessageId: number | undefined;
|
||||
if (mediaList.length === 0) {
|
||||
await deliverTextReply({
|
||||
firstDeliveredMessageId = await deliverTextReply({
|
||||
bot: params.bot,
|
||||
chatId: params.chatId,
|
||||
runtime: params.runtime,
|
||||
@@ -527,7 +595,7 @@ export async function deliverReplies(params: {
|
||||
progress,
|
||||
});
|
||||
} else {
|
||||
await deliverMediaReply({
|
||||
firstDeliveredMessageId = await deliverMediaReply({
|
||||
reply,
|
||||
mediaList,
|
||||
bot: params.bot,
|
||||
@@ -546,6 +614,13 @@ export async function deliverReplies(params: {
|
||||
progress,
|
||||
});
|
||||
}
|
||||
await maybePinFirstDeliveredMessage({
|
||||
shouldPin: shouldPinFirstMessage,
|
||||
bot: params.bot,
|
||||
chatId: params.chatId,
|
||||
runtime: params.runtime,
|
||||
firstDeliveredMessageId,
|
||||
});
|
||||
|
||||
if (hasMessageSentHooks) {
|
||||
const deliveredThisReply = progress.deliveredCount > deliveredCountBeforeReply;
|
||||
|
||||
@@ -708,6 +708,45 @@ describe("deliverReplies", () => {
|
||||
expect(sendPhoto.mock.calls[1][2]).not.toHaveProperty("reply_to_message_id");
|
||||
});
|
||||
|
||||
it("pins the first delivered text message when telegram pin is requested", async () => {
|
||||
const runtime = createRuntime();
|
||||
const sendMessage = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ message_id: 101, chat: { id: "123" } })
|
||||
.mockResolvedValueOnce({ message_id: 102, chat: { id: "123" } });
|
||||
const pinChatMessage = vi.fn().mockResolvedValue(true);
|
||||
const bot = createBot({ sendMessage, pinChatMessage });
|
||||
|
||||
await deliverReplies({
|
||||
replies: [{ text: "chunk-one\n\nchunk-two", channelData: { telegram: { pin: true } } }],
|
||||
chatId: "123",
|
||||
token: "tok",
|
||||
runtime,
|
||||
bot,
|
||||
replyToMode: "off",
|
||||
textLimit: 12,
|
||||
});
|
||||
|
||||
expect(pinChatMessage).toHaveBeenCalledTimes(1);
|
||||
expect(pinChatMessage).toHaveBeenCalledWith("123", 101, { disable_notification: true });
|
||||
});
|
||||
|
||||
it("continues when pinning fails", async () => {
|
||||
const runtime = createRuntime();
|
||||
const sendMessage = vi.fn().mockResolvedValue({ message_id: 201, chat: { id: "123" } });
|
||||
const pinChatMessage = vi.fn().mockRejectedValue(new Error("pin failed"));
|
||||
const bot = createBot({ sendMessage, pinChatMessage });
|
||||
|
||||
await deliverWith({
|
||||
replies: [{ text: "hello", channelData: { telegram: { pin: true } } }],
|
||||
runtime,
|
||||
bot,
|
||||
});
|
||||
|
||||
expect(sendMessage).toHaveBeenCalledTimes(1);
|
||||
expect(pinChatMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("rethrows VOICE_MESSAGES_FORBIDDEN when no text fallback is available", async () => {
|
||||
const { runtime, sendVoice, sendMessage, bot } = createVoiceFailureHarness({
|
||||
voiceError: createVoiceMessagesForbiddenError(),
|
||||
|
||||
166
src/telegram/thread-bindings.test.ts
Normal file
166
src/telegram/thread-bindings.test.ts
Normal file
@@ -0,0 +1,166 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
|
||||
import {
|
||||
__testing,
|
||||
createTelegramThreadBindingManager,
|
||||
setTelegramThreadBindingIdleTimeoutBySessionKey,
|
||||
setTelegramThreadBindingMaxAgeBySessionKey,
|
||||
} from "./thread-bindings.js";
|
||||
|
||||
describe("telegram thread bindings", () => {
|
||||
let stateDirOverride: string | undefined;
|
||||
|
||||
beforeEach(() => {
|
||||
__testing.resetTelegramThreadBindingsForTests();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
if (stateDirOverride) {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
fs.rmSync(stateDirOverride, { recursive: true, force: true });
|
||||
stateDirOverride = undefined;
|
||||
}
|
||||
});
|
||||
|
||||
it("registers a telegram binding adapter and binds current conversations", async () => {
|
||||
const manager = createTelegramThreadBindingManager({
|
||||
accountId: "work",
|
||||
persist: false,
|
||||
enableSweeper: false,
|
||||
idleTimeoutMs: 30_000,
|
||||
maxAgeMs: 0,
|
||||
});
|
||||
const bound = await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "work",
|
||||
conversationId: "-100200300:topic:77",
|
||||
},
|
||||
placement: "current",
|
||||
metadata: {
|
||||
boundBy: "user-1",
|
||||
},
|
||||
});
|
||||
|
||||
expect(bound.conversation.channel).toBe("telegram");
|
||||
expect(bound.conversation.accountId).toBe("work");
|
||||
expect(bound.conversation.conversationId).toBe("-100200300:topic:77");
|
||||
expect(bound.targetSessionKey).toBe("agent:main:subagent:child-1");
|
||||
expect(manager.getByConversationId("-100200300:topic:77")?.boundBy).toBe("user-1");
|
||||
});
|
||||
|
||||
it("does not support child placement", async () => {
|
||||
createTelegramThreadBindingManager({
|
||||
accountId: "default",
|
||||
persist: false,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await expect(
|
||||
getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-100200300:topic:77",
|
||||
},
|
||||
placement: "child",
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
code: "BINDING_CAPABILITY_UNSUPPORTED",
|
||||
});
|
||||
});
|
||||
|
||||
it("updates lifecycle windows by session key", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
|
||||
const manager = createTelegramThreadBindingManager({
|
||||
accountId: "work",
|
||||
persist: false,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "work",
|
||||
conversationId: "1234",
|
||||
},
|
||||
});
|
||||
const original = manager.listBySessionKey("agent:main:subagent:child-1")[0];
|
||||
expect(original).toBeDefined();
|
||||
|
||||
const idleUpdated = setTelegramThreadBindingIdleTimeoutBySessionKey({
|
||||
accountId: "work",
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
idleTimeoutMs: 2 * 60 * 60 * 1000,
|
||||
});
|
||||
vi.setSystemTime(new Date("2026-03-06T12:00:00.000Z"));
|
||||
const maxAgeUpdated = setTelegramThreadBindingMaxAgeBySessionKey({
|
||||
accountId: "work",
|
||||
targetSessionKey: "agent:main:subagent:child-1",
|
||||
maxAgeMs: 6 * 60 * 60 * 1000,
|
||||
});
|
||||
|
||||
expect(idleUpdated).toHaveLength(1);
|
||||
expect(idleUpdated[0]?.idleTimeoutMs).toBe(2 * 60 * 60 * 1000);
|
||||
expect(maxAgeUpdated).toHaveLength(1);
|
||||
expect(maxAgeUpdated[0]?.maxAgeMs).toBe(6 * 60 * 60 * 1000);
|
||||
expect(maxAgeUpdated[0]?.boundAt).toBe(original?.boundAt);
|
||||
expect(maxAgeUpdated[0]?.lastActivityAt).toBe(Date.parse("2026-03-06T12:00:00.000Z"));
|
||||
expect(manager.listBySessionKey("agent:main:subagent:child-1")[0]?.maxAgeMs).toBe(
|
||||
6 * 60 * 60 * 1000,
|
||||
);
|
||||
});
|
||||
|
||||
it("does not persist lifecycle updates when manager persistence is disabled", async () => {
|
||||
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
|
||||
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
|
||||
|
||||
createTelegramThreadBindingManager({
|
||||
accountId: "no-persist",
|
||||
persist: false,
|
||||
enableSweeper: false,
|
||||
});
|
||||
|
||||
await getSessionBindingService().bind({
|
||||
targetSessionKey: "agent:main:subagent:child-2",
|
||||
targetKind: "subagent",
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: "no-persist",
|
||||
conversationId: "-100200300:topic:88",
|
||||
},
|
||||
});
|
||||
|
||||
setTelegramThreadBindingIdleTimeoutBySessionKey({
|
||||
accountId: "no-persist",
|
||||
targetSessionKey: "agent:main:subagent:child-2",
|
||||
idleTimeoutMs: 60 * 60 * 1000,
|
||||
});
|
||||
setTelegramThreadBindingMaxAgeBySessionKey({
|
||||
accountId: "no-persist",
|
||||
targetSessionKey: "agent:main:subagent:child-2",
|
||||
maxAgeMs: 2 * 60 * 60 * 1000,
|
||||
});
|
||||
|
||||
const statePath = path.join(
|
||||
resolveStateDir(process.env, os.homedir),
|
||||
"telegram",
|
||||
"thread-bindings-no-persist.json",
|
||||
);
|
||||
expect(fs.existsSync(statePath)).toBe(false);
|
||||
});
|
||||
});
|
||||
741
src/telegram/thread-bindings.ts
Normal file
741
src/telegram/thread-bindings.ts
Normal file
@@ -0,0 +1,741 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { formatThreadBindingDurationLabel } from "../channels/thread-bindings-messages.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { logVerbose } from "../globals.js";
|
||||
import { writeJsonAtomic } from "../infra/json-files.js";
|
||||
import {
|
||||
registerSessionBindingAdapter,
|
||||
unregisterSessionBindingAdapter,
|
||||
type BindingTargetKind,
|
||||
type SessionBindingRecord,
|
||||
} from "../infra/outbound/session-binding-service.js";
|
||||
import { normalizeAccountId } from "../routing/session-key.js";
|
||||
|
||||
const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000;
|
||||
const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0;
|
||||
const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000;
|
||||
const STORE_VERSION = 1;
|
||||
|
||||
type TelegramBindingTargetKind = "subagent" | "acp";
|
||||
|
||||
export type TelegramThreadBindingRecord = {
|
||||
accountId: string;
|
||||
conversationId: string;
|
||||
targetKind: TelegramBindingTargetKind;
|
||||
targetSessionKey: string;
|
||||
agentId?: string;
|
||||
label?: string;
|
||||
boundBy?: string;
|
||||
boundAt: number;
|
||||
lastActivityAt: number;
|
||||
idleTimeoutMs?: number;
|
||||
maxAgeMs?: number;
|
||||
};
|
||||
|
||||
type StoredTelegramBindingState = {
|
||||
version: number;
|
||||
bindings: TelegramThreadBindingRecord[];
|
||||
};
|
||||
|
||||
export type TelegramThreadBindingManager = {
|
||||
accountId: string;
|
||||
shouldPersistMutations: () => boolean;
|
||||
getIdleTimeoutMs: () => number;
|
||||
getMaxAgeMs: () => number;
|
||||
getByConversationId: (conversationId: string) => TelegramThreadBindingRecord | undefined;
|
||||
listBySessionKey: (targetSessionKey: string) => TelegramThreadBindingRecord[];
|
||||
listBindings: () => TelegramThreadBindingRecord[];
|
||||
touchConversation: (conversationId: string, at?: number) => TelegramThreadBindingRecord | null;
|
||||
unbindConversation: (params: {
|
||||
conversationId: string;
|
||||
reason?: string;
|
||||
sendFarewell?: boolean;
|
||||
}) => TelegramThreadBindingRecord | null;
|
||||
unbindBySessionKey: (params: {
|
||||
targetSessionKey: string;
|
||||
reason?: string;
|
||||
sendFarewell?: boolean;
|
||||
}) => TelegramThreadBindingRecord[];
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
const MANAGERS_BY_ACCOUNT_ID = new Map<string, TelegramThreadBindingManager>();
|
||||
const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map<string, TelegramThreadBindingRecord>();
|
||||
|
||||
function normalizeDurationMs(raw: unknown, fallback: number): number {
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return fallback;
|
||||
}
|
||||
return Math.max(0, Math.floor(raw));
|
||||
}
|
||||
|
||||
function normalizeConversationId(raw: unknown): string | undefined {
|
||||
if (typeof raw !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const trimmed = raw.trim();
|
||||
return trimmed || undefined;
|
||||
}
|
||||
|
||||
function resolveBindingKey(params: { accountId: string; conversationId: string }): string {
|
||||
return `${params.accountId}:${params.conversationId}`;
|
||||
}
|
||||
|
||||
function toSessionBindingTargetKind(raw: TelegramBindingTargetKind): BindingTargetKind {
|
||||
return raw === "subagent" ? "subagent" : "session";
|
||||
}
|
||||
|
||||
function toTelegramTargetKind(raw: BindingTargetKind): TelegramBindingTargetKind {
|
||||
return raw === "subagent" ? "subagent" : "acp";
|
||||
}
|
||||
|
||||
function resolveEffectiveBindingExpiresAt(params: {
|
||||
record: TelegramThreadBindingRecord;
|
||||
defaultIdleTimeoutMs: number;
|
||||
defaultMaxAgeMs: number;
|
||||
}): number | undefined {
|
||||
const idleTimeoutMs =
|
||||
typeof params.record.idleTimeoutMs === "number"
|
||||
? Math.max(0, Math.floor(params.record.idleTimeoutMs))
|
||||
: params.defaultIdleTimeoutMs;
|
||||
const maxAgeMs =
|
||||
typeof params.record.maxAgeMs === "number"
|
||||
? Math.max(0, Math.floor(params.record.maxAgeMs))
|
||||
: params.defaultMaxAgeMs;
|
||||
|
||||
const inactivityExpiresAt =
|
||||
idleTimeoutMs > 0
|
||||
? Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs
|
||||
: undefined;
|
||||
const maxAgeExpiresAt = maxAgeMs > 0 ? params.record.boundAt + maxAgeMs : undefined;
|
||||
|
||||
if (inactivityExpiresAt != null && maxAgeExpiresAt != null) {
|
||||
return Math.min(inactivityExpiresAt, maxAgeExpiresAt);
|
||||
}
|
||||
return inactivityExpiresAt ?? maxAgeExpiresAt;
|
||||
}
|
||||
|
||||
function toSessionBindingRecord(
|
||||
record: TelegramThreadBindingRecord,
|
||||
defaults: { idleTimeoutMs: number; maxAgeMs: number },
|
||||
): SessionBindingRecord {
|
||||
return {
|
||||
bindingId: resolveBindingKey({
|
||||
accountId: record.accountId,
|
||||
conversationId: record.conversationId,
|
||||
}),
|
||||
targetSessionKey: record.targetSessionKey,
|
||||
targetKind: toSessionBindingTargetKind(record.targetKind),
|
||||
conversation: {
|
||||
channel: "telegram",
|
||||
accountId: record.accountId,
|
||||
conversationId: record.conversationId,
|
||||
},
|
||||
status: "active",
|
||||
boundAt: record.boundAt,
|
||||
expiresAt: resolveEffectiveBindingExpiresAt({
|
||||
record,
|
||||
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
|
||||
defaultMaxAgeMs: defaults.maxAgeMs,
|
||||
}),
|
||||
metadata: {
|
||||
agentId: record.agentId,
|
||||
label: record.label,
|
||||
boundBy: record.boundBy,
|
||||
lastActivityAt: record.lastActivityAt,
|
||||
idleTimeoutMs:
|
||||
typeof record.idleTimeoutMs === "number"
|
||||
? Math.max(0, Math.floor(record.idleTimeoutMs))
|
||||
: defaults.idleTimeoutMs,
|
||||
maxAgeMs:
|
||||
typeof record.maxAgeMs === "number"
|
||||
? Math.max(0, Math.floor(record.maxAgeMs))
|
||||
: defaults.maxAgeMs,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function fromSessionBindingInput(params: {
|
||||
accountId: string;
|
||||
input: {
|
||||
targetSessionKey: string;
|
||||
targetKind: BindingTargetKind;
|
||||
conversationId: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
};
|
||||
}): TelegramThreadBindingRecord {
|
||||
const now = Date.now();
|
||||
const metadata = params.input.metadata ?? {};
|
||||
const existing = BINDINGS_BY_ACCOUNT_CONVERSATION.get(
|
||||
resolveBindingKey({
|
||||
accountId: params.accountId,
|
||||
conversationId: params.input.conversationId,
|
||||
}),
|
||||
);
|
||||
|
||||
const record: TelegramThreadBindingRecord = {
|
||||
accountId: params.accountId,
|
||||
conversationId: params.input.conversationId,
|
||||
targetKind: toTelegramTargetKind(params.input.targetKind),
|
||||
targetSessionKey: params.input.targetSessionKey,
|
||||
agentId:
|
||||
typeof metadata.agentId === "string" && metadata.agentId.trim()
|
||||
? metadata.agentId.trim()
|
||||
: existing?.agentId,
|
||||
label:
|
||||
typeof metadata.label === "string" && metadata.label.trim()
|
||||
? metadata.label.trim()
|
||||
: existing?.label,
|
||||
boundBy:
|
||||
typeof metadata.boundBy === "string" && metadata.boundBy.trim()
|
||||
? metadata.boundBy.trim()
|
||||
: existing?.boundBy,
|
||||
boundAt: now,
|
||||
lastActivityAt: now,
|
||||
};
|
||||
|
||||
if (typeof metadata.idleTimeoutMs === "number" && Number.isFinite(metadata.idleTimeoutMs)) {
|
||||
record.idleTimeoutMs = Math.max(0, Math.floor(metadata.idleTimeoutMs));
|
||||
} else if (typeof existing?.idleTimeoutMs === "number") {
|
||||
record.idleTimeoutMs = existing.idleTimeoutMs;
|
||||
}
|
||||
|
||||
if (typeof metadata.maxAgeMs === "number" && Number.isFinite(metadata.maxAgeMs)) {
|
||||
record.maxAgeMs = Math.max(0, Math.floor(metadata.maxAgeMs));
|
||||
} else if (typeof existing?.maxAgeMs === "number") {
|
||||
record.maxAgeMs = existing.maxAgeMs;
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
function resolveBindingsPath(accountId: string, env: NodeJS.ProcessEnv = process.env): string {
|
||||
const stateDir = resolveStateDir(env, os.homedir);
|
||||
return path.join(stateDir, "telegram", `thread-bindings-${accountId}.json`);
|
||||
}
|
||||
|
||||
function summarizeLifecycleForLog(
|
||||
record: TelegramThreadBindingRecord,
|
||||
defaults: {
|
||||
idleTimeoutMs: number;
|
||||
maxAgeMs: number;
|
||||
},
|
||||
) {
|
||||
const idleTimeoutMs =
|
||||
typeof record.idleTimeoutMs === "number" ? record.idleTimeoutMs : defaults.idleTimeoutMs;
|
||||
const maxAgeMs = typeof record.maxAgeMs === "number" ? record.maxAgeMs : defaults.maxAgeMs;
|
||||
const idleLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(idleTimeoutMs)));
|
||||
const maxAgeLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(maxAgeMs)));
|
||||
return `idle=${idleLabel} maxAge=${maxAgeLabel}`;
|
||||
}
|
||||
|
||||
function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] {
|
||||
const filePath = resolveBindingsPath(accountId);
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, "utf-8");
|
||||
const parsed = JSON.parse(raw) as StoredTelegramBindingState;
|
||||
if (parsed?.version !== STORE_VERSION || !Array.isArray(parsed.bindings)) {
|
||||
return [];
|
||||
}
|
||||
const bindings: TelegramThreadBindingRecord[] = [];
|
||||
for (const entry of parsed.bindings) {
|
||||
const conversationId = normalizeConversationId(entry?.conversationId);
|
||||
const targetSessionKey =
|
||||
typeof entry?.targetSessionKey === "string" ? entry.targetSessionKey.trim() : "";
|
||||
const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp";
|
||||
if (!conversationId || !targetSessionKey) {
|
||||
continue;
|
||||
}
|
||||
const boundAt =
|
||||
typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt)
|
||||
? Math.floor(entry.boundAt)
|
||||
: Date.now();
|
||||
const lastActivityAt =
|
||||
typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt)
|
||||
? Math.floor(entry.lastActivityAt)
|
||||
: boundAt;
|
||||
const record: TelegramThreadBindingRecord = {
|
||||
accountId,
|
||||
conversationId,
|
||||
targetSessionKey,
|
||||
targetKind,
|
||||
boundAt,
|
||||
lastActivityAt,
|
||||
};
|
||||
if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) {
|
||||
record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs));
|
||||
}
|
||||
if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) {
|
||||
record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs));
|
||||
}
|
||||
if (typeof entry?.agentId === "string" && entry.agentId.trim()) {
|
||||
record.agentId = entry.agentId.trim();
|
||||
}
|
||||
if (typeof entry?.label === "string" && entry.label.trim()) {
|
||||
record.label = entry.label.trim();
|
||||
}
|
||||
if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) {
|
||||
record.boundBy = entry.boundBy.trim();
|
||||
}
|
||||
bindings.push(record);
|
||||
}
|
||||
return bindings;
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code !== "ENOENT") {
|
||||
logVerbose(`telegram thread bindings load failed (${accountId}): ${String(err)}`);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function persistBindingsToDisk(params: {
|
||||
accountId: string;
|
||||
persist: boolean;
|
||||
}): Promise<void> {
|
||||
if (!params.persist) {
|
||||
return;
|
||||
}
|
||||
const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
|
||||
(entry) => entry.accountId === params.accountId,
|
||||
);
|
||||
const payload: StoredTelegramBindingState = {
|
||||
version: STORE_VERSION,
|
||||
bindings,
|
||||
};
|
||||
await writeJsonAtomic(resolveBindingsPath(params.accountId), payload, {
|
||||
mode: 0o600,
|
||||
trailingNewline: true,
|
||||
ensureDirMode: 0o700,
|
||||
});
|
||||
}
|
||||
|
||||
function resolveThreadIdFromBindingId(params: {
|
||||
accountId: string;
|
||||
bindingId?: string;
|
||||
}): string | undefined {
|
||||
const bindingId = params.bindingId?.trim();
|
||||
if (!bindingId) {
|
||||
return undefined;
|
||||
}
|
||||
const prefix = `${params.accountId}:`;
|
||||
if (!bindingId.startsWith(prefix)) {
|
||||
return undefined;
|
||||
}
|
||||
const conversationId = bindingId.slice(prefix.length).trim();
|
||||
return conversationId || undefined;
|
||||
}
|
||||
|
||||
function normalizeTimestampMs(raw: unknown): number {
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw)) {
|
||||
return Date.now();
|
||||
}
|
||||
return Math.max(0, Math.floor(raw));
|
||||
}
|
||||
|
||||
function shouldExpireByIdle(params: {
|
||||
now: number;
|
||||
record: TelegramThreadBindingRecord;
|
||||
defaultIdleTimeoutMs: number;
|
||||
}): boolean {
|
||||
const idleTimeoutMs =
|
||||
typeof params.record.idleTimeoutMs === "number"
|
||||
? Math.max(0, Math.floor(params.record.idleTimeoutMs))
|
||||
: params.defaultIdleTimeoutMs;
|
||||
if (idleTimeoutMs <= 0) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
params.now >= Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs
|
||||
);
|
||||
}
|
||||
|
||||
function shouldExpireByMaxAge(params: {
|
||||
now: number;
|
||||
record: TelegramThreadBindingRecord;
|
||||
defaultMaxAgeMs: number;
|
||||
}): boolean {
|
||||
const maxAgeMs =
|
||||
typeof params.record.maxAgeMs === "number"
|
||||
? Math.max(0, Math.floor(params.record.maxAgeMs))
|
||||
: params.defaultMaxAgeMs;
|
||||
if (maxAgeMs <= 0) {
|
||||
return false;
|
||||
}
|
||||
return params.now >= params.record.boundAt + maxAgeMs;
|
||||
}
|
||||
|
||||
export function createTelegramThreadBindingManager(
|
||||
params: {
|
||||
accountId?: string;
|
||||
persist?: boolean;
|
||||
idleTimeoutMs?: number;
|
||||
maxAgeMs?: number;
|
||||
enableSweeper?: boolean;
|
||||
} = {},
|
||||
): TelegramThreadBindingManager {
|
||||
const accountId = normalizeAccountId(params.accountId);
|
||||
const existing = MANAGERS_BY_ACCOUNT_ID.get(accountId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const persist = params.persist ?? true;
|
||||
const idleTimeoutMs = normalizeDurationMs(
|
||||
params.idleTimeoutMs,
|
||||
DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
|
||||
);
|
||||
const maxAgeMs = normalizeDurationMs(params.maxAgeMs, DEFAULT_THREAD_BINDING_MAX_AGE_MS);
|
||||
|
||||
const loaded = loadBindingsFromDisk(accountId);
|
||||
for (const entry of loaded) {
|
||||
const key = resolveBindingKey({
|
||||
accountId,
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, {
|
||||
...entry,
|
||||
accountId,
|
||||
});
|
||||
}
|
||||
|
||||
const listBindingsForAccount = () =>
|
||||
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId);
|
||||
|
||||
let sweepTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
const manager: TelegramThreadBindingManager = {
|
||||
accountId,
|
||||
shouldPersistMutations: () => persist,
|
||||
getIdleTimeoutMs: () => idleTimeoutMs,
|
||||
getMaxAgeMs: () => maxAgeMs,
|
||||
getByConversationId: (conversationIdRaw) => {
|
||||
const conversationId = normalizeConversationId(conversationIdRaw);
|
||||
if (!conversationId) {
|
||||
return undefined;
|
||||
}
|
||||
return BINDINGS_BY_ACCOUNT_CONVERSATION.get(
|
||||
resolveBindingKey({
|
||||
accountId,
|
||||
conversationId,
|
||||
}),
|
||||
);
|
||||
},
|
||||
listBySessionKey: (targetSessionKeyRaw) => {
|
||||
const targetSessionKey = targetSessionKeyRaw.trim();
|
||||
if (!targetSessionKey) {
|
||||
return [];
|
||||
}
|
||||
return listBindingsForAccount().filter(
|
||||
(entry) => entry.targetSessionKey === targetSessionKey,
|
||||
);
|
||||
},
|
||||
listBindings: () => listBindingsForAccount(),
|
||||
touchConversation: (conversationIdRaw, at) => {
|
||||
const conversationId = normalizeConversationId(conversationIdRaw);
|
||||
if (!conversationId) {
|
||||
return null;
|
||||
}
|
||||
const key = resolveBindingKey({ accountId, conversationId });
|
||||
const existing = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key);
|
||||
if (!existing) {
|
||||
return null;
|
||||
}
|
||||
const nextRecord: TelegramThreadBindingRecord = {
|
||||
...existing,
|
||||
lastActivityAt: normalizeTimestampMs(at ?? Date.now()),
|
||||
};
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, nextRecord);
|
||||
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
|
||||
return nextRecord;
|
||||
},
|
||||
unbindConversation: (unbindParams) => {
|
||||
const conversationId = normalizeConversationId(unbindParams.conversationId);
|
||||
if (!conversationId) {
|
||||
return null;
|
||||
}
|
||||
const key = resolveBindingKey({ accountId, conversationId });
|
||||
const removed = BINDINGS_BY_ACCOUNT_CONVERSATION.get(key) ?? null;
|
||||
if (!removed) {
|
||||
return null;
|
||||
}
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
|
||||
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
|
||||
return removed;
|
||||
},
|
||||
unbindBySessionKey: (unbindParams) => {
|
||||
const targetSessionKey = unbindParams.targetSessionKey.trim();
|
||||
if (!targetSessionKey) {
|
||||
return [];
|
||||
}
|
||||
const removed: TelegramThreadBindingRecord[] = [];
|
||||
for (const entry of listBindingsForAccount()) {
|
||||
if (entry.targetSessionKey !== targetSessionKey) {
|
||||
continue;
|
||||
}
|
||||
const key = resolveBindingKey({
|
||||
accountId,
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
|
||||
removed.push(entry);
|
||||
}
|
||||
if (removed.length > 0) {
|
||||
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
|
||||
}
|
||||
return removed;
|
||||
},
|
||||
stop: () => {
|
||||
if (sweepTimer) {
|
||||
clearInterval(sweepTimer);
|
||||
sweepTimer = null;
|
||||
}
|
||||
unregisterSessionBindingAdapter({ channel: "telegram", accountId });
|
||||
const existingManager = MANAGERS_BY_ACCOUNT_ID.get(accountId);
|
||||
if (existingManager === manager) {
|
||||
MANAGERS_BY_ACCOUNT_ID.delete(accountId);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
registerSessionBindingAdapter({
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
capabilities: {
|
||||
placements: ["current"],
|
||||
},
|
||||
bind: async (input) => {
|
||||
if (input.conversation.channel !== "telegram") {
|
||||
return null;
|
||||
}
|
||||
if (input.placement === "child") {
|
||||
return null;
|
||||
}
|
||||
const conversationId = normalizeConversationId(input.conversation.conversationId);
|
||||
const targetSessionKey = input.targetSessionKey.trim();
|
||||
if (!conversationId || !targetSessionKey) {
|
||||
return null;
|
||||
}
|
||||
const record = fromSessionBindingInput({
|
||||
accountId,
|
||||
input: {
|
||||
targetSessionKey,
|
||||
targetKind: input.targetKind,
|
||||
conversationId,
|
||||
metadata: input.metadata,
|
||||
},
|
||||
});
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(
|
||||
resolveBindingKey({ accountId, conversationId }),
|
||||
record,
|
||||
);
|
||||
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
|
||||
logVerbose(
|
||||
`telegram: bound conversation ${conversationId} -> ${targetSessionKey} (${summarizeLifecycleForLog(
|
||||
record,
|
||||
{
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
},
|
||||
)})`,
|
||||
);
|
||||
return toSessionBindingRecord(record, {
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
});
|
||||
},
|
||||
listBySession: (targetSessionKeyRaw) => {
|
||||
const targetSessionKey = targetSessionKeyRaw.trim();
|
||||
if (!targetSessionKey) {
|
||||
return [];
|
||||
}
|
||||
return manager.listBySessionKey(targetSessionKey).map((entry) =>
|
||||
toSessionBindingRecord(entry, {
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
}),
|
||||
);
|
||||
},
|
||||
resolveByConversation: (ref) => {
|
||||
if (ref.channel !== "telegram") {
|
||||
return null;
|
||||
}
|
||||
const conversationId = normalizeConversationId(ref.conversationId);
|
||||
if (!conversationId) {
|
||||
return null;
|
||||
}
|
||||
const record = manager.getByConversationId(conversationId);
|
||||
return record
|
||||
? toSessionBindingRecord(record, {
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
})
|
||||
: null;
|
||||
},
|
||||
touch: (bindingId, at) => {
|
||||
const conversationId = resolveThreadIdFromBindingId({
|
||||
accountId,
|
||||
bindingId,
|
||||
});
|
||||
if (!conversationId) {
|
||||
return;
|
||||
}
|
||||
manager.touchConversation(conversationId, at);
|
||||
},
|
||||
unbind: async (input) => {
|
||||
if (input.targetSessionKey?.trim()) {
|
||||
const removed = manager.unbindBySessionKey({
|
||||
targetSessionKey: input.targetSessionKey,
|
||||
reason: input.reason,
|
||||
sendFarewell: false,
|
||||
});
|
||||
return removed.map((entry) =>
|
||||
toSessionBindingRecord(entry, {
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
}),
|
||||
);
|
||||
}
|
||||
const conversationId = resolveThreadIdFromBindingId({
|
||||
accountId,
|
||||
bindingId: input.bindingId,
|
||||
});
|
||||
if (!conversationId) {
|
||||
return [];
|
||||
}
|
||||
const removed = manager.unbindConversation({
|
||||
conversationId,
|
||||
reason: input.reason,
|
||||
sendFarewell: false,
|
||||
});
|
||||
return removed
|
||||
? [
|
||||
toSessionBindingRecord(removed, {
|
||||
idleTimeoutMs,
|
||||
maxAgeMs,
|
||||
}),
|
||||
]
|
||||
: [];
|
||||
},
|
||||
});
|
||||
|
||||
const sweeperEnabled = params.enableSweeper !== false;
|
||||
if (sweeperEnabled) {
|
||||
sweepTimer = setInterval(() => {
|
||||
const now = Date.now();
|
||||
for (const record of listBindingsForAccount()) {
|
||||
const idleExpired = shouldExpireByIdle({
|
||||
now,
|
||||
record,
|
||||
defaultIdleTimeoutMs: idleTimeoutMs,
|
||||
});
|
||||
const maxAgeExpired = shouldExpireByMaxAge({
|
||||
now,
|
||||
record,
|
||||
defaultMaxAgeMs: maxAgeMs,
|
||||
});
|
||||
if (!idleExpired && !maxAgeExpired) {
|
||||
continue;
|
||||
}
|
||||
manager.unbindConversation({
|
||||
conversationId: record.conversationId,
|
||||
reason: idleExpired ? "idle-expired" : "max-age-expired",
|
||||
sendFarewell: false,
|
||||
});
|
||||
}
|
||||
}, THREAD_BINDINGS_SWEEP_INTERVAL_MS);
|
||||
sweepTimer.unref?.();
|
||||
}
|
||||
|
||||
MANAGERS_BY_ACCOUNT_ID.set(accountId, manager);
|
||||
return manager;
|
||||
}
|
||||
|
||||
export function getTelegramThreadBindingManager(
|
||||
accountId?: string,
|
||||
): TelegramThreadBindingManager | null {
|
||||
return MANAGERS_BY_ACCOUNT_ID.get(normalizeAccountId(accountId)) ?? null;
|
||||
}
|
||||
|
||||
function updateTelegramBindingsBySessionKey(params: {
|
||||
manager: TelegramThreadBindingManager;
|
||||
targetSessionKey: string;
|
||||
update: (entry: TelegramThreadBindingRecord, now: number) => TelegramThreadBindingRecord;
|
||||
}): TelegramThreadBindingRecord[] {
|
||||
const targetSessionKey = params.targetSessionKey.trim();
|
||||
if (!targetSessionKey) {
|
||||
return [];
|
||||
}
|
||||
const now = Date.now();
|
||||
const updated: TelegramThreadBindingRecord[] = [];
|
||||
for (const entry of params.manager.listBySessionKey(targetSessionKey)) {
|
||||
const key = resolveBindingKey({
|
||||
accountId: params.manager.accountId,
|
||||
conversationId: entry.conversationId,
|
||||
});
|
||||
const next = params.update(entry, now);
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, next);
|
||||
updated.push(next);
|
||||
}
|
||||
if (updated.length > 0) {
|
||||
void persistBindingsToDisk({
|
||||
accountId: params.manager.accountId,
|
||||
persist: params.manager.shouldPersistMutations(),
|
||||
});
|
||||
}
|
||||
return updated;
|
||||
}
|
||||
|
||||
export function setTelegramThreadBindingIdleTimeoutBySessionKey(params: {
|
||||
targetSessionKey: string;
|
||||
accountId?: string;
|
||||
idleTimeoutMs: number;
|
||||
}): TelegramThreadBindingRecord[] {
|
||||
const manager = getTelegramThreadBindingManager(params.accountId);
|
||||
if (!manager) {
|
||||
return [];
|
||||
}
|
||||
const idleTimeoutMs = normalizeDurationMs(params.idleTimeoutMs, 0);
|
||||
return updateTelegramBindingsBySessionKey({
|
||||
manager,
|
||||
targetSessionKey: params.targetSessionKey,
|
||||
update: (entry, now) => ({
|
||||
...entry,
|
||||
idleTimeoutMs,
|
||||
lastActivityAt: now,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
export function setTelegramThreadBindingMaxAgeBySessionKey(params: {
|
||||
targetSessionKey: string;
|
||||
accountId?: string;
|
||||
maxAgeMs: number;
|
||||
}): TelegramThreadBindingRecord[] {
|
||||
const manager = getTelegramThreadBindingManager(params.accountId);
|
||||
if (!manager) {
|
||||
return [];
|
||||
}
|
||||
const maxAgeMs = normalizeDurationMs(params.maxAgeMs, 0);
|
||||
return updateTelegramBindingsBySessionKey({
|
||||
manager,
|
||||
targetSessionKey: params.targetSessionKey,
|
||||
update: (entry, now) => ({
|
||||
...entry,
|
||||
maxAgeMs,
|
||||
lastActivityAt: now,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
resetTelegramThreadBindingsForTests() {
|
||||
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
|
||||
manager.stop();
|
||||
}
|
||||
MANAGERS_BY_ACCOUNT_ID.clear();
|
||||
BINDINGS_BY_ACCOUNT_CONVERSATION.clear();
|
||||
},
|
||||
};
|
||||
Reference in New Issue
Block a user