mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 14:18:26 +00:00
refactor(imessage): split monitor inbound processing
This commit is contained in:
34
src/imessage/monitor/abort-handler.ts
Normal file
34
src/imessage/monitor/abort-handler.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
export type IMessageMonitorClient = {
|
||||
request: (method: string, params?: Record<string, unknown>) => Promise<unknown>;
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
export function attachIMessageMonitorAbortHandler(params: {
|
||||
abortSignal?: AbortSignal;
|
||||
client: IMessageMonitorClient;
|
||||
getSubscriptionId: () => number | null;
|
||||
}): () => void {
|
||||
const abort = params.abortSignal;
|
||||
if (!abort) {
|
||||
return () => {};
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
const subscriptionId = params.getSubscriptionId();
|
||||
if (subscriptionId) {
|
||||
void params.client
|
||||
.request("watch.unsubscribe", {
|
||||
subscription: subscriptionId,
|
||||
})
|
||||
.catch(() => {
|
||||
// Ignore disconnect errors during shutdown.
|
||||
});
|
||||
}
|
||||
void params.client.stop().catch(() => {
|
||||
// Ignore disconnect errors during shutdown.
|
||||
});
|
||||
};
|
||||
|
||||
abort.addEventListener("abort", onAbort, { once: true });
|
||||
return () => abort.removeEventListener("abort", onAbort);
|
||||
}
|
||||
483
src/imessage/monitor/inbound-processing.ts
Normal file
483
src/imessage/monitor/inbound-processing.ts
Normal file
@@ -0,0 +1,483 @@
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { MonitorIMessageOpts, IMessagePayload } from "./types.js";
|
||||
import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import {
|
||||
formatInboundEnvelope,
|
||||
formatInboundFromLabel,
|
||||
resolveEnvelopeFormatOptions,
|
||||
type EnvelopeFormatOptions,
|
||||
} from "../../auto-reply/envelope.js";
|
||||
import {
|
||||
buildPendingHistoryContextFromMap,
|
||||
recordPendingHistoryEntryIfEnabled,
|
||||
type HistoryEntry,
|
||||
} from "../../auto-reply/reply/history.js";
|
||||
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
|
||||
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
|
||||
import { resolveControlCommandGate } from "../../channels/command-gating.js";
|
||||
import { logInboundDrop } from "../../channels/logging.js";
|
||||
import {
|
||||
resolveChannelGroupPolicy,
|
||||
resolveChannelGroupRequireMention,
|
||||
} from "../../config/group-policy.js";
|
||||
import { resolveAgentRoute } from "../../routing/resolve-route.js";
|
||||
import { truncateUtf16Safe } from "../../utils.js";
|
||||
import {
|
||||
formatIMessageChatTarget,
|
||||
isAllowedIMessageSender,
|
||||
normalizeIMessageHandle,
|
||||
} from "../targets.js";
|
||||
|
||||
type IMessageReplyContext = {
|
||||
id?: string;
|
||||
body: string;
|
||||
sender?: string;
|
||||
};
|
||||
|
||||
function normalizeReplyField(value: unknown): string | undefined {
|
||||
if (typeof value === "string") {
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
if (typeof value === "number") {
|
||||
return String(value);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null {
|
||||
const body = normalizeReplyField(message.reply_to_text);
|
||||
if (!body) {
|
||||
return null;
|
||||
}
|
||||
const id = normalizeReplyField(message.reply_to_id);
|
||||
const sender = normalizeReplyField(message.reply_to_sender);
|
||||
return { body, id, sender };
|
||||
}
|
||||
|
||||
export type IMessageInboundDispatchDecision = {
|
||||
kind: "dispatch";
|
||||
isGroup: boolean;
|
||||
chatId?: number;
|
||||
chatGuid?: string;
|
||||
chatIdentifier?: string;
|
||||
groupId?: string;
|
||||
historyKey?: string;
|
||||
sender: string;
|
||||
senderNormalized: string;
|
||||
route: ReturnType<typeof resolveAgentRoute>;
|
||||
bodyText: string;
|
||||
createdAt?: number;
|
||||
replyContext: IMessageReplyContext | null;
|
||||
effectiveWasMentioned: boolean;
|
||||
commandAuthorized: boolean;
|
||||
// Used for allowlist checks for control commands.
|
||||
effectiveDmAllowFrom: string[];
|
||||
effectiveGroupAllowFrom: string[];
|
||||
};
|
||||
|
||||
export type IMessageInboundDecision =
|
||||
| { kind: "drop"; reason: string }
|
||||
| { kind: "pairing"; senderId: string }
|
||||
| IMessageInboundDispatchDecision;
|
||||
|
||||
export function resolveIMessageInboundDecision(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId: string;
|
||||
message: IMessagePayload;
|
||||
opts?: Pick<MonitorIMessageOpts, "requireMention">;
|
||||
messageText: string;
|
||||
bodyText: string;
|
||||
allowFrom: string[];
|
||||
groupAllowFrom: string[];
|
||||
groupPolicy: string;
|
||||
dmPolicy: string;
|
||||
storeAllowFrom: string[];
|
||||
historyLimit: number;
|
||||
groupHistories: Map<string, HistoryEntry[]>;
|
||||
echoCache?: { has: (scope: string, text: string) => boolean };
|
||||
logVerbose?: (msg: string) => void;
|
||||
}): IMessageInboundDecision {
|
||||
const senderRaw = params.message.sender ?? "";
|
||||
const sender = senderRaw.trim();
|
||||
if (!sender) {
|
||||
return { kind: "drop", reason: "missing sender" };
|
||||
}
|
||||
const senderNormalized = normalizeIMessageHandle(sender);
|
||||
if (params.message.is_from_me) {
|
||||
return { kind: "drop", reason: "from me" };
|
||||
}
|
||||
|
||||
const chatId = params.message.chat_id ?? undefined;
|
||||
const chatGuid = params.message.chat_guid ?? undefined;
|
||||
const chatIdentifier = params.message.chat_identifier ?? undefined;
|
||||
|
||||
const groupIdCandidate = chatId !== undefined ? String(chatId) : undefined;
|
||||
const groupListPolicy = groupIdCandidate
|
||||
? resolveChannelGroupPolicy({
|
||||
cfg: params.cfg,
|
||||
channel: "imessage",
|
||||
accountId: params.accountId,
|
||||
groupId: groupIdCandidate,
|
||||
})
|
||||
: {
|
||||
allowlistEnabled: false,
|
||||
allowed: true,
|
||||
groupConfig: undefined,
|
||||
defaultConfig: undefined,
|
||||
};
|
||||
|
||||
// If the owner explicitly configures a chat_id under imessage.groups, treat that thread as a
|
||||
// "group" for permission gating + session isolation, even when is_group=false.
|
||||
const treatAsGroupByConfig = Boolean(
|
||||
groupIdCandidate && groupListPolicy.allowlistEnabled && groupListPolicy.groupConfig,
|
||||
);
|
||||
const isGroup = Boolean(params.message.is_group) || treatAsGroupByConfig;
|
||||
if (isGroup && !chatId) {
|
||||
return { kind: "drop", reason: "group without chat_id" };
|
||||
}
|
||||
|
||||
const groupId = isGroup ? groupIdCandidate : undefined;
|
||||
const effectiveDmAllowFrom = Array.from(new Set([...params.allowFrom, ...params.storeAllowFrom]))
|
||||
.map((v) => String(v).trim())
|
||||
.filter(Boolean);
|
||||
// Keep DM pairing-store authorization scoped to DMs; group access must come from explicit group allowlist config.
|
||||
const effectiveGroupAllowFrom = Array.from(new Set(params.groupAllowFrom))
|
||||
.map((v) => String(v).trim())
|
||||
.filter(Boolean);
|
||||
|
||||
if (isGroup) {
|
||||
if (params.groupPolicy === "disabled") {
|
||||
params.logVerbose?.("Blocked iMessage group message (groupPolicy: disabled)");
|
||||
return { kind: "drop", reason: "groupPolicy disabled" };
|
||||
}
|
||||
if (params.groupPolicy === "allowlist") {
|
||||
if (effectiveGroupAllowFrom.length === 0) {
|
||||
params.logVerbose?.(
|
||||
"Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)",
|
||||
);
|
||||
return { kind: "drop", reason: "groupPolicy allowlist (empty groupAllowFrom)" };
|
||||
}
|
||||
const allowed = isAllowedIMessageSender({
|
||||
allowFrom: effectiveGroupAllowFrom,
|
||||
sender,
|
||||
chatId,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
});
|
||||
if (!allowed) {
|
||||
params.logVerbose?.(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`);
|
||||
return { kind: "drop", reason: "not in groupAllowFrom" };
|
||||
}
|
||||
}
|
||||
if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) {
|
||||
params.logVerbose?.(
|
||||
`imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`,
|
||||
);
|
||||
return { kind: "drop", reason: "group id not in allowlist" };
|
||||
}
|
||||
}
|
||||
|
||||
const dmHasWildcard = effectiveDmAllowFrom.includes("*");
|
||||
const dmAuthorized =
|
||||
params.dmPolicy === "open"
|
||||
? true
|
||||
: dmHasWildcard ||
|
||||
(effectiveDmAllowFrom.length > 0 &&
|
||||
isAllowedIMessageSender({
|
||||
allowFrom: effectiveDmAllowFrom,
|
||||
sender,
|
||||
chatId,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
}));
|
||||
|
||||
if (!isGroup) {
|
||||
if (params.dmPolicy === "disabled") {
|
||||
return { kind: "drop", reason: "dmPolicy disabled" };
|
||||
}
|
||||
if (!dmAuthorized) {
|
||||
if (params.dmPolicy === "pairing") {
|
||||
return { kind: "pairing", senderId: senderNormalized };
|
||||
}
|
||||
params.logVerbose?.(`Blocked iMessage sender ${sender} (dmPolicy=${params.dmPolicy})`);
|
||||
return { kind: "drop", reason: "dmPolicy blocked" };
|
||||
}
|
||||
}
|
||||
|
||||
const route = resolveAgentRoute({
|
||||
cfg: params.cfg,
|
||||
channel: "imessage",
|
||||
accountId: params.accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: isGroup ? String(chatId ?? "unknown") : senderNormalized,
|
||||
},
|
||||
});
|
||||
const mentionRegexes = buildMentionRegexes(params.cfg, route.agentId);
|
||||
const messageText = params.messageText.trim();
|
||||
const bodyText = params.bodyText.trim();
|
||||
if (!bodyText) {
|
||||
return { kind: "drop", reason: "empty body" };
|
||||
}
|
||||
|
||||
// Echo detection: check if the received message matches a recently sent message (within 5 seconds).
|
||||
// Scope by conversation so same text in different chats is not conflated.
|
||||
if (params.echoCache && messageText) {
|
||||
const echoScope = buildIMessageEchoScope({
|
||||
accountId: params.accountId,
|
||||
isGroup,
|
||||
chatId,
|
||||
sender,
|
||||
});
|
||||
if (params.echoCache.has(echoScope, messageText)) {
|
||||
params.logVerbose?.(describeIMessageEchoDropLog({ messageText }));
|
||||
return { kind: "drop", reason: "echo" };
|
||||
}
|
||||
}
|
||||
|
||||
const replyContext = describeReplyContext(params.message);
|
||||
const createdAt = params.message.created_at ? Date.parse(params.message.created_at) : undefined;
|
||||
const historyKey = isGroup
|
||||
? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown")
|
||||
: undefined;
|
||||
|
||||
const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true;
|
||||
const requireMention = resolveChannelGroupRequireMention({
|
||||
cfg: params.cfg,
|
||||
channel: "imessage",
|
||||
accountId: params.accountId,
|
||||
groupId,
|
||||
requireMentionOverride: params.opts?.requireMention,
|
||||
overrideOrder: "before-config",
|
||||
});
|
||||
const canDetectMention = mentionRegexes.length > 0;
|
||||
|
||||
const useAccessGroups = params.cfg.commands?.useAccessGroups !== false;
|
||||
const ownerAllowedForCommands =
|
||||
effectiveDmAllowFrom.length > 0
|
||||
? isAllowedIMessageSender({
|
||||
allowFrom: effectiveDmAllowFrom,
|
||||
sender,
|
||||
chatId,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
})
|
||||
: false;
|
||||
const groupAllowedForCommands =
|
||||
effectiveGroupAllowFrom.length > 0
|
||||
? isAllowedIMessageSender({
|
||||
allowFrom: effectiveGroupAllowFrom,
|
||||
sender,
|
||||
chatId,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
})
|
||||
: false;
|
||||
const hasControlCommandInMessage = hasControlCommand(messageText, params.cfg);
|
||||
const commandGate = resolveControlCommandGate({
|
||||
useAccessGroups,
|
||||
authorizers: [
|
||||
{ configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands },
|
||||
{ configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands },
|
||||
],
|
||||
allowTextCommands: true,
|
||||
hasControlCommand: hasControlCommandInMessage,
|
||||
});
|
||||
const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAuthorized;
|
||||
if (isGroup && commandGate.shouldBlock) {
|
||||
if (params.logVerbose) {
|
||||
logInboundDrop({
|
||||
log: params.logVerbose,
|
||||
channel: "imessage",
|
||||
reason: "control command (unauthorized)",
|
||||
target: sender,
|
||||
});
|
||||
}
|
||||
return { kind: "drop", reason: "control command (unauthorized)" };
|
||||
}
|
||||
|
||||
const shouldBypassMention =
|
||||
isGroup && requireMention && !mentioned && commandAuthorized && hasControlCommandInMessage;
|
||||
const effectiveWasMentioned = mentioned || shouldBypassMention;
|
||||
if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) {
|
||||
params.logVerbose?.(`imessage: skipping group message (no mention)`);
|
||||
recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: params.groupHistories,
|
||||
historyKey: historyKey ?? "",
|
||||
limit: params.historyLimit,
|
||||
entry: historyKey
|
||||
? {
|
||||
sender: senderNormalized,
|
||||
body: bodyText,
|
||||
timestamp: createdAt,
|
||||
messageId: params.message.id ? String(params.message.id) : undefined,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
return { kind: "drop", reason: "no mention" };
|
||||
}
|
||||
|
||||
return {
|
||||
kind: "dispatch",
|
||||
isGroup,
|
||||
chatId,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
groupId,
|
||||
historyKey,
|
||||
sender,
|
||||
senderNormalized,
|
||||
route,
|
||||
bodyText,
|
||||
createdAt,
|
||||
replyContext,
|
||||
effectiveWasMentioned,
|
||||
commandAuthorized,
|
||||
effectiveDmAllowFrom,
|
||||
effectiveGroupAllowFrom,
|
||||
};
|
||||
}
|
||||
|
||||
export function buildIMessageInboundContext(params: {
|
||||
cfg: OpenClawConfig;
|
||||
decision: IMessageInboundDispatchDecision;
|
||||
message: IMessagePayload;
|
||||
envelopeOptions?: EnvelopeFormatOptions;
|
||||
previousTimestamp?: number;
|
||||
remoteHost?: string;
|
||||
media?: {
|
||||
path?: string;
|
||||
type?: string;
|
||||
paths?: string[];
|
||||
types?: Array<string | undefined>;
|
||||
};
|
||||
historyLimit: number;
|
||||
groupHistories: Map<string, HistoryEntry[]>;
|
||||
}): {
|
||||
ctxPayload: ReturnType<typeof finalizeInboundContext>;
|
||||
fromLabel: string;
|
||||
chatTarget?: string;
|
||||
imessageTo: string;
|
||||
inboundHistory?: Array<{ sender: string; body: string; timestamp?: number }>;
|
||||
} {
|
||||
const envelopeOptions = params.envelopeOptions ?? resolveEnvelopeFormatOptions(params.cfg);
|
||||
const { decision } = params;
|
||||
const chatId = decision.chatId;
|
||||
const chatTarget =
|
||||
decision.isGroup && chatId != null ? formatIMessageChatTarget(chatId) : undefined;
|
||||
|
||||
const replySuffix = decision.replyContext
|
||||
? `\n\n[Replying to ${decision.replyContext.sender ?? "unknown sender"}${
|
||||
decision.replyContext.id ? ` id:${decision.replyContext.id}` : ""
|
||||
}]\n${decision.replyContext.body}\n[/Replying]`
|
||||
: "";
|
||||
|
||||
const fromLabel = formatInboundFromLabel({
|
||||
isGroup: decision.isGroup,
|
||||
groupLabel: params.message.chat_name ?? undefined,
|
||||
groupId: chatId !== undefined ? String(chatId) : "unknown",
|
||||
groupFallback: "Group",
|
||||
directLabel: decision.senderNormalized,
|
||||
directId: decision.sender,
|
||||
});
|
||||
|
||||
const body = formatInboundEnvelope({
|
||||
channel: "iMessage",
|
||||
from: fromLabel,
|
||||
timestamp: decision.createdAt,
|
||||
body: `${decision.bodyText}${replySuffix}`,
|
||||
chatType: decision.isGroup ? "group" : "direct",
|
||||
sender: { name: decision.senderNormalized, id: decision.sender },
|
||||
previousTimestamp: params.previousTimestamp,
|
||||
envelope: envelopeOptions,
|
||||
});
|
||||
|
||||
let combinedBody = body;
|
||||
if (decision.isGroup && decision.historyKey) {
|
||||
combinedBody = buildPendingHistoryContextFromMap({
|
||||
historyMap: params.groupHistories,
|
||||
historyKey: decision.historyKey,
|
||||
limit: params.historyLimit,
|
||||
currentMessage: combinedBody,
|
||||
formatEntry: (entry) =>
|
||||
formatInboundEnvelope({
|
||||
channel: "iMessage",
|
||||
from: fromLabel,
|
||||
timestamp: entry.timestamp,
|
||||
body: `${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`,
|
||||
chatType: "group",
|
||||
senderLabel: entry.sender,
|
||||
envelope: envelopeOptions,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
const imessageTo = (decision.isGroup ? chatTarget : undefined) || `imessage:${decision.sender}`;
|
||||
const inboundHistory =
|
||||
decision.isGroup && decision.historyKey && params.historyLimit > 0
|
||||
? (params.groupHistories.get(decision.historyKey) ?? []).map((entry) => ({
|
||||
sender: entry.sender,
|
||||
body: entry.body,
|
||||
timestamp: entry.timestamp,
|
||||
}))
|
||||
: undefined;
|
||||
|
||||
const ctxPayload = finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
BodyForAgent: decision.bodyText,
|
||||
InboundHistory: inboundHistory,
|
||||
RawBody: decision.bodyText,
|
||||
CommandBody: decision.bodyText,
|
||||
From: decision.isGroup
|
||||
? `imessage:group:${chatId ?? "unknown"}`
|
||||
: `imessage:${decision.sender}`,
|
||||
To: imessageTo,
|
||||
SessionKey: decision.route.sessionKey,
|
||||
AccountId: decision.route.accountId,
|
||||
ChatType: decision.isGroup ? "group" : "direct",
|
||||
ConversationLabel: fromLabel,
|
||||
GroupSubject: decision.isGroup ? (params.message.chat_name ?? undefined) : undefined,
|
||||
GroupMembers: decision.isGroup
|
||||
? (params.message.participants ?? []).filter(Boolean).join(", ")
|
||||
: undefined,
|
||||
SenderName: decision.senderNormalized,
|
||||
SenderId: decision.sender,
|
||||
Provider: "imessage",
|
||||
Surface: "imessage",
|
||||
MessageSid: params.message.id ? String(params.message.id) : undefined,
|
||||
ReplyToId: decision.replyContext?.id,
|
||||
ReplyToBody: decision.replyContext?.body,
|
||||
ReplyToSender: decision.replyContext?.sender,
|
||||
Timestamp: decision.createdAt,
|
||||
MediaPath: params.media?.path,
|
||||
MediaType: params.media?.type,
|
||||
MediaUrl: params.media?.path,
|
||||
MediaPaths:
|
||||
params.media?.paths && params.media.paths.length > 0 ? params.media.paths : undefined,
|
||||
MediaTypes:
|
||||
params.media?.types && params.media.types.length > 0 ? params.media.types : undefined,
|
||||
MediaUrls:
|
||||
params.media?.paths && params.media.paths.length > 0 ? params.media.paths : undefined,
|
||||
MediaRemoteHost: params.remoteHost,
|
||||
WasMentioned: decision.effectiveWasMentioned,
|
||||
CommandAuthorized: decision.commandAuthorized,
|
||||
OriginatingChannel: "imessage" as const,
|
||||
OriginatingTo: imessageTo,
|
||||
});
|
||||
|
||||
return { ctxPayload, fromLabel, chatTarget, imessageTo, inboundHistory };
|
||||
}
|
||||
|
||||
export function buildIMessageEchoScope(params: {
|
||||
accountId: string;
|
||||
isGroup: boolean;
|
||||
chatId?: number;
|
||||
sender: string;
|
||||
}): string {
|
||||
return `${params.accountId}:${params.isGroup ? formatIMessageChatTarget(params.chatId) : `imessage:${params.sender}`}`;
|
||||
}
|
||||
|
||||
export function describeIMessageEchoDropLog(params: { messageText: string }): string {
|
||||
return `imessage: skipping echo message (matches recently sent text within 5s): "${truncateUtf16Safe(params.messageText, 50)}"`;
|
||||
}
|
||||
@@ -4,34 +4,19 @@ import { resolveHumanDelayConfig } from "../../agents/identity.js";
|
||||
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
|
||||
import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
|
||||
import {
|
||||
formatInboundEnvelope,
|
||||
formatInboundFromLabel,
|
||||
resolveEnvelopeFormatOptions,
|
||||
} from "../../auto-reply/envelope.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
import {
|
||||
buildPendingHistoryContextFromMap,
|
||||
clearHistoryEntriesIfEnabled,
|
||||
DEFAULT_GROUP_HISTORY_LIMIT,
|
||||
recordPendingHistoryEntryIfEnabled,
|
||||
type HistoryEntry,
|
||||
} from "../../auto-reply/reply/history.js";
|
||||
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
|
||||
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
|
||||
import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import { resolveControlCommandGate } from "../../channels/command-gating.js";
|
||||
import { logInboundDrop } from "../../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
|
||||
import { recordInboundSession } from "../../channels/session.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import {
|
||||
resolveChannelGroupPolicy,
|
||||
resolveChannelGroupRequireMention,
|
||||
} from "../../config/group-policy.js";
|
||||
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
|
||||
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
|
||||
import { waitForTransportReady } from "../../infra/transport-ready.js";
|
||||
@@ -41,19 +26,18 @@ import {
|
||||
readChannelAllowFromStore,
|
||||
upsertChannelPairingRequest,
|
||||
} from "../../pairing/pairing-store.js";
|
||||
import { resolveAgentRoute } from "../../routing/resolve-route.js";
|
||||
import { truncateUtf16Safe } from "../../utils.js";
|
||||
import { resolveIMessageAccount } from "../accounts.js";
|
||||
import { createIMessageRpcClient } from "../client.js";
|
||||
import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "../constants.js";
|
||||
import { probeIMessage } from "../probe.js";
|
||||
import { sendMessageIMessage } from "../send.js";
|
||||
import {
|
||||
formatIMessageChatTarget,
|
||||
isAllowedIMessageSender,
|
||||
normalizeIMessageHandle,
|
||||
} from "../targets.js";
|
||||
import { attachIMessageMonitorAbortHandler } from "./abort-handler.js";
|
||||
import { deliverReplies } from "./deliver.js";
|
||||
import {
|
||||
buildIMessageInboundContext,
|
||||
resolveIMessageInboundDecision,
|
||||
} from "./inbound-processing.js";
|
||||
import { parseIMessageNotification } from "./parse-notification.js";
|
||||
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
|
||||
|
||||
@@ -85,33 +69,6 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise<string | un
|
||||
}
|
||||
}
|
||||
|
||||
type IMessageReplyContext = {
|
||||
id?: string;
|
||||
body: string;
|
||||
sender?: string;
|
||||
};
|
||||
|
||||
function normalizeReplyField(value: unknown): string | undefined {
|
||||
if (typeof value === "string") {
|
||||
const trimmed = value.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
if (typeof value === "number") {
|
||||
return String(value);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null {
|
||||
const body = normalizeReplyField(message.reply_to_text);
|
||||
if (!body) {
|
||||
return null;
|
||||
}
|
||||
const id = normalizeReplyField(message.reply_to_id);
|
||||
const sender = normalizeReplyField(message.reply_to_sender);
|
||||
return { body, id, sender };
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache for recently sent messages, used for echo detection.
|
||||
* Keys are scoped by conversation (accountId:target) so the same text in different chats is not conflated.
|
||||
@@ -248,165 +205,8 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
});
|
||||
|
||||
async function handleMessageNow(message: IMessagePayload) {
|
||||
const senderRaw = message.sender ?? "";
|
||||
const sender = senderRaw.trim();
|
||||
if (!sender) {
|
||||
return;
|
||||
}
|
||||
const senderNormalized = normalizeIMessageHandle(sender);
|
||||
if (message.is_from_me) {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = message.chat_id ?? undefined;
|
||||
const chatGuid = message.chat_guid ?? undefined;
|
||||
const chatIdentifier = message.chat_identifier ?? undefined;
|
||||
|
||||
const groupIdCandidate = chatId !== undefined ? String(chatId) : undefined;
|
||||
const groupListPolicy = groupIdCandidate
|
||||
? resolveChannelGroupPolicy({
|
||||
cfg,
|
||||
channel: "imessage",
|
||||
accountId: accountInfo.accountId,
|
||||
groupId: groupIdCandidate,
|
||||
})
|
||||
: {
|
||||
allowlistEnabled: false,
|
||||
allowed: true,
|
||||
groupConfig: undefined,
|
||||
defaultConfig: undefined,
|
||||
};
|
||||
|
||||
// Some iMessage threads can have multiple participants but still report
|
||||
// is_group=false depending on how Messages stores the identifier.
|
||||
// If the owner explicitly configures a chat_id under imessage.groups, treat
|
||||
// that thread as a "group" for permission gating and session isolation.
|
||||
const treatAsGroupByConfig = Boolean(
|
||||
groupIdCandidate && groupListPolicy.allowlistEnabled && groupListPolicy.groupConfig,
|
||||
);
|
||||
|
||||
const isGroup = Boolean(message.is_group) || treatAsGroupByConfig;
|
||||
if (isGroup && !chatId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const groupId = isGroup ? groupIdCandidate : undefined;
|
||||
const storeAllowFrom = await readChannelAllowFromStore("imessage").catch(() => []);
|
||||
const effectiveDmAllowFrom = Array.from(new Set([...allowFrom, ...storeAllowFrom]))
|
||||
.map((v) => String(v).trim())
|
||||
.filter(Boolean);
|
||||
// Keep DM pairing-store authorization scoped to DMs; group access must come
|
||||
// from explicit group allowlist config.
|
||||
const effectiveGroupAllowFrom = Array.from(new Set(groupAllowFrom))
|
||||
.map((v) => String(v).trim())
|
||||
.filter(Boolean);
|
||||
|
||||
if (isGroup) {
|
||||
if (groupPolicy === "disabled") {
|
||||
logVerbose("Blocked iMessage group message (groupPolicy: disabled)");
|
||||
return;
|
||||
}
|
||||
if (groupPolicy === "allowlist") {
|
||||
if (effectiveGroupAllowFrom.length === 0) {
|
||||
logVerbose("Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)");
|
||||
return;
|
||||
}
|
||||
const allowed = isAllowedIMessageSender({
|
||||
allowFrom: effectiveGroupAllowFrom,
|
||||
sender,
|
||||
chatId: chatId ?? undefined,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
});
|
||||
if (!allowed) {
|
||||
logVerbose(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) {
|
||||
logVerbose(`imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const dmHasWildcard = effectiveDmAllowFrom.includes("*");
|
||||
const dmAuthorized =
|
||||
dmPolicy === "open"
|
||||
? true
|
||||
: dmHasWildcard ||
|
||||
(effectiveDmAllowFrom.length > 0 &&
|
||||
isAllowedIMessageSender({
|
||||
allowFrom: effectiveDmAllowFrom,
|
||||
sender,
|
||||
chatId: chatId ?? undefined,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
}));
|
||||
if (!isGroup) {
|
||||
if (dmPolicy === "disabled") {
|
||||
return;
|
||||
}
|
||||
if (!dmAuthorized) {
|
||||
if (dmPolicy === "pairing") {
|
||||
const senderId = normalizeIMessageHandle(sender);
|
||||
const { code, created } = await upsertChannelPairingRequest({
|
||||
channel: "imessage",
|
||||
id: senderId,
|
||||
meta: {
|
||||
sender: senderId,
|
||||
chatId: chatId ? String(chatId) : undefined,
|
||||
},
|
||||
});
|
||||
if (created) {
|
||||
logVerbose(`imessage pairing request sender=${senderId}`);
|
||||
try {
|
||||
await sendMessageIMessage(
|
||||
sender,
|
||||
buildPairingReply({
|
||||
channel: "imessage",
|
||||
idLine: `Your iMessage sender id: ${senderId}`,
|
||||
code,
|
||||
}),
|
||||
{
|
||||
client,
|
||||
maxBytes: mediaMaxBytes,
|
||||
accountId: accountInfo.accountId,
|
||||
...(chatId ? { chatId } : {}),
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
logVerbose(`imessage pairing reply failed for ${senderId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logVerbose(`Blocked iMessage sender ${sender} (dmPolicy=${dmPolicy})`);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "imessage",
|
||||
accountId: accountInfo.accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: isGroup ? String(chatId ?? "unknown") : normalizeIMessageHandle(sender),
|
||||
},
|
||||
});
|
||||
const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
|
||||
const messageText = (message.text ?? "").trim();
|
||||
|
||||
// Echo detection: check if the received message matches a recently sent message (within 5 seconds).
|
||||
// Scope by conversation so same text in different chats is not conflated.
|
||||
const echoScope = `${accountInfo.accountId}:${isGroup ? formatIMessageChatTarget(chatId) : `imessage:${sender}`}`;
|
||||
if (messageText && sentMessageCache.has(echoScope, messageText)) {
|
||||
logVerbose(
|
||||
`imessage: skipping echo message (matches recently sent text within 5s): "${truncateUtf16Safe(messageText, 50)}"`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const attachments = includeAttachments ? (message.attachments ?? []) : [];
|
||||
// Filter to valid attachments with paths
|
||||
const validAttachments = attachments.filter((entry) => entry?.original_path && !entry?.missing);
|
||||
@@ -419,196 +219,103 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
const kind = mediaKindFromMime(mediaType ?? undefined);
|
||||
const placeholder = kind ? `<media:${kind}>` : attachments?.length ? "<media:attachment>" : "";
|
||||
const bodyText = messageText || placeholder;
|
||||
if (!bodyText) {
|
||||
return;
|
||||
}
|
||||
const replyContext = describeReplyContext(message);
|
||||
const createdAt = message.created_at ? Date.parse(message.created_at) : undefined;
|
||||
const historyKey = isGroup
|
||||
? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown")
|
||||
: undefined;
|
||||
const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true;
|
||||
const requireMention = resolveChannelGroupRequireMention({
|
||||
|
||||
const storeAllowFrom = await readChannelAllowFromStore("imessage").catch(() => []);
|
||||
const decision = resolveIMessageInboundDecision({
|
||||
cfg,
|
||||
channel: "imessage",
|
||||
accountId: accountInfo.accountId,
|
||||
groupId,
|
||||
requireMentionOverride: opts.requireMention,
|
||||
overrideOrder: "before-config",
|
||||
message,
|
||||
opts,
|
||||
messageText,
|
||||
bodyText,
|
||||
allowFrom,
|
||||
groupAllowFrom,
|
||||
groupPolicy,
|
||||
dmPolicy,
|
||||
storeAllowFrom,
|
||||
historyLimit,
|
||||
groupHistories,
|
||||
echoCache: sentMessageCache,
|
||||
logVerbose,
|
||||
});
|
||||
const canDetectMention = mentionRegexes.length > 0;
|
||||
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
|
||||
const ownerAllowedForCommands =
|
||||
effectiveDmAllowFrom.length > 0
|
||||
? isAllowedIMessageSender({
|
||||
allowFrom: effectiveDmAllowFrom,
|
||||
sender,
|
||||
chatId: chatId ?? undefined,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
})
|
||||
: false;
|
||||
const groupAllowedForCommands =
|
||||
effectiveGroupAllowFrom.length > 0
|
||||
? isAllowedIMessageSender({
|
||||
allowFrom: effectiveGroupAllowFrom,
|
||||
sender,
|
||||
chatId: chatId ?? undefined,
|
||||
chatGuid,
|
||||
chatIdentifier,
|
||||
})
|
||||
: false;
|
||||
const hasControlCommandInMessage = hasControlCommand(messageText, cfg);
|
||||
const commandGate = resolveControlCommandGate({
|
||||
useAccessGroups,
|
||||
authorizers: [
|
||||
{ configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands },
|
||||
{ configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands },
|
||||
],
|
||||
allowTextCommands: true,
|
||||
hasControlCommand: hasControlCommandInMessage,
|
||||
});
|
||||
const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAuthorized;
|
||||
if (isGroup && commandGate.shouldBlock) {
|
||||
logInboundDrop({
|
||||
log: logVerbose,
|
||||
channel: "imessage",
|
||||
reason: "control command (unauthorized)",
|
||||
target: sender,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const shouldBypassMention =
|
||||
isGroup && requireMention && !mentioned && commandAuthorized && hasControlCommandInMessage;
|
||||
const effectiveWasMentioned = mentioned || shouldBypassMention;
|
||||
if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) {
|
||||
logVerbose(`imessage: skipping group message (no mention)`);
|
||||
recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: groupHistories,
|
||||
historyKey: historyKey ?? "",
|
||||
limit: historyLimit,
|
||||
entry: historyKey
|
||||
? {
|
||||
sender: senderNormalized,
|
||||
body: bodyText,
|
||||
timestamp: createdAt,
|
||||
messageId: message.id ? String(message.id) : undefined,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
|
||||
if (decision.kind === "drop") {
|
||||
return;
|
||||
}
|
||||
|
||||
const chatId = message.chat_id ?? undefined;
|
||||
if (decision.kind === "pairing") {
|
||||
const sender = (message.sender ?? "").trim();
|
||||
if (!sender) {
|
||||
return;
|
||||
}
|
||||
const { code, created } = await upsertChannelPairingRequest({
|
||||
channel: "imessage",
|
||||
id: decision.senderId,
|
||||
meta: {
|
||||
sender: decision.senderId,
|
||||
chatId: chatId ? String(chatId) : undefined,
|
||||
},
|
||||
});
|
||||
if (created) {
|
||||
logVerbose(`imessage pairing request sender=${decision.senderId}`);
|
||||
try {
|
||||
await sendMessageIMessage(
|
||||
sender,
|
||||
buildPairingReply({
|
||||
channel: "imessage",
|
||||
idLine: `Your iMessage sender id: ${decision.senderId}`,
|
||||
code,
|
||||
}),
|
||||
{
|
||||
client,
|
||||
maxBytes: mediaMaxBytes,
|
||||
accountId: accountInfo.accountId,
|
||||
...(chatId ? { chatId } : {}),
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
logVerbose(`imessage pairing reply failed for ${decision.senderId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const chatTarget = formatIMessageChatTarget(chatId);
|
||||
const fromLabel = formatInboundFromLabel({
|
||||
isGroup,
|
||||
groupLabel: message.chat_name ?? undefined,
|
||||
groupId: chatId !== undefined ? String(chatId) : "unknown",
|
||||
groupFallback: "Group",
|
||||
directLabel: senderNormalized,
|
||||
directId: sender,
|
||||
});
|
||||
const storePath = resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
agentId: decision.route.agentId,
|
||||
});
|
||||
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
|
||||
const previousTimestamp = readSessionUpdatedAt({
|
||||
storePath,
|
||||
sessionKey: route.sessionKey,
|
||||
sessionKey: decision.route.sessionKey,
|
||||
});
|
||||
const replySuffix = replyContext
|
||||
? `\n\n[Replying to ${replyContext.sender ?? "unknown sender"}${
|
||||
replyContext.id ? ` id:${replyContext.id}` : ""
|
||||
}]\n${replyContext.body}\n[/Replying]`
|
||||
: "";
|
||||
const body = formatInboundEnvelope({
|
||||
channel: "iMessage",
|
||||
from: fromLabel,
|
||||
timestamp: createdAt,
|
||||
body: `${bodyText}${replySuffix}`,
|
||||
chatType: isGroup ? "group" : "direct",
|
||||
sender: { name: senderNormalized, id: sender },
|
||||
const { ctxPayload, chatTarget } = buildIMessageInboundContext({
|
||||
cfg,
|
||||
decision,
|
||||
message,
|
||||
previousTimestamp,
|
||||
envelope: envelopeOptions,
|
||||
});
|
||||
let combinedBody = body;
|
||||
if (isGroup && historyKey) {
|
||||
combinedBody = buildPendingHistoryContextFromMap({
|
||||
historyMap: groupHistories,
|
||||
historyKey,
|
||||
limit: historyLimit,
|
||||
currentMessage: combinedBody,
|
||||
formatEntry: (entry) =>
|
||||
formatInboundEnvelope({
|
||||
channel: "iMessage",
|
||||
from: fromLabel,
|
||||
timestamp: entry.timestamp,
|
||||
body: `${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`,
|
||||
chatType: "group",
|
||||
senderLabel: entry.sender,
|
||||
envelope: envelopeOptions,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
const imessageTo = (isGroup ? chatTarget : undefined) || `imessage:${sender}`;
|
||||
const inboundHistory =
|
||||
isGroup && historyKey && historyLimit > 0
|
||||
? (groupHistories.get(historyKey) ?? []).map((entry) => ({
|
||||
sender: entry.sender,
|
||||
body: entry.body,
|
||||
timestamp: entry.timestamp,
|
||||
}))
|
||||
: undefined;
|
||||
const ctxPayload = finalizeInboundContext({
|
||||
Body: combinedBody,
|
||||
BodyForAgent: bodyText,
|
||||
InboundHistory: inboundHistory,
|
||||
RawBody: bodyText,
|
||||
CommandBody: bodyText,
|
||||
From: isGroup ? `imessage:group:${chatId ?? "unknown"}` : `imessage:${sender}`,
|
||||
To: imessageTo,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: isGroup ? "group" : "direct",
|
||||
ConversationLabel: fromLabel,
|
||||
GroupSubject: isGroup ? (message.chat_name ?? undefined) : undefined,
|
||||
GroupMembers: isGroup ? (message.participants ?? []).filter(Boolean).join(", ") : undefined,
|
||||
SenderName: senderNormalized,
|
||||
SenderId: sender,
|
||||
Provider: "imessage",
|
||||
Surface: "imessage",
|
||||
MessageSid: message.id ? String(message.id) : undefined,
|
||||
ReplyToId: replyContext?.id,
|
||||
ReplyToBody: replyContext?.body,
|
||||
ReplyToSender: replyContext?.sender,
|
||||
Timestamp: createdAt,
|
||||
MediaPath: mediaPath,
|
||||
MediaType: mediaType,
|
||||
MediaUrl: mediaPath,
|
||||
MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined,
|
||||
MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined,
|
||||
MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined,
|
||||
MediaRemoteHost: remoteHost,
|
||||
WasMentioned: effectiveWasMentioned,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
// Originating channel for reply routing.
|
||||
OriginatingChannel: "imessage" as const,
|
||||
OriginatingTo: imessageTo,
|
||||
remoteHost,
|
||||
historyLimit,
|
||||
groupHistories,
|
||||
media: {
|
||||
path: mediaPath,
|
||||
type: mediaType,
|
||||
paths: mediaPaths,
|
||||
types: mediaTypes,
|
||||
},
|
||||
});
|
||||
|
||||
const updateTarget = (isGroup ? chatTarget : undefined) || sender;
|
||||
const updateTarget = chatTarget || decision.sender;
|
||||
await recordInboundSession({
|
||||
storePath,
|
||||
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
|
||||
sessionKey: ctxPayload.SessionKey ?? decision.route.sessionKey,
|
||||
ctx: ctxPayload,
|
||||
updateLastRoute:
|
||||
!isGroup && updateTarget
|
||||
!decision.isGroup && updateTarget
|
||||
? {
|
||||
sessionKey: route.mainSessionKey,
|
||||
sessionKey: decision.route.mainSessionKey,
|
||||
channel: "imessage",
|
||||
to: updateTarget,
|
||||
accountId: route.accountId,
|
||||
accountId: decision.route.accountId,
|
||||
}
|
||||
: undefined,
|
||||
onRecordError: (err) => {
|
||||
@@ -617,26 +324,33 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
});
|
||||
|
||||
if (shouldLogVerbose()) {
|
||||
const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n");
|
||||
const preview = truncateUtf16Safe(String(ctxPayload.Body ?? ""), 200).replace(/\n/g, "\\n");
|
||||
logVerbose(
|
||||
`imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${body.length} preview="${preview}"`,
|
||||
`imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${
|
||||
String(ctxPayload.Body ?? "").length
|
||||
} preview="${preview}"`,
|
||||
);
|
||||
}
|
||||
|
||||
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
agentId: decision.route.agentId,
|
||||
channel: "imessage",
|
||||
accountId: route.accountId,
|
||||
accountId: decision.route.accountId,
|
||||
});
|
||||
|
||||
const dispatcher = createReplyDispatcher({
|
||||
...prefixOptions,
|
||||
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
|
||||
humanDelay: resolveHumanDelayConfig(cfg, decision.route.agentId),
|
||||
deliver: async (payload) => {
|
||||
const target = ctxPayload.To;
|
||||
if (!target) {
|
||||
runtime.error?.(danger("imessage: missing delivery target"));
|
||||
return;
|
||||
}
|
||||
await deliverReplies({
|
||||
replies: [payload],
|
||||
target: ctxPayload.To,
|
||||
target,
|
||||
client,
|
||||
accountId: accountInfo.accountId,
|
||||
runtime,
|
||||
@@ -664,17 +378,21 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
});
|
||||
|
||||
if (!queuedFinal) {
|
||||
if (isGroup && historyKey) {
|
||||
if (decision.isGroup && decision.historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: groupHistories,
|
||||
historyKey,
|
||||
historyKey: decision.historyKey,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
|
||||
if (decision.isGroup && decision.historyKey) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: groupHistories,
|
||||
historyKey: decision.historyKey,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -728,21 +446,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
|
||||
let subscriptionId: number | null = null;
|
||||
const abort = opts.abortSignal;
|
||||
const onAbort = () => {
|
||||
if (subscriptionId) {
|
||||
void client
|
||||
.request("watch.unsubscribe", {
|
||||
subscription: subscriptionId,
|
||||
})
|
||||
.catch(() => {
|
||||
// Ignore disconnect errors during shutdown.
|
||||
});
|
||||
}
|
||||
void client.stop().catch(() => {
|
||||
// Ignore disconnect errors during shutdown.
|
||||
});
|
||||
};
|
||||
abort?.addEventListener("abort", onAbort, { once: true });
|
||||
const detachAbortHandler = attachIMessageMonitorAbortHandler({
|
||||
abortSignal: abort,
|
||||
client,
|
||||
getSubscriptionId: () => subscriptionId,
|
||||
});
|
||||
|
||||
try {
|
||||
const result = await client.request<{ subscription?: number }>("watch.subscribe", {
|
||||
@@ -757,7 +465,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
|
||||
throw err;
|
||||
} finally {
|
||||
abort?.removeEventListener("abort", onAbort);
|
||||
detachAbortHandler();
|
||||
await client.stop();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user