import { isMessagingToolDuplicate } from "../../agents/pi-embedded-helpers.js"; import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js"; import { normalizeChannelId } from "../../channels/plugins/index.js"; import type { ReplyToMode } from "../../config/types.js"; import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js"; import { normalizeOptionalAccountId } from "../../routing/account-id.js"; import { parseTelegramTarget } from "../../telegram/targets.js"; import type { OriginatingChannelType } from "../templating.js"; import type { ReplyPayload } from "../types.js"; import { extractReplyToTag } from "./reply-tags.js"; import { createReplyToModeFilterForChannel } from "./reply-threading.js"; function resolveReplyThreadingForPayload(params: { payload: ReplyPayload; implicitReplyToId?: string; currentMessageId?: string; }): ReplyPayload { const implicitReplyToId = params.implicitReplyToId?.trim() || undefined; const currentMessageId = params.currentMessageId?.trim() || undefined; // 1) Apply implicit reply threading first (replyToMode will strip later if needed). let resolved: ReplyPayload = params.payload.replyToId || params.payload.replyToCurrent === false || !implicitReplyToId ? params.payload : { ...params.payload, replyToId: implicitReplyToId }; // 2) Parse explicit reply tags from text (if present) and clean them. if (typeof resolved.text === "string" && resolved.text.includes("[[")) { const { cleaned, replyToId, replyToCurrent, hasTag } = extractReplyToTag( resolved.text, currentMessageId, ); resolved = { ...resolved, text: cleaned ? cleaned : undefined, replyToId: replyToId ?? resolved.replyToId, replyToTag: hasTag || resolved.replyToTag, replyToCurrent: replyToCurrent || resolved.replyToCurrent, }; } // 3) If replyToCurrent was set out-of-band (e.g. tags already stripped upstream), // ensure replyToId is set to the current message id when available. if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) { resolved = { ...resolved, replyToId: currentMessageId, }; } return resolved; } // Backward-compatible helper: apply explicit reply tags/directives to a single payload. // This intentionally does not apply implicit threading. export function applyReplyTagsToPayload( payload: ReplyPayload, currentMessageId?: string, ): ReplyPayload { return resolveReplyThreadingForPayload({ payload, currentMessageId }); } export function isRenderablePayload(payload: ReplyPayload): boolean { return Boolean( payload.text || payload.mediaUrl || (payload.mediaUrls && payload.mediaUrls.length > 0) || payload.audioAsVoice || payload.channelData, ); } export function shouldSuppressReasoningPayload(payload: ReplyPayload): boolean { return payload.isReasoning === true; } export function applyReplyThreading(params: { payloads: ReplyPayload[]; replyToMode: ReplyToMode; replyToChannel?: OriginatingChannelType; currentMessageId?: string; }): ReplyPayload[] { const { payloads, replyToMode, replyToChannel, currentMessageId } = params; const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel); const implicitReplyToId = currentMessageId?.trim() || undefined; return payloads .map((payload) => resolveReplyThreadingForPayload({ payload, implicitReplyToId, currentMessageId }), ) .filter(isRenderablePayload) .map(applyReplyToMode); } export function filterMessagingToolDuplicates(params: { payloads: ReplyPayload[]; sentTexts: string[]; }): ReplyPayload[] { const { payloads, sentTexts } = params; if (sentTexts.length === 0) { return payloads; } return payloads.filter((payload) => !isMessagingToolDuplicate(payload.text ?? "", sentTexts)); } export function filterMessagingToolMediaDuplicates(params: { payloads: ReplyPayload[]; sentMediaUrls: string[]; }): ReplyPayload[] { const normalizeMediaForDedupe = (value: string): string => { const trimmed = value.trim(); if (!trimmed) { return ""; } if (!trimmed.toLowerCase().startsWith("file://")) { return trimmed; } try { const parsed = new URL(trimmed); if (parsed.protocol === "file:") { return decodeURIComponent(parsed.pathname || ""); } } catch { // Keep fallback below for non-URL-like inputs. } return trimmed.replace(/^file:\/\//i, ""); }; const { payloads, sentMediaUrls } = params; if (sentMediaUrls.length === 0) { return payloads; } const sentSet = new Set(sentMediaUrls.map(normalizeMediaForDedupe).filter(Boolean)); return payloads.map((payload) => { const mediaUrl = payload.mediaUrl; const mediaUrls = payload.mediaUrls; const stripSingle = mediaUrl && sentSet.has(normalizeMediaForDedupe(mediaUrl)); const filteredUrls = mediaUrls?.filter((u) => !sentSet.has(normalizeMediaForDedupe(u))); if (!stripSingle && (!mediaUrls || filteredUrls?.length === mediaUrls.length)) { return payload; // No change } return { ...payload, mediaUrl: stripSingle ? undefined : mediaUrl, mediaUrls: filteredUrls?.length ? filteredUrls : undefined, }; }); } const PROVIDER_ALIAS_MAP: Record = { lark: "feishu", }; function normalizeProviderForComparison(value?: string): string | undefined { const trimmed = value?.trim(); if (!trimmed) { return undefined; } const lowered = trimmed.toLowerCase(); const normalizedChannel = normalizeChannelId(trimmed); if (normalizedChannel) { return normalizedChannel; } return PROVIDER_ALIAS_MAP[lowered] ?? lowered; } function normalizeThreadIdForComparison(value?: string): string | undefined { const trimmed = value?.trim(); if (!trimmed) { return undefined; } if (/^-?\d+$/.test(trimmed)) { return String(Number.parseInt(trimmed, 10)); } return trimmed.toLowerCase(); } function resolveTargetProviderForComparison(params: { currentProvider: string; targetProvider?: string; }): string { const targetProvider = normalizeProviderForComparison(params.targetProvider); if (!targetProvider || targetProvider === "message") { return params.currentProvider; } return targetProvider; } function targetsMatchForSuppression(params: { provider: string; originTarget: string; targetKey: string; targetThreadId?: string; }): boolean { if (params.provider !== "telegram") { return params.targetKey === params.originTarget; } const origin = parseTelegramTarget(params.originTarget); const target = parseTelegramTarget(params.targetKey); const explicitTargetThreadId = normalizeThreadIdForComparison(params.targetThreadId); const targetThreadId = explicitTargetThreadId ?? (target.messageThreadId != null ? String(target.messageThreadId) : undefined); const originThreadId = origin.messageThreadId != null ? String(origin.messageThreadId) : undefined; if (origin.chatId.trim().toLowerCase() !== target.chatId.trim().toLowerCase()) { return false; } if (originThreadId && targetThreadId != null) { return originThreadId === targetThreadId; } if (originThreadId && targetThreadId == null) { return false; } if (!originThreadId && targetThreadId != null) { return false; } // chatId already matched and neither side carries thread context. return true; } export function shouldSuppressMessagingToolReplies(params: { messageProvider?: string; messagingToolSentTargets?: MessagingToolSend[]; originatingTo?: string; accountId?: string; }): boolean { const provider = normalizeProviderForComparison(params.messageProvider); if (!provider) { return false; } const originTarget = normalizeTargetForProvider(provider, params.originatingTo); if (!originTarget) { return false; } const originAccount = normalizeOptionalAccountId(params.accountId); const sentTargets = params.messagingToolSentTargets ?? []; if (sentTargets.length === 0) { return false; } return sentTargets.some((target) => { const targetProvider = resolveTargetProviderForComparison({ currentProvider: provider, targetProvider: target?.provider, }); if (targetProvider !== provider) { return false; } const targetKey = normalizeTargetForProvider(targetProvider, target.to); if (!targetKey) { return false; } const targetAccount = normalizeOptionalAccountId(target.accountId); if (originAccount && targetAccount && originAccount !== targetAccount) { return false; } return targetsMatchForSuppression({ provider, originTarget, targetKey, targetThreadId: target.threadId, }); }); }