mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 10:27:38 +00:00
feat: add Mattermost channel support
Add Mattermost as a supported messaging channel with bot API and WebSocket integration. Includes channel state tracking (tint, summary, details), multi-account support, and delivery target routing. Update documentation and tests to include Mattermost alongside existing channels.
This commit is contained in:
774
src/mattermost/monitor.ts
Normal file
774
src/mattermost/monitor.ts
Normal file
@@ -0,0 +1,774 @@
|
||||
import WebSocket from "ws";
|
||||
|
||||
import {
|
||||
resolveEffectiveMessagesConfig,
|
||||
resolveHumanDelayConfig,
|
||||
resolveIdentityName,
|
||||
} from "../agents/identity.js";
|
||||
import { chunkMarkdownText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
|
||||
import { hasControlCommand } from "../auto-reply/command-detection.js";
|
||||
import { shouldHandleTextCommands } from "../auto-reply/commands-registry.js";
|
||||
import { formatInboundEnvelope, formatInboundFromLabel } from "../auto-reply/envelope.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../auto-reply/inbound-debounce.js";
|
||||
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
|
||||
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
|
||||
import {
|
||||
buildPendingHistoryContextFromMap,
|
||||
clearHistoryEntries,
|
||||
DEFAULT_GROUP_HISTORY_LIMIT,
|
||||
recordPendingHistoryEntry,
|
||||
type HistoryEntry,
|
||||
} from "../auto-reply/reply/history.js";
|
||||
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
|
||||
import {
|
||||
extractShortModelName,
|
||||
type ResponsePrefixContext,
|
||||
} from "../auto-reply/reply/response-prefix-template.js";
|
||||
import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { resolveStorePath, updateLastRoute } from "../config/sessions.js";
|
||||
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
|
||||
import { createDedupeCache } from "../infra/dedupe.js";
|
||||
import { rawDataToString } from "../infra/ws.js";
|
||||
import { recordChannelActivity } from "../infra/channel-activity.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { mediaKindFromMime, type MediaKind } from "../media/constants.js";
|
||||
import { fetchRemoteMedia, type FetchLike } from "../media/fetch.js";
|
||||
import { saveMediaBuffer } from "../media/store.js";
|
||||
import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
|
||||
import { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";
|
||||
import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js";
|
||||
import { resolveMattermostAccount } from "./accounts.js";
|
||||
import {
|
||||
createMattermostClient,
|
||||
fetchMattermostChannel,
|
||||
fetchMattermostMe,
|
||||
fetchMattermostUser,
|
||||
normalizeMattermostBaseUrl,
|
||||
sendMattermostTyping,
|
||||
type MattermostChannel,
|
||||
type MattermostPost,
|
||||
type MattermostUser,
|
||||
} from "./client.js";
|
||||
import { sendMessageMattermost } from "./send.js";
|
||||
|
||||
export type MonitorMattermostOpts = {
|
||||
botToken?: string;
|
||||
baseUrl?: string;
|
||||
accountId?: string;
|
||||
config?: ClawdbotConfig;
|
||||
runtime?: RuntimeEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
statusSink?: (patch: Partial<ChannelAccountSnapshot>) => void;
|
||||
};
|
||||
|
||||
type MattermostEventPayload = {
|
||||
event?: string;
|
||||
data?: {
|
||||
post?: string;
|
||||
channel_id?: string;
|
||||
channel_name?: string;
|
||||
channel_display_name?: string;
|
||||
channel_type?: string;
|
||||
sender_name?: string;
|
||||
team_id?: string;
|
||||
};
|
||||
broadcast?: {
|
||||
channel_id?: string;
|
||||
team_id?: string;
|
||||
user_id?: string;
|
||||
};
|
||||
};
|
||||
|
||||
const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000;
|
||||
const RECENT_MATTERMOST_MESSAGE_MAX = 2000;
|
||||
const CHANNEL_CACHE_TTL_MS = 5 * 60_000;
|
||||
const USER_CACHE_TTL_MS = 10 * 60_000;
|
||||
const DEFAULT_ONCHAR_PREFIXES = [">", "!"];
|
||||
|
||||
const recentInboundMessages = createDedupeCache({
|
||||
ttlMs: RECENT_MATTERMOST_MESSAGE_TTL_MS,
|
||||
maxSize: RECENT_MATTERMOST_MESSAGE_MAX,
|
||||
});
|
||||
|
||||
function resolveRuntime(opts: MonitorMattermostOpts): RuntimeEnv {
|
||||
return (
|
||||
opts.runtime ?? {
|
||||
log: console.log,
|
||||
error: console.error,
|
||||
exit: (code: number): never => {
|
||||
throw new Error(`exit ${code}`);
|
||||
},
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeMention(text: string, mention: string | undefined): string {
|
||||
if (!mention) return text.trim();
|
||||
const escaped = mention.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
|
||||
const re = new RegExp(`@${escaped}\\b`, "gi");
|
||||
return text.replace(re, " ").replace(/\s+/g, " ").trim();
|
||||
}
|
||||
|
||||
function resolveOncharPrefixes(prefixes: string[] | undefined): string[] {
|
||||
const cleaned = prefixes?.map((entry) => entry.trim()).filter(Boolean) ?? DEFAULT_ONCHAR_PREFIXES;
|
||||
return cleaned.length > 0 ? cleaned : DEFAULT_ONCHAR_PREFIXES;
|
||||
}
|
||||
|
||||
function stripOncharPrefix(
|
||||
text: string,
|
||||
prefixes: string[],
|
||||
): { triggered: boolean; stripped: string } {
|
||||
const trimmed = text.trimStart();
|
||||
for (const prefix of prefixes) {
|
||||
if (!prefix) continue;
|
||||
if (trimmed.startsWith(prefix)) {
|
||||
return {
|
||||
triggered: true,
|
||||
stripped: trimmed.slice(prefix.length).trimStart(),
|
||||
};
|
||||
}
|
||||
}
|
||||
return { triggered: false, stripped: text };
|
||||
}
|
||||
|
||||
function isSystemPost(post: MattermostPost): boolean {
|
||||
const type = post.type?.trim();
|
||||
return Boolean(type);
|
||||
}
|
||||
|
||||
function channelKind(channelType?: string | null): "dm" | "group" | "channel" {
|
||||
if (!channelType) return "channel";
|
||||
const normalized = channelType.trim().toUpperCase();
|
||||
if (normalized === "D") return "dm";
|
||||
if (normalized === "G") return "group";
|
||||
return "channel";
|
||||
}
|
||||
|
||||
function channelChatType(kind: "dm" | "group" | "channel"): "direct" | "group" | "channel" {
|
||||
if (kind === "dm") return "direct";
|
||||
if (kind === "group") return "group";
|
||||
return "channel";
|
||||
}
|
||||
|
||||
type MattermostMediaInfo = {
|
||||
path: string;
|
||||
contentType?: string;
|
||||
kind: MediaKind;
|
||||
};
|
||||
|
||||
function buildMattermostAttachmentPlaceholder(mediaList: MattermostMediaInfo[]): string {
|
||||
if (mediaList.length === 0) return "";
|
||||
if (mediaList.length === 1) {
|
||||
const kind = mediaList[0].kind === "unknown" ? "document" : mediaList[0].kind;
|
||||
return `<media:${kind}>`;
|
||||
}
|
||||
const allImages = mediaList.every((media) => media.kind === "image");
|
||||
const label = allImages ? "image" : "file";
|
||||
const suffix = mediaList.length === 1 ? label : `${label}s`;
|
||||
const tag = allImages ? "<media:image>" : "<media:document>";
|
||||
return `${tag} (${mediaList.length} ${suffix})`;
|
||||
}
|
||||
|
||||
function buildMattermostMediaPayload(mediaList: MattermostMediaInfo[]): {
|
||||
MediaPath?: string;
|
||||
MediaType?: string;
|
||||
MediaUrl?: string;
|
||||
MediaPaths?: string[];
|
||||
MediaUrls?: string[];
|
||||
MediaTypes?: string[];
|
||||
} {
|
||||
const first = mediaList[0];
|
||||
const mediaPaths = mediaList.map((media) => media.path);
|
||||
const mediaTypes = mediaList.map((media) => media.contentType).filter(Boolean) as string[];
|
||||
return {
|
||||
MediaPath: first?.path,
|
||||
MediaType: first?.contentType,
|
||||
MediaUrl: first?.path,
|
||||
MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined,
|
||||
MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined,
|
||||
MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function buildMattermostWsUrl(baseUrl: string): string {
|
||||
const normalized = normalizeMattermostBaseUrl(baseUrl);
|
||||
if (!normalized) throw new Error("Mattermost baseUrl is required");
|
||||
const wsBase = normalized.replace(/^http/i, "ws");
|
||||
return `${wsBase}/api/v4/websocket`;
|
||||
}
|
||||
|
||||
export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}): Promise<void> {
|
||||
const runtime = resolveRuntime(opts);
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveMattermostAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const botToken = opts.botToken?.trim() || account.botToken?.trim();
|
||||
if (!botToken) {
|
||||
throw new Error(
|
||||
`Mattermost bot token missing for account "${account.accountId}" (set channels.mattermost.accounts.${account.accountId}.botToken or MATTERMOST_BOT_TOKEN for default).`,
|
||||
);
|
||||
}
|
||||
const baseUrl = normalizeMattermostBaseUrl(opts.baseUrl ?? account.baseUrl);
|
||||
if (!baseUrl) {
|
||||
throw new Error(
|
||||
`Mattermost baseUrl missing for account "${account.accountId}" (set channels.mattermost.accounts.${account.accountId}.baseUrl or MATTERMOST_URL for default).`,
|
||||
);
|
||||
}
|
||||
|
||||
const client = createMattermostClient({ baseUrl, botToken });
|
||||
const botUser = await fetchMattermostMe(client);
|
||||
const botUserId = botUser.id;
|
||||
const botUsername = botUser.username?.trim() || undefined;
|
||||
runtime.log?.(`mattermost connected as ${botUsername ? `@${botUsername}` : botUserId}`);
|
||||
|
||||
const channelCache = new Map<string, { value: MattermostChannel | null; expiresAt: number }>();
|
||||
const userCache = new Map<string, { value: MattermostUser | null; expiresAt: number }>();
|
||||
const logger = getChildLogger({ module: "mattermost" });
|
||||
const mediaMaxBytes =
|
||||
resolveChannelMediaMaxBytes({
|
||||
cfg,
|
||||
resolveChannelLimitMb: () => undefined,
|
||||
accountId: account.accountId,
|
||||
}) ?? 8 * 1024 * 1024;
|
||||
const historyLimit = Math.max(
|
||||
0,
|
||||
cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT,
|
||||
);
|
||||
const channelHistories = new Map<string, HistoryEntry[]>();
|
||||
|
||||
const fetchWithAuth: FetchLike = (input, init) => {
|
||||
const headers = new Headers(init?.headers);
|
||||
headers.set("Authorization", `Bearer ${client.token}`);
|
||||
return fetch(input, { ...init, headers });
|
||||
};
|
||||
|
||||
const resolveMattermostMedia = async (
|
||||
fileIds?: string[] | null,
|
||||
): Promise<MattermostMediaInfo[]> => {
|
||||
const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean) as string[];
|
||||
if (ids.length === 0) return [];
|
||||
const out: MattermostMediaInfo[] = [];
|
||||
for (const fileId of ids) {
|
||||
try {
|
||||
const fetched = await fetchRemoteMedia({
|
||||
url: `${client.apiBaseUrl}/files/${fileId}`,
|
||||
fetchImpl: fetchWithAuth,
|
||||
filePathHint: fileId,
|
||||
maxBytes: mediaMaxBytes,
|
||||
});
|
||||
const saved = await saveMediaBuffer(
|
||||
fetched.buffer,
|
||||
fetched.contentType ?? undefined,
|
||||
"inbound",
|
||||
mediaMaxBytes,
|
||||
);
|
||||
const contentType = saved.contentType ?? fetched.contentType ?? undefined;
|
||||
out.push({
|
||||
path: saved.path,
|
||||
contentType,
|
||||
kind: mediaKindFromMime(contentType),
|
||||
});
|
||||
} catch (err) {
|
||||
logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
return out;
|
||||
};
|
||||
|
||||
const sendTypingIndicator = async (channelId: string, parentId?: string) => {
|
||||
try {
|
||||
await sendMattermostTyping(client, { channelId, parentId });
|
||||
} catch (err) {
|
||||
logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
|
||||
const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => {
|
||||
const cached = channelCache.get(channelId);
|
||||
if (cached && cached.expiresAt > Date.now()) return cached.value;
|
||||
try {
|
||||
const info = await fetchMattermostChannel(client, channelId);
|
||||
channelCache.set(channelId, {
|
||||
value: info,
|
||||
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
|
||||
});
|
||||
return info;
|
||||
} catch (err) {
|
||||
logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`);
|
||||
channelCache.set(channelId, {
|
||||
value: null,
|
||||
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const resolveUserInfo = async (userId: string): Promise<MattermostUser | null> => {
|
||||
const cached = userCache.get(userId);
|
||||
if (cached && cached.expiresAt > Date.now()) return cached.value;
|
||||
try {
|
||||
const info = await fetchMattermostUser(client, userId);
|
||||
userCache.set(userId, {
|
||||
value: info,
|
||||
expiresAt: Date.now() + USER_CACHE_TTL_MS,
|
||||
});
|
||||
return info;
|
||||
} catch (err) {
|
||||
logger.debug?.(`mattermost: user lookup failed: ${String(err)}`);
|
||||
userCache.set(userId, {
|
||||
value: null,
|
||||
expiresAt: Date.now() + USER_CACHE_TTL_MS,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const handlePost = async (
|
||||
post: MattermostPost,
|
||||
payload: MattermostEventPayload,
|
||||
messageIds?: string[],
|
||||
) => {
|
||||
const channelId = post.channel_id ?? payload.data?.channel_id ?? payload.broadcast?.channel_id;
|
||||
if (!channelId) return;
|
||||
|
||||
const allMessageIds = messageIds?.length ? messageIds : post.id ? [post.id] : [];
|
||||
if (allMessageIds.length === 0) return;
|
||||
const dedupeEntries = allMessageIds.map((id) =>
|
||||
recentInboundMessages.check(`${account.accountId}:${id}`),
|
||||
);
|
||||
if (dedupeEntries.length > 0 && dedupeEntries.every(Boolean)) return;
|
||||
|
||||
const senderId = post.user_id ?? payload.broadcast?.user_id;
|
||||
if (!senderId) return;
|
||||
if (senderId === botUserId) return;
|
||||
if (isSystemPost(post)) return;
|
||||
|
||||
const channelInfo = await resolveChannelInfo(channelId);
|
||||
const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined;
|
||||
const kind = channelKind(channelType);
|
||||
const chatType = channelChatType(kind);
|
||||
|
||||
const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined;
|
||||
const channelName = payload.data?.channel_name ?? channelInfo?.name ?? "";
|
||||
const channelDisplay =
|
||||
payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName;
|
||||
const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`;
|
||||
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
teamId,
|
||||
peer: {
|
||||
kind,
|
||||
id: kind === "dm" ? senderId : channelId,
|
||||
},
|
||||
});
|
||||
|
||||
const baseSessionKey = route.sessionKey;
|
||||
const threadRootId = post.root_id?.trim() || undefined;
|
||||
const threadKeys = resolveThreadSessionKeys({
|
||||
baseSessionKey,
|
||||
threadId: threadRootId,
|
||||
parentSessionKey: threadRootId ? baseSessionKey : undefined,
|
||||
});
|
||||
const sessionKey = threadKeys.sessionKey;
|
||||
const historyKey = kind === "dm" ? null : sessionKey;
|
||||
|
||||
const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
|
||||
const rawText = post.message?.trim() || "";
|
||||
const wasMentioned =
|
||||
kind !== "dm" &&
|
||||
((botUsername ? rawText.toLowerCase().includes(`@${botUsername.toLowerCase()}`) : false) ||
|
||||
matchesMentionPatterns(rawText, mentionRegexes));
|
||||
const pendingBody =
|
||||
rawText ||
|
||||
(post.file_ids?.length
|
||||
? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]`
|
||||
: "");
|
||||
const pendingSender = payload.data?.sender_name?.trim() || senderId;
|
||||
const recordPendingHistory = () => {
|
||||
if (!historyKey || historyLimit <= 0) return;
|
||||
const trimmed = pendingBody.trim();
|
||||
if (!trimmed) return;
|
||||
recordPendingHistoryEntry({
|
||||
historyMap: channelHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
entry: {
|
||||
sender: pendingSender,
|
||||
body: trimmed,
|
||||
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
messageId: post.id ?? undefined,
|
||||
},
|
||||
});
|
||||
};
|
||||
|
||||
const allowTextCommands = shouldHandleTextCommands({
|
||||
cfg,
|
||||
surface: "mattermost",
|
||||
});
|
||||
const isControlCommand = allowTextCommands && hasControlCommand(rawText, cfg);
|
||||
const oncharEnabled = account.chatmode === "onchar" && kind !== "dm";
|
||||
const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : [];
|
||||
const oncharResult = oncharEnabled
|
||||
? stripOncharPrefix(rawText, oncharPrefixes)
|
||||
: { triggered: false, stripped: rawText };
|
||||
const oncharTriggered = oncharResult.triggered;
|
||||
|
||||
const shouldRequireMention = kind === "channel" && (account.requireMention ?? true);
|
||||
const shouldBypassMention = isControlCommand && shouldRequireMention && !wasMentioned;
|
||||
const effectiveWasMentioned = wasMentioned || shouldBypassMention || oncharTriggered;
|
||||
const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0;
|
||||
|
||||
if (oncharEnabled && !oncharTriggered && !wasMentioned && !isControlCommand) {
|
||||
recordPendingHistory();
|
||||
return;
|
||||
}
|
||||
|
||||
if (kind === "channel" && shouldRequireMention && canDetectMention) {
|
||||
if (!effectiveWasMentioned) {
|
||||
recordPendingHistory();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const senderName =
|
||||
payload.data?.sender_name?.trim() ||
|
||||
(await resolveUserInfo(senderId))?.username?.trim() ||
|
||||
senderId;
|
||||
const mediaList = await resolveMattermostMedia(post.file_ids);
|
||||
const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList);
|
||||
const bodySource = oncharTriggered ? oncharResult.stripped : rawText;
|
||||
const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim();
|
||||
const bodyText = normalizeMention(baseText, botUsername);
|
||||
if (!bodyText) return;
|
||||
|
||||
recordChannelActivity({
|
||||
channel: "mattermost",
|
||||
accountId: account.accountId,
|
||||
direction: "inbound",
|
||||
});
|
||||
|
||||
const fromLabel = formatInboundFromLabel({
|
||||
isGroup: kind !== "dm",
|
||||
groupLabel: channelDisplay || roomLabel,
|
||||
groupId: channelId,
|
||||
groupFallback: roomLabel || "Channel",
|
||||
directLabel: senderName,
|
||||
directId: senderId,
|
||||
});
|
||||
|
||||
const preview = bodyText.replace(/\s+/g, " ").slice(0, 160);
|
||||
const inboundLabel =
|
||||
kind === "dm"
|
||||
? `Mattermost DM from ${senderName}`
|
||||
: `Mattermost message in ${roomLabel} from ${senderName}`;
|
||||
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
|
||||
sessionKey,
|
||||
contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`,
|
||||
});
|
||||
|
||||
const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`;
|
||||
const body = formatInboundEnvelope({
|
||||
channel: "Mattermost",
|
||||
from: fromLabel,
|
||||
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
body: textWithId,
|
||||
chatType,
|
||||
sender: { name: senderName, id: senderId },
|
||||
});
|
||||
let combinedBody = body;
|
||||
if (historyKey && historyLimit > 0) {
|
||||
combinedBody = buildPendingHistoryContextFromMap({
|
||||
historyMap: channelHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
currentMessage: combinedBody,
|
||||
formatEntry: (entry) =>
|
||||
formatInboundEnvelope({
|
||||
channel: "Mattermost",
|
||||
from: fromLabel,
|
||||
timestamp: entry.timestamp,
|
||||
body: `${entry.body}${
|
||||
entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : ""
|
||||
}`,
|
||||
chatType,
|
||||
senderLabel: entry.sender,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
const to = kind === "dm" ? `user:${senderId}` : `channel:${channelId}`;
|
||||
const mediaPayload = buildMattermostMediaPayload(mediaList);
|
||||
const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({
|
||||
useAccessGroups: cfg.commands?.useAccessGroups ?? false,
|
||||
authorizers: [],
|
||||
});
|
||||
const ctxPayload = finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
RawBody: bodyText,
|
||||
CommandBody: bodyText,
|
||||
From:
|
||||
kind === "dm"
|
||||
? `mattermost:${senderId}`
|
||||
: kind === "group"
|
||||
? `mattermost:group:${channelId}`
|
||||
: `mattermost:channel:${channelId}`,
|
||||
To: to,
|
||||
SessionKey: sessionKey,
|
||||
ParentSessionKey: threadKeys.parentSessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: chatType,
|
||||
ConversationLabel: fromLabel,
|
||||
GroupSubject: kind !== "dm" ? channelDisplay || roomLabel : undefined,
|
||||
GroupChannel: channelName ? `#${channelName}` : undefined,
|
||||
GroupSpace: teamId,
|
||||
SenderName: senderName,
|
||||
SenderId: senderId,
|
||||
Provider: "mattermost" as const,
|
||||
Surface: "mattermost" as const,
|
||||
MessageSid: post.id ?? undefined,
|
||||
MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined,
|
||||
MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined,
|
||||
MessageSidLast:
|
||||
allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined,
|
||||
ReplyToId: threadRootId,
|
||||
MessageThreadId: threadRootId,
|
||||
Timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
|
||||
WasMentioned: kind !== "dm" ? effectiveWasMentioned : undefined,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
OriginatingChannel: "mattermost" as const,
|
||||
OriginatingTo: to,
|
||||
...mediaPayload,
|
||||
});
|
||||
|
||||
if (kind === "dm") {
|
||||
const sessionCfg = cfg.session;
|
||||
const storePath = resolveStorePath(sessionCfg?.store, {
|
||||
agentId: route.agentId,
|
||||
});
|
||||
await updateLastRoute({
|
||||
storePath,
|
||||
sessionKey: route.mainSessionKey,
|
||||
deliveryContext: {
|
||||
channel: "mattermost",
|
||||
to,
|
||||
accountId: route.accountId,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if (shouldLogVerbose()) {
|
||||
const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n");
|
||||
logVerbose(
|
||||
`mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`,
|
||||
);
|
||||
}
|
||||
|
||||
const textLimit = resolveTextChunkLimit(cfg, "mattermost", account.accountId, {
|
||||
fallbackLimit: account.textChunkLimit ?? 4000,
|
||||
});
|
||||
|
||||
let prefixContext: ResponsePrefixContext = {
|
||||
identityName: resolveIdentityName(cfg, route.agentId),
|
||||
};
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
|
||||
responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId).responsePrefix,
|
||||
responsePrefixContextProvider: () => prefixContext,
|
||||
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
|
||||
deliver: async (payload: ReplyPayload) => {
|
||||
const mediaUrls = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
|
||||
const text = payload.text ?? "";
|
||||
if (mediaUrls.length === 0) {
|
||||
const chunks = chunkMarkdownText(text, textLimit);
|
||||
for (const chunk of chunks.length > 0 ? chunks : [text]) {
|
||||
if (!chunk) continue;
|
||||
await sendMessageMattermost(to, chunk, {
|
||||
accountId: account.accountId,
|
||||
replyToId: threadRootId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
let first = true;
|
||||
for (const mediaUrl of mediaUrls) {
|
||||
const caption = first ? text : "";
|
||||
first = false;
|
||||
await sendMessageMattermost(to, caption, {
|
||||
accountId: account.accountId,
|
||||
mediaUrl,
|
||||
replyToId: threadRootId,
|
||||
});
|
||||
}
|
||||
}
|
||||
runtime.log?.(`delivered reply to ${to}`);
|
||||
},
|
||||
onError: (err, info) => {
|
||||
runtime.error?.(danger(`mattermost ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
onReplyStart: () => sendTypingIndicator(channelId, threadRootId),
|
||||
});
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
disableBlockStreaming:
|
||||
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
|
||||
onModelSelected: (ctx) => {
|
||||
prefixContext.provider = ctx.provider;
|
||||
prefixContext.model = extractShortModelName(ctx.model);
|
||||
prefixContext.modelFull = `${ctx.provider}/${ctx.model}`;
|
||||
prefixContext.thinkingLevel = ctx.thinkLevel ?? "off";
|
||||
},
|
||||
},
|
||||
});
|
||||
markDispatchIdle();
|
||||
if (historyKey && historyLimit > 0) {
|
||||
clearHistoryEntries({ historyMap: channelHistories, historyKey });
|
||||
}
|
||||
};
|
||||
|
||||
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "mattermost" });
|
||||
const debouncer = createInboundDebouncer<{
|
||||
post: MattermostPost;
|
||||
payload: MattermostEventPayload;
|
||||
}>({
|
||||
debounceMs: inboundDebounceMs,
|
||||
buildKey: (entry) => {
|
||||
const channelId =
|
||||
entry.post.channel_id ??
|
||||
entry.payload.data?.channel_id ??
|
||||
entry.payload.broadcast?.channel_id;
|
||||
if (!channelId) return null;
|
||||
const threadId = entry.post.root_id?.trim();
|
||||
const threadKey = threadId ? `thread:${threadId}` : "channel";
|
||||
return `mattermost:${account.accountId}:${channelId}:${threadKey}`;
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
if (entry.post.file_ids && entry.post.file_ids.length > 0) return false;
|
||||
const text = entry.post.message?.trim() ?? "";
|
||||
if (!text) return false;
|
||||
return !hasControlCommand(text, cfg);
|
||||
},
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
if (!last) return;
|
||||
if (entries.length === 1) {
|
||||
await handlePost(last.post, last.payload);
|
||||
return;
|
||||
}
|
||||
const combinedText = entries
|
||||
.map((entry) => entry.post.message?.trim() ?? "")
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
const mergedPost: MattermostPost = {
|
||||
...last.post,
|
||||
message: combinedText,
|
||||
file_ids: [],
|
||||
};
|
||||
const ids = entries.map((entry) => entry.post.id).filter(Boolean) as string[];
|
||||
await handlePost(mergedPost, last.payload, ids.length > 0 ? ids : undefined);
|
||||
},
|
||||
onError: (err) => {
|
||||
runtime.error?.(danger(`mattermost debounce flush failed: ${String(err)}`));
|
||||
},
|
||||
});
|
||||
|
||||
const wsUrl = buildMattermostWsUrl(baseUrl);
|
||||
let seq = 1;
|
||||
|
||||
const connectOnce = async (): Promise<void> => {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
const onAbort = () => ws.close();
|
||||
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
return await new Promise((resolve) => {
|
||||
ws.on("open", () => {
|
||||
opts.statusSink?.({
|
||||
connected: true,
|
||||
lastConnectedAt: Date.now(),
|
||||
lastError: null,
|
||||
});
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
seq: seq++,
|
||||
action: "authentication_challenge",
|
||||
data: { token: botToken },
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
ws.on("message", async (data) => {
|
||||
const raw = rawDataToString(data);
|
||||
let payload: MattermostEventPayload;
|
||||
try {
|
||||
payload = JSON.parse(raw) as MattermostEventPayload;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (payload.event !== "posted") return;
|
||||
const postData = payload.data?.post;
|
||||
if (!postData) return;
|
||||
let post: MattermostPost | null = null;
|
||||
if (typeof postData === "string") {
|
||||
try {
|
||||
post = JSON.parse(postData) as MattermostPost;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
} else if (typeof postData === "object") {
|
||||
post = postData as MattermostPost;
|
||||
}
|
||||
if (!post) return;
|
||||
try {
|
||||
await debouncer.enqueue({ post, payload });
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`mattermost handler failed: ${String(err)}`));
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("close", (code, reason) => {
|
||||
const message = reason.length > 0 ? reason.toString("utf8") : "";
|
||||
opts.statusSink?.({
|
||||
connected: false,
|
||||
lastDisconnect: {
|
||||
at: Date.now(),
|
||||
status: code,
|
||||
error: message || undefined,
|
||||
},
|
||||
});
|
||||
opts.abortSignal?.removeEventListener("abort", onAbort);
|
||||
resolve();
|
||||
});
|
||||
|
||||
ws.on("error", (err) => {
|
||||
runtime.error?.(danger(`mattermost websocket error: ${String(err)}`));
|
||||
opts.statusSink?.({
|
||||
lastError: String(err),
|
||||
});
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
await connectOnce();
|
||||
if (opts.abortSignal?.aborted) return;
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user