Files
openclaw/src/mattermost/monitor.ts
Dominic Damoah bf6df6d6b7 feat: add Mattermost channel support
Add Mattermost as a supported messaging channel with bot API and WebSocket integration. Includes channel state tracking (tint, summary, details), multi-account support, and delivery target routing. Update documentation and tests to include Mattermost alongside existing channels.
2026-01-21 18:40:56 -05:00

775 lines
27 KiB
TypeScript

import WebSocket from "ws";
import {
resolveEffectiveMessagesConfig,
resolveHumanDelayConfig,
resolveIdentityName,
} from "../agents/identity.js";
import { chunkMarkdownText, resolveTextChunkLimit } from "../auto-reply/chunk.js";
import { hasControlCommand } from "../auto-reply/command-detection.js";
import { shouldHandleTextCommands } from "../auto-reply/commands-registry.js";
import { formatInboundEnvelope, formatInboundFromLabel } from "../auto-reply/envelope.js";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "../auto-reply/inbound-debounce.js";
import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js";
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntries,
DEFAULT_GROUP_HISTORY_LIMIT,
recordPendingHistoryEntry,
type HistoryEntry,
} from "../auto-reply/reply/history.js";
import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js";
import {
extractShortModelName,
type ResponsePrefixContext,
} from "../auto-reply/reply/response-prefix-template.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../auto-reply/reply/mentions.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import type { ClawdbotConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import { resolveStorePath, updateLastRoute } from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { createDedupeCache } from "../infra/dedupe.js";
import { rawDataToString } from "../infra/ws.js";
import { recordChannelActivity } from "../infra/channel-activity.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { mediaKindFromMime, type MediaKind } from "../media/constants.js";
import { fetchRemoteMedia, type FetchLike } from "../media/fetch.js";
import { saveMediaBuffer } from "../media/store.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
import { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";
import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js";
import { resolveMattermostAccount } from "./accounts.js";
import {
createMattermostClient,
fetchMattermostChannel,
fetchMattermostMe,
fetchMattermostUser,
normalizeMattermostBaseUrl,
sendMattermostTyping,
type MattermostChannel,
type MattermostPost,
type MattermostUser,
} from "./client.js";
import { sendMessageMattermost } from "./send.js";
export type MonitorMattermostOpts = {
botToken?: string;
baseUrl?: string;
accountId?: string;
config?: ClawdbotConfig;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
statusSink?: (patch: Partial<ChannelAccountSnapshot>) => void;
};
type MattermostEventPayload = {
event?: string;
data?: {
post?: string;
channel_id?: string;
channel_name?: string;
channel_display_name?: string;
channel_type?: string;
sender_name?: string;
team_id?: string;
};
broadcast?: {
channel_id?: string;
team_id?: string;
user_id?: string;
};
};
const RECENT_MATTERMOST_MESSAGE_TTL_MS = 5 * 60_000;
const RECENT_MATTERMOST_MESSAGE_MAX = 2000;
const CHANNEL_CACHE_TTL_MS = 5 * 60_000;
const USER_CACHE_TTL_MS = 10 * 60_000;
const DEFAULT_ONCHAR_PREFIXES = [">", "!"];
const recentInboundMessages = createDedupeCache({
ttlMs: RECENT_MATTERMOST_MESSAGE_TTL_MS,
maxSize: RECENT_MATTERMOST_MESSAGE_MAX,
});
function resolveRuntime(opts: MonitorMattermostOpts): RuntimeEnv {
return (
opts.runtime ?? {
log: console.log,
error: console.error,
exit: (code: number): never => {
throw new Error(`exit ${code}`);
},
}
);
}
function normalizeMention(text: string, mention: string | undefined): string {
if (!mention) return text.trim();
const escaped = mention.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const re = new RegExp(`@${escaped}\\b`, "gi");
return text.replace(re, " ").replace(/\s+/g, " ").trim();
}
function resolveOncharPrefixes(prefixes: string[] | undefined): string[] {
const cleaned = prefixes?.map((entry) => entry.trim()).filter(Boolean) ?? DEFAULT_ONCHAR_PREFIXES;
return cleaned.length > 0 ? cleaned : DEFAULT_ONCHAR_PREFIXES;
}
function stripOncharPrefix(
text: string,
prefixes: string[],
): { triggered: boolean; stripped: string } {
const trimmed = text.trimStart();
for (const prefix of prefixes) {
if (!prefix) continue;
if (trimmed.startsWith(prefix)) {
return {
triggered: true,
stripped: trimmed.slice(prefix.length).trimStart(),
};
}
}
return { triggered: false, stripped: text };
}
function isSystemPost(post: MattermostPost): boolean {
const type = post.type?.trim();
return Boolean(type);
}
function channelKind(channelType?: string | null): "dm" | "group" | "channel" {
if (!channelType) return "channel";
const normalized = channelType.trim().toUpperCase();
if (normalized === "D") return "dm";
if (normalized === "G") return "group";
return "channel";
}
function channelChatType(kind: "dm" | "group" | "channel"): "direct" | "group" | "channel" {
if (kind === "dm") return "direct";
if (kind === "group") return "group";
return "channel";
}
type MattermostMediaInfo = {
path: string;
contentType?: string;
kind: MediaKind;
};
function buildMattermostAttachmentPlaceholder(mediaList: MattermostMediaInfo[]): string {
if (mediaList.length === 0) return "";
if (mediaList.length === 1) {
const kind = mediaList[0].kind === "unknown" ? "document" : mediaList[0].kind;
return `<media:${kind}>`;
}
const allImages = mediaList.every((media) => media.kind === "image");
const label = allImages ? "image" : "file";
const suffix = mediaList.length === 1 ? label : `${label}s`;
const tag = allImages ? "<media:image>" : "<media:document>";
return `${tag} (${mediaList.length} ${suffix})`;
}
function buildMattermostMediaPayload(mediaList: MattermostMediaInfo[]): {
MediaPath?: string;
MediaType?: string;
MediaUrl?: string;
MediaPaths?: string[];
MediaUrls?: string[];
MediaTypes?: string[];
} {
const first = mediaList[0];
const mediaPaths = mediaList.map((media) => media.path);
const mediaTypes = mediaList.map((media) => media.contentType).filter(Boolean) as string[];
return {
MediaPath: first?.path,
MediaType: first?.contentType,
MediaUrl: first?.path,
MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined,
MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined,
MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
};
}
function buildMattermostWsUrl(baseUrl: string): string {
const normalized = normalizeMattermostBaseUrl(baseUrl);
if (!normalized) throw new Error("Mattermost baseUrl is required");
const wsBase = normalized.replace(/^http/i, "ws");
return `${wsBase}/api/v4/websocket`;
}
export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}): Promise<void> {
const runtime = resolveRuntime(opts);
const cfg = opts.config ?? loadConfig();
const account = resolveMattermostAccount({
cfg,
accountId: opts.accountId,
});
const botToken = opts.botToken?.trim() || account.botToken?.trim();
if (!botToken) {
throw new Error(
`Mattermost bot token missing for account "${account.accountId}" (set channels.mattermost.accounts.${account.accountId}.botToken or MATTERMOST_BOT_TOKEN for default).`,
);
}
const baseUrl = normalizeMattermostBaseUrl(opts.baseUrl ?? account.baseUrl);
if (!baseUrl) {
throw new Error(
`Mattermost baseUrl missing for account "${account.accountId}" (set channels.mattermost.accounts.${account.accountId}.baseUrl or MATTERMOST_URL for default).`,
);
}
const client = createMattermostClient({ baseUrl, botToken });
const botUser = await fetchMattermostMe(client);
const botUserId = botUser.id;
const botUsername = botUser.username?.trim() || undefined;
runtime.log?.(`mattermost connected as ${botUsername ? `@${botUsername}` : botUserId}`);
const channelCache = new Map<string, { value: MattermostChannel | null; expiresAt: number }>();
const userCache = new Map<string, { value: MattermostUser | null; expiresAt: number }>();
const logger = getChildLogger({ module: "mattermost" });
const mediaMaxBytes =
resolveChannelMediaMaxBytes({
cfg,
resolveChannelLimitMb: () => undefined,
accountId: account.accountId,
}) ?? 8 * 1024 * 1024;
const historyLimit = Math.max(
0,
cfg.messages?.groupChat?.historyLimit ?? DEFAULT_GROUP_HISTORY_LIMIT,
);
const channelHistories = new Map<string, HistoryEntry[]>();
const fetchWithAuth: FetchLike = (input, init) => {
const headers = new Headers(init?.headers);
headers.set("Authorization", `Bearer ${client.token}`);
return fetch(input, { ...init, headers });
};
const resolveMattermostMedia = async (
fileIds?: string[] | null,
): Promise<MattermostMediaInfo[]> => {
const ids = (fileIds ?? []).map((id) => id?.trim()).filter(Boolean) as string[];
if (ids.length === 0) return [];
const out: MattermostMediaInfo[] = [];
for (const fileId of ids) {
try {
const fetched = await fetchRemoteMedia({
url: `${client.apiBaseUrl}/files/${fileId}`,
fetchImpl: fetchWithAuth,
filePathHint: fileId,
maxBytes: mediaMaxBytes,
});
const saved = await saveMediaBuffer(
fetched.buffer,
fetched.contentType ?? undefined,
"inbound",
mediaMaxBytes,
);
const contentType = saved.contentType ?? fetched.contentType ?? undefined;
out.push({
path: saved.path,
contentType,
kind: mediaKindFromMime(contentType),
});
} catch (err) {
logger.debug?.(`mattermost: failed to download file ${fileId}: ${String(err)}`);
}
}
return out;
};
const sendTypingIndicator = async (channelId: string, parentId?: string) => {
try {
await sendMattermostTyping(client, { channelId, parentId });
} catch (err) {
logger.debug?.(`mattermost typing cue failed for channel ${channelId}: ${String(err)}`);
}
};
const resolveChannelInfo = async (channelId: string): Promise<MattermostChannel | null> => {
const cached = channelCache.get(channelId);
if (cached && cached.expiresAt > Date.now()) return cached.value;
try {
const info = await fetchMattermostChannel(client, channelId);
channelCache.set(channelId, {
value: info,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: channel lookup failed: ${String(err)}`);
channelCache.set(channelId, {
value: null,
expiresAt: Date.now() + CHANNEL_CACHE_TTL_MS,
});
return null;
}
};
const resolveUserInfo = async (userId: string): Promise<MattermostUser | null> => {
const cached = userCache.get(userId);
if (cached && cached.expiresAt > Date.now()) return cached.value;
try {
const info = await fetchMattermostUser(client, userId);
userCache.set(userId, {
value: info,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return info;
} catch (err) {
logger.debug?.(`mattermost: user lookup failed: ${String(err)}`);
userCache.set(userId, {
value: null,
expiresAt: Date.now() + USER_CACHE_TTL_MS,
});
return null;
}
};
const handlePost = async (
post: MattermostPost,
payload: MattermostEventPayload,
messageIds?: string[],
) => {
const channelId = post.channel_id ?? payload.data?.channel_id ?? payload.broadcast?.channel_id;
if (!channelId) return;
const allMessageIds = messageIds?.length ? messageIds : post.id ? [post.id] : [];
if (allMessageIds.length === 0) return;
const dedupeEntries = allMessageIds.map((id) =>
recentInboundMessages.check(`${account.accountId}:${id}`),
);
if (dedupeEntries.length > 0 && dedupeEntries.every(Boolean)) return;
const senderId = post.user_id ?? payload.broadcast?.user_id;
if (!senderId) return;
if (senderId === botUserId) return;
if (isSystemPost(post)) return;
const channelInfo = await resolveChannelInfo(channelId);
const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined;
const kind = channelKind(channelType);
const chatType = channelChatType(kind);
const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined;
const channelName = payload.data?.channel_name ?? channelInfo?.name ?? "";
const channelDisplay =
payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName;
const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`;
const route = resolveAgentRoute({
cfg,
channel: "mattermost",
accountId: account.accountId,
teamId,
peer: {
kind,
id: kind === "dm" ? senderId : channelId,
},
});
const baseSessionKey = route.sessionKey;
const threadRootId = post.root_id?.trim() || undefined;
const threadKeys = resolveThreadSessionKeys({
baseSessionKey,
threadId: threadRootId,
parentSessionKey: threadRootId ? baseSessionKey : undefined,
});
const sessionKey = threadKeys.sessionKey;
const historyKey = kind === "dm" ? null : sessionKey;
const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
const rawText = post.message?.trim() || "";
const wasMentioned =
kind !== "dm" &&
((botUsername ? rawText.toLowerCase().includes(`@${botUsername.toLowerCase()}`) : false) ||
matchesMentionPatterns(rawText, mentionRegexes));
const pendingBody =
rawText ||
(post.file_ids?.length
? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]`
: "");
const pendingSender = payload.data?.sender_name?.trim() || senderId;
const recordPendingHistory = () => {
if (!historyKey || historyLimit <= 0) return;
const trimmed = pendingBody.trim();
if (!trimmed) return;
recordPendingHistoryEntry({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
entry: {
sender: pendingSender,
body: trimmed,
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
messageId: post.id ?? undefined,
},
});
};
const allowTextCommands = shouldHandleTextCommands({
cfg,
surface: "mattermost",
});
const isControlCommand = allowTextCommands && hasControlCommand(rawText, cfg);
const oncharEnabled = account.chatmode === "onchar" && kind !== "dm";
const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : [];
const oncharResult = oncharEnabled
? stripOncharPrefix(rawText, oncharPrefixes)
: { triggered: false, stripped: rawText };
const oncharTriggered = oncharResult.triggered;
const shouldRequireMention = kind === "channel" && (account.requireMention ?? true);
const shouldBypassMention = isControlCommand && shouldRequireMention && !wasMentioned;
const effectiveWasMentioned = wasMentioned || shouldBypassMention || oncharTriggered;
const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0;
if (oncharEnabled && !oncharTriggered && !wasMentioned && !isControlCommand) {
recordPendingHistory();
return;
}
if (kind === "channel" && shouldRequireMention && canDetectMention) {
if (!effectiveWasMentioned) {
recordPendingHistory();
return;
}
}
const senderName =
payload.data?.sender_name?.trim() ||
(await resolveUserInfo(senderId))?.username?.trim() ||
senderId;
const mediaList = await resolveMattermostMedia(post.file_ids);
const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList);
const bodySource = oncharTriggered ? oncharResult.stripped : rawText;
const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim();
const bodyText = normalizeMention(baseText, botUsername);
if (!bodyText) return;
recordChannelActivity({
channel: "mattermost",
accountId: account.accountId,
direction: "inbound",
});
const fromLabel = formatInboundFromLabel({
isGroup: kind !== "dm",
groupLabel: channelDisplay || roomLabel,
groupId: channelId,
groupFallback: roomLabel || "Channel",
directLabel: senderName,
directId: senderId,
});
const preview = bodyText.replace(/\s+/g, " ").slice(0, 160);
const inboundLabel =
kind === "dm"
? `Mattermost DM from ${senderName}`
: `Mattermost message in ${roomLabel} from ${senderName}`;
enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey,
contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`,
});
const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`;
const body = formatInboundEnvelope({
channel: "Mattermost",
from: fromLabel,
timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
body: textWithId,
chatType,
sender: { name: senderName, id: senderId },
});
let combinedBody = body;
if (historyKey && historyLimit > 0) {
combinedBody = buildPendingHistoryContextFromMap({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
currentMessage: combinedBody,
formatEntry: (entry) =>
formatInboundEnvelope({
channel: "Mattermost",
from: fromLabel,
timestamp: entry.timestamp,
body: `${entry.body}${
entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : ""
}`,
chatType,
senderLabel: entry.sender,
}),
});
}
const to = kind === "dm" ? `user:${senderId}` : `channel:${channelId}`;
const mediaPayload = buildMattermostMediaPayload(mediaList);
const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({
useAccessGroups: cfg.commands?.useAccessGroups ?? false,
authorizers: [],
});
const ctxPayload = finalizeInboundContext({
Body: combinedBody,
RawBody: bodyText,
CommandBody: bodyText,
From:
kind === "dm"
? `mattermost:${senderId}`
: kind === "group"
? `mattermost:group:${channelId}`
: `mattermost:channel:${channelId}`,
To: to,
SessionKey: sessionKey,
ParentSessionKey: threadKeys.parentSessionKey,
AccountId: route.accountId,
ChatType: chatType,
ConversationLabel: fromLabel,
GroupSubject: kind !== "dm" ? channelDisplay || roomLabel : undefined,
GroupChannel: channelName ? `#${channelName}` : undefined,
GroupSpace: teamId,
SenderName: senderName,
SenderId: senderId,
Provider: "mattermost" as const,
Surface: "mattermost" as const,
MessageSid: post.id ?? undefined,
MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined,
MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined,
MessageSidLast:
allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined,
ReplyToId: threadRootId,
MessageThreadId: threadRootId,
Timestamp: typeof post.create_at === "number" ? post.create_at : undefined,
WasMentioned: kind !== "dm" ? effectiveWasMentioned : undefined,
CommandAuthorized: commandAuthorized,
OriginatingChannel: "mattermost" as const,
OriginatingTo: to,
...mediaPayload,
});
if (kind === "dm") {
const sessionCfg = cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: route.agentId,
});
await updateLastRoute({
storePath,
sessionKey: route.mainSessionKey,
deliveryContext: {
channel: "mattermost",
to,
accountId: route.accountId,
},
});
}
if (shouldLogVerbose()) {
const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerbose(
`mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`,
);
}
const textLimit = resolveTextChunkLimit(cfg, "mattermost", account.accountId, {
fallbackLimit: account.textChunkLimit ?? 4000,
});
let prefixContext: ResponsePrefixContext = {
identityName: resolveIdentityName(cfg, route.agentId),
};
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
responsePrefix: resolveEffectiveMessagesConfig(cfg, route.agentId).responsePrefix,
responsePrefixContextProvider: () => prefixContext,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
const mediaUrls = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
const text = payload.text ?? "";
if (mediaUrls.length === 0) {
const chunks = chunkMarkdownText(text, textLimit);
for (const chunk of chunks.length > 0 ? chunks : [text]) {
if (!chunk) continue;
await sendMessageMattermost(to, chunk, {
accountId: account.accountId,
replyToId: threadRootId,
});
}
} else {
let first = true;
for (const mediaUrl of mediaUrls) {
const caption = first ? text : "";
first = false;
await sendMessageMattermost(to, caption, {
accountId: account.accountId,
mediaUrl,
replyToId: threadRootId,
});
}
}
runtime.log?.(`delivered reply to ${to}`);
},
onError: (err, info) => {
runtime.error?.(danger(`mattermost ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: () => sendTypingIndicator(channelId, threadRootId),
});
await dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
onModelSelected: (ctx) => {
prefixContext.provider = ctx.provider;
prefixContext.model = extractShortModelName(ctx.model);
prefixContext.modelFull = `${ctx.provider}/${ctx.model}`;
prefixContext.thinkingLevel = ctx.thinkLevel ?? "off";
},
},
});
markDispatchIdle();
if (historyKey && historyLimit > 0) {
clearHistoryEntries({ historyMap: channelHistories, historyKey });
}
};
const inboundDebounceMs = resolveInboundDebounceMs({ cfg, channel: "mattermost" });
const debouncer = createInboundDebouncer<{
post: MattermostPost;
payload: MattermostEventPayload;
}>({
debounceMs: inboundDebounceMs,
buildKey: (entry) => {
const channelId =
entry.post.channel_id ??
entry.payload.data?.channel_id ??
entry.payload.broadcast?.channel_id;
if (!channelId) return null;
const threadId = entry.post.root_id?.trim();
const threadKey = threadId ? `thread:${threadId}` : "channel";
return `mattermost:${account.accountId}:${channelId}:${threadKey}`;
},
shouldDebounce: (entry) => {
if (entry.post.file_ids && entry.post.file_ids.length > 0) return false;
const text = entry.post.message?.trim() ?? "";
if (!text) return false;
return !hasControlCommand(text, cfg);
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) return;
if (entries.length === 1) {
await handlePost(last.post, last.payload);
return;
}
const combinedText = entries
.map((entry) => entry.post.message?.trim() ?? "")
.filter(Boolean)
.join("\n");
const mergedPost: MattermostPost = {
...last.post,
message: combinedText,
file_ids: [],
};
const ids = entries.map((entry) => entry.post.id).filter(Boolean) as string[];
await handlePost(mergedPost, last.payload, ids.length > 0 ? ids : undefined);
},
onError: (err) => {
runtime.error?.(danger(`mattermost debounce flush failed: ${String(err)}`));
},
});
const wsUrl = buildMattermostWsUrl(baseUrl);
let seq = 1;
const connectOnce = async (): Promise<void> => {
const ws = new WebSocket(wsUrl);
const onAbort = () => ws.close();
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
return await new Promise((resolve) => {
ws.on("open", () => {
opts.statusSink?.({
connected: true,
lastConnectedAt: Date.now(),
lastError: null,
});
ws.send(
JSON.stringify({
seq: seq++,
action: "authentication_challenge",
data: { token: botToken },
}),
);
});
ws.on("message", async (data) => {
const raw = rawDataToString(data);
let payload: MattermostEventPayload;
try {
payload = JSON.parse(raw) as MattermostEventPayload;
} catch {
return;
}
if (payload.event !== "posted") return;
const postData = payload.data?.post;
if (!postData) return;
let post: MattermostPost | null = null;
if (typeof postData === "string") {
try {
post = JSON.parse(postData) as MattermostPost;
} catch {
return;
}
} else if (typeof postData === "object") {
post = postData as MattermostPost;
}
if (!post) return;
try {
await debouncer.enqueue({ post, payload });
} catch (err) {
runtime.error?.(danger(`mattermost handler failed: ${String(err)}`));
}
});
ws.on("close", (code, reason) => {
const message = reason.length > 0 ? reason.toString("utf8") : "";
opts.statusSink?.({
connected: false,
lastDisconnect: {
at: Date.now(),
status: code,
error: message || undefined,
},
});
opts.abortSignal?.removeEventListener("abort", onAbort);
resolve();
});
ws.on("error", (err) => {
runtime.error?.(danger(`mattermost websocket error: ${String(err)}`));
opts.statusSink?.({
lastError: String(err),
});
});
});
};
while (!opts.abortSignal?.aborted) {
await connectOnce();
if (opts.abortSignal?.aborted) return;
await new Promise((resolve) => setTimeout(resolve, 2000));
}
}