From a6158873f579eb0247a2e714b25386e21283c1b7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 15 Feb 2026 19:17:57 +0000 Subject: [PATCH] refactor(imessage): split monitor inbound processing --- src/imessage/monitor.gating.test.ts | 355 ++++++++++++ ...nitor.shutdown.unhandled-rejection.test.ts | 49 ++ ...essages-without-mention-by-default.test.ts | 546 ------------------ src/imessage/monitor.test-harness.ts | 170 ------ src/imessage/monitor/abort-handler.ts | 34 ++ src/imessage/monitor/inbound-processing.ts | 483 ++++++++++++++++ src/imessage/monitor/monitor-provider.ts | 508 ++++------------ 7 files changed, 1029 insertions(+), 1116 deletions(-) create mode 100644 src/imessage/monitor.gating.test.ts create mode 100644 src/imessage/monitor.shutdown.unhandled-rejection.test.ts delete mode 100644 src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts delete mode 100644 src/imessage/monitor.test-harness.ts create mode 100644 src/imessage/monitor/abort-handler.ts create mode 100644 src/imessage/monitor/inbound-processing.ts diff --git a/src/imessage/monitor.gating.test.ts b/src/imessage/monitor.gating.test.ts new file mode 100644 index 00000000000..16068b38ea3 --- /dev/null +++ b/src/imessage/monitor.gating.test.ts @@ -0,0 +1,355 @@ +import { describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import type { IMessagePayload } from "./monitor/types.js"; +import { + buildIMessageInboundContext, + resolveIMessageInboundDecision, +} from "./monitor/inbound-processing.js"; +import { parseIMessageNotification } from "./monitor/parse-notification.js"; + +function baseCfg(): OpenClawConfig { + return { + channels: { + imessage: { + dmPolicy: "open", + allowFrom: ["*"], + groupPolicy: "open", + groups: { "*": { requireMention: true } }, + }, + }, + session: { mainKey: "main" }, + messages: { + groupChat: { mentionPatterns: ["@openclaw"] }, + }, + } as unknown as OpenClawConfig; +} + +function resolve(params: { + cfg?: OpenClawConfig; + message: IMessagePayload; + storeAllowFrom?: string[]; +}) { + const cfg = params.cfg ?? baseCfg(); + const groupHistories = new Map(); + return resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: params.message, + opts: {}, + messageText: (params.message.text ?? "").trim(), + bodyText: (params.message.text ?? "").trim(), + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: cfg.channels?.imessage?.groupPolicy ?? "open", + dmPolicy: cfg.channels?.imessage?.dmPolicy ?? "pairing", + storeAllowFrom: params.storeAllowFrom ?? [], + historyLimit: 0, + groupHistories, + }); +} + +describe("imessage monitor gating + envelope builders", () => { + it("parseIMessageNotification rejects malformed payloads", () => { + expect( + parseIMessageNotification({ + message: { chat_id: 1, sender: { nested: "nope" } }, + }), + ).toBeNull(); + }); + + it("drops group messages without mention by default", () => { + const decision = resolve({ + message: { + id: 1, + chat_id: 99, + sender: "+15550001111", + is_from_me: false, + text: "hello group", + is_group: true, + }, + }); + expect(decision.kind).toBe("drop"); + if (decision.kind !== "drop") { + throw new Error("expected drop decision"); + } + expect(decision.reason).toBe("no mention"); + }); + + it("dispatches group messages with mention and builds a group envelope", () => { + const cfg = baseCfg(); + const groupHistories = new Map(); + const message: IMessagePayload = { + id: 3, + chat_id: 42, + sender: "+15550002222", + is_from_me: false, + text: "@openclaw ping", + is_group: true, + chat_name: "Lobster Squad", + participants: ["+1555", "+1556"], + }; + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message, + opts: {}, + messageText: message.text, + bodyText: message.text, + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "open", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("dispatch"); + + const { ctxPayload } = buildIMessageInboundContext({ + cfg, + decision, + message, + historyLimit: 0, + groupHistories, + }); + + expect(ctxPayload.ChatType).toBe("group"); + expect(ctxPayload.SessionKey).toBe("agent:main:imessage:group:42"); + expect(String(ctxPayload.Body ?? "")).toContain("+15550002222:"); + expect(String(ctxPayload.Body ?? "")).not.toContain("[from:"); + expect(ctxPayload.To).toBe("chat_id:42"); + }); + + it("includes reply-to context fields + suffix", () => { + const cfg = baseCfg(); + const groupHistories = new Map(); + const message: IMessagePayload = { + id: 5, + chat_id: 55, + sender: "+15550001111", + is_from_me: false, + text: "replying now", + is_group: false, + reply_to_id: 9001, + reply_to_text: "original message", + reply_to_sender: "+15559998888", + }; + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message, + opts: {}, + messageText: message.text, + bodyText: message.text, + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "open", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("dispatch"); + + const { ctxPayload } = buildIMessageInboundContext({ + cfg, + decision, + message, + historyLimit: 0, + groupHistories, + }); + + expect(ctxPayload.ReplyToId).toBe("9001"); + expect(ctxPayload.ReplyToBody).toBe("original message"); + expect(ctxPayload.ReplyToSender).toBe("+15559998888"); + expect(String(ctxPayload.Body ?? "")).toContain("[Replying to +15559998888 id:9001]"); + expect(String(ctxPayload.Body ?? "")).toContain("original message"); + }); + + it("treats configured chat_id as a group session even when is_group is false", () => { + const cfg = baseCfg(); + cfg.channels ??= {}; + cfg.channels.imessage ??= {}; + cfg.channels.imessage.groups = { "2": { requireMention: false } }; + + const groupHistories = new Map(); + const message: IMessagePayload = { + id: 14, + chat_id: 2, + sender: "+15550001111", + is_from_me: false, + text: "hello", + is_group: false, + }; + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message, + opts: {}, + messageText: message.text, + bodyText: message.text, + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "open", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("dispatch"); + expect(decision.isGroup).toBe(true); + expect(decision.route.sessionKey).toBe("agent:main:imessage:group:2"); + }); + + it("allows group messages when requireMention is true but no mentionPatterns exist", () => { + const cfg = baseCfg(); + cfg.messages ??= {}; + cfg.messages.groupChat ??= {}; + cfg.messages.groupChat.mentionPatterns = []; + + const groupHistories = new Map(); + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: { + id: 12, + chat_id: 777, + sender: "+15550001111", + is_from_me: false, + text: "hello group", + is_group: true, + }, + opts: {}, + messageText: "hello group", + bodyText: "hello group", + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "open", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("dispatch"); + }); + + it("blocks group messages when imessage.groups is set without a wildcard", () => { + const cfg = baseCfg(); + cfg.channels ??= {}; + cfg.channels.imessage ??= {}; + cfg.channels.imessage.groups = { "99": { requireMention: false } }; + + const groupHistories = new Map(); + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: { + id: 13, + chat_id: 123, + sender: "+15550001111", + is_from_me: false, + text: "@openclaw hello", + is_group: true, + }, + opts: {}, + messageText: "@openclaw hello", + bodyText: "@openclaw hello", + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "open", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("drop"); + }); + + it("honors group allowlist and ignores pairing-store senders in groups", () => { + const cfg = baseCfg(); + cfg.channels ??= {}; + cfg.channels.imessage ??= {}; + cfg.channels.imessage.groupPolicy = "allowlist"; + + const groupHistories = new Map(); + const denied = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: { + id: 3, + chat_id: 202, + sender: "+15550003333", + is_from_me: false, + text: "@openclaw hi", + is_group: true, + }, + opts: {}, + messageText: "@openclaw hi", + bodyText: "@openclaw hi", + allowFrom: ["*"], + groupAllowFrom: ["chat_id:101"], + groupPolicy: "allowlist", + dmPolicy: "pairing", + storeAllowFrom: ["+15550003333"], + historyLimit: 0, + groupHistories, + }); + expect(denied.kind).toBe("drop"); + + const allowed = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: { + id: 33, + chat_id: 101, + sender: "+15550003333", + is_from_me: false, + text: "@openclaw ok", + is_group: true, + }, + opts: {}, + messageText: "@openclaw ok", + bodyText: "@openclaw ok", + allowFrom: ["*"], + groupAllowFrom: ["chat_id:101"], + groupPolicy: "allowlist", + dmPolicy: "pairing", + storeAllowFrom: ["+15550003333"], + historyLimit: 0, + groupHistories, + }); + expect(allowed.kind).toBe("dispatch"); + }); + + it("blocks group messages when groupPolicy is disabled", () => { + const cfg = baseCfg(); + cfg.channels ??= {}; + cfg.channels.imessage ??= {}; + cfg.channels.imessage.groupPolicy = "disabled"; + + const groupHistories = new Map(); + const decision = resolveIMessageInboundDecision({ + cfg, + accountId: "default", + message: { + id: 10, + chat_id: 303, + sender: "+15550003333", + is_from_me: false, + text: "@openclaw hi", + is_group: true, + }, + opts: {}, + messageText: "@openclaw hi", + bodyText: "@openclaw hi", + allowFrom: ["*"], + groupAllowFrom: [], + groupPolicy: "disabled", + dmPolicy: "open", + storeAllowFrom: [], + historyLimit: 0, + groupHistories, + }); + expect(decision.kind).toBe("drop"); + }); +}); diff --git a/src/imessage/monitor.shutdown.unhandled-rejection.test.ts b/src/imessage/monitor.shutdown.unhandled-rejection.test.ts new file mode 100644 index 00000000000..ecc85991a41 --- /dev/null +++ b/src/imessage/monitor.shutdown.unhandled-rejection.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it, vi } from "vitest"; +import { attachIMessageMonitorAbortHandler } from "./monitor/abort-handler.js"; + +describe("monitorIMessageProvider", () => { + it("does not trigger unhandledRejection when aborting during shutdown", async () => { + const abortController = new AbortController(); + let subscriptionId: number | null = 1; + const requestMock = vi.fn((method: string, _params?: Record) => { + if (method === "watch.unsubscribe") { + return Promise.reject(new Error("imsg rpc closed")); + } + return Promise.resolve({}); + }); + const stopMock = vi.fn(async () => {}); + + const unhandled: unknown[] = []; + const onUnhandled = (reason: unknown) => { + unhandled.push(reason); + }; + process.on("unhandledRejection", onUnhandled); + + try { + const detach = attachIMessageMonitorAbortHandler({ + abortSignal: abortController.signal, + client: { + request: requestMock, + stop: stopMock, + }, + getSubscriptionId: () => subscriptionId, + }); + abortController.abort(); + // Give the event loop a turn to surface any unhandledRejection, if present. + await new Promise((resolve) => { + if (typeof setImmediate === "function") { + setImmediate(resolve); + return; + } + setTimeout(resolve, 0); + }); + detach(); + } finally { + process.off("unhandledRejection", onUnhandled); + } + + expect(unhandled).toHaveLength(0); + expect(stopMock).toHaveBeenCalled(); + expect(requestMock).toHaveBeenCalledWith("watch.unsubscribe", { subscription: 1 }); + }); +}); diff --git a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts b/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts deleted file mode 100644 index ae85c17e042..00000000000 --- a/src/imessage/monitor.skips-group-messages-without-mention-by-default.test.ts +++ /dev/null @@ -1,546 +0,0 @@ -import { beforeAll, describe, expect, it } from "vitest"; -import { - flush, - getCloseResolve, - getConfigMock, - getReadAllowFromStoreMock, - getNotificationHandler, - getReplyMock, - getRequestMock, - getSendMock, - getStopMock, - getUpsertPairingRequestMock, - getUpdateLastRouteMock, - installMonitorIMessageProviderTestHooks, - setConfigMock, - waitForSubscribe, -} from "./monitor.test-harness.js"; - -installMonitorIMessageProviderTestHooks(); - -let monitorIMessageProvider: typeof import("./monitor.js").monitorIMessageProvider; - -beforeAll(async () => { - ({ monitorIMessageProvider } = await import("./monitor.js")); -}); - -function startMonitor() { - return monitorIMessageProvider(); -} -const replyMock = getReplyMock(); -const requestMock = getRequestMock(); -const sendMock = getSendMock(); -const readAllowFromStoreMock = getReadAllowFromStoreMock(); -const stopMock = getStopMock(); -const upsertPairingRequestMock = getUpsertPairingRequestMock(); -const updateLastRouteMock = getUpdateLastRouteMock(); - -type TestConfig = { - channels: Record & { imessage: Record }; - messages: Record; - session: Record; - [k: string]: unknown; -}; - -function getConfig(): TestConfig { - return getConfigMock() as unknown as TestConfig; -} - -function notifyMessage(message: unknown) { - getNotificationHandler()?.({ - method: "message", - params: { message }, - }); -} - -async function closeMonitor() { - for (let i = 0; i < 50; i += 1) { - const close = getCloseResolve(); - if (close) { - close(); - return; - } - await Promise.resolve(); - } - for (let i = 0; i < 5; i += 1) { - const close = getCloseResolve(); - if (close) { - close(); - return; - } - await flush(); - } - throw new Error("imessage test harness: closeResolve not set"); -} - -describe("monitorIMessageProvider", () => { - it("handles default config gating, formatting, and reply context", async () => { - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 1, - sender: { nested: "not-a-string" }, - text: "hello", - }); - await flush(); - expect(replyMock).not.toHaveBeenCalled(); - expect(sendMock).not.toHaveBeenCalled(); - replyMock.mockClear(); - sendMock.mockClear(); - - notifyMessage({ - id: 2, - chat_id: 99, - sender: "+15550001111", - is_from_me: false, - text: "hello group", - is_group: true, - }); - await flush(); - expect(replyMock).not.toHaveBeenCalled(); - expect(sendMock).not.toHaveBeenCalled(); - replyMock.mockClear(); - sendMock.mockClear(); - - replyMock.mockResolvedValueOnce({ text: "yo" }); - notifyMessage({ - id: 3, - chat_id: 42, - sender: "+15550002222", - is_from_me: false, - text: "@openclaw ping", - is_group: true, - chat_name: "Lobster Squad", - participants: ["+1555", "+1556"], - }); - await flush(); - expect(replyMock).toHaveBeenCalledOnce(); - { - const ctx = replyMock.mock.calls[0]?.[0] as { Body?: string; ChatType?: string }; - expect(ctx.ChatType).toBe("group"); - // Sender should appear as prefix in group messages (no redundant [from:] suffix) - expect(String(ctx.Body ?? "")).toContain("+15550002222:"); - expect(String(ctx.Body ?? "")).not.toContain("[from:"); - } - expect(sendMock).toHaveBeenCalledWith( - "chat_id:42", - "yo", - expect.objectContaining({ client: expect.any(Object) }), - ); - replyMock.mockClear(); - sendMock.mockClear(); - - notifyMessage({ - id: 4, - chat_id: 99, - chat_name: "Test Group", - sender: "+15550001111", - is_from_me: false, - text: "@openclaw hi", - is_group: true, - created_at: "2026-01-17T00:00:00Z", - }); - await flush(); - expect(replyMock).toHaveBeenCalled(); - { - const ctx = replyMock.mock.calls[0]?.[0]; - const body = ctx?.Body ?? ""; - expect(body).toContain("Test Group id:99"); - expect(body).toContain("+15550001111: @openclaw hi"); - } - replyMock.mockClear(); - sendMock.mockClear(); - - notifyMessage({ - id: 5, - chat_id: 55, - sender: "+15550001111", - is_from_me: false, - text: "replying now", - is_group: false, - reply_to_id: 9001, - reply_to_text: "original message", - reply_to_sender: "+15559998888", - }); - await flush(); - expect(replyMock).toHaveBeenCalled(); - { - const ctx = replyMock.mock.calls[0]?.[0] as { - Body?: string; - ReplyToId?: string; - ReplyToBody?: string; - ReplyToSender?: string; - }; - expect(ctx.ReplyToId).toBe("9001"); - expect(ctx.ReplyToBody).toBe("original message"); - expect(ctx.ReplyToSender).toBe("+15559998888"); - expect(String(ctx.Body ?? "")).toContain("[Replying to +15559998888 id:9001]"); - expect(String(ctx.Body ?? "")).toContain("original message"); - } - expect(updateLastRouteMock).toHaveBeenCalledWith( - expect.objectContaining({ - deliveryContext: expect.objectContaining({ - channel: "imessage", - to: "+15550001111", - }), - }), - ); - - await closeMonitor(); - await run; - }); - - it("allows group messages when imessage groups default disables mention gating", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - groupPolicy: "open", - groups: { "*": { requireMention: false } }, - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 11, - chat_id: 123, - sender: "+15550001111", - is_from_me: false, - text: "hello group", - is_group: true, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).toHaveBeenCalled(); - }); - - it("allows group messages when requireMention is true but no mentionPatterns exist", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - messages: { - ...config.messages, - groupChat: { mentionPatterns: [] }, - }, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - groupPolicy: "open", - groups: { "*": { requireMention: true } }, - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 12, - chat_id: 777, - sender: "+15550001111", - is_from_me: false, - text: "hello group", - is_group: true, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).toHaveBeenCalled(); - }); - - it("blocks group messages when imessage.groups is set without a wildcard", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - groups: { "99": { requireMention: false } }, - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 13, - chat_id: 123, - sender: "+15550001111", - is_from_me: false, - text: "@openclaw hello", - is_group: true, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).not.toHaveBeenCalled(); - expect(sendMock).not.toHaveBeenCalled(); - }); - - it("treats configured chat_id as a group session even when is_group is false", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - dmPolicy: "open", - allowFrom: ["*"], - groups: { "2": { requireMention: false } }, - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 14, - chat_id: 2, - sender: "+15550001111", - is_from_me: false, - text: "hello", - is_group: false, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).toHaveBeenCalled(); - const ctx = replyMock.mock.calls[0]?.[0] as { - ChatType?: string; - SessionKey?: string; - }; - expect(ctx.ChatType).toBe("group"); - expect(ctx.SessionKey).toBe("agent:main:imessage:group:2"); - }); - - it("prefixes final replies with responsePrefix", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - messages: { - ...config.messages, - responsePrefix: "PFX", - }, - }); - replyMock.mockResolvedValue({ text: "final reply" }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 7, - chat_id: 77, - sender: "+15550001111", - is_from_me: false, - text: "hello", - is_group: false, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(sendMock).toHaveBeenCalledTimes(1); - expect(sendMock.mock.calls[0][1]).toBe("PFX final reply"); - }); - - it("defaults to dmPolicy=pairing behavior when allowFrom is empty", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - dmPolicy: "pairing", - allowFrom: [], - groups: { "*": { requireMention: true } }, - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 99, - chat_id: 77, - sender: "+15550001111", - is_from_me: false, - text: "hello", - is_group: false, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).not.toHaveBeenCalled(); - expect(upsertPairingRequestMock).toHaveBeenCalled(); - expect(sendMock).toHaveBeenCalledTimes(1); - expect(String(sendMock.mock.calls[0]?.[1] ?? "")).toContain( - "Your iMessage sender id: +15550001111", - ); - expect(String(sendMock.mock.calls[0]?.[1] ?? "")).toContain("Pairing code: PAIRCODE"); - }); - - it("honors group allowlist and ignores pairing-store senders in groups", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - dmPolicy: "pairing", - allowFrom: [], - groupPolicy: "allowlist", - groupAllowFrom: ["chat_id:101"], - }, - }, - }); - readAllowFromStoreMock.mockResolvedValue(["+15550003333"]); - - const run = startMonitor(); - await waitForSubscribe(); - - replyMock.mockClear(); - sendMock.mockClear(); - notifyMessage({ - id: 3, - chat_id: 202, - sender: "+15550003333", - is_from_me: false, - text: "@openclaw hi", - is_group: true, - }); - - await flush(); - expect(replyMock).not.toHaveBeenCalled(); - expect(sendMock).not.toHaveBeenCalled(); - - replyMock.mockClear(); - sendMock.mockClear(); - notifyMessage({ - id: 31, - chat_id: 202, - sender: "+15550003333", - is_from_me: false, - text: "/status", - is_group: true, - }); - - await flush(); - expect(replyMock).not.toHaveBeenCalled(); - expect(sendMock).not.toHaveBeenCalled(); - - replyMock.mockClear(); - sendMock.mockClear(); - notifyMessage({ - id: 33, - chat_id: 101, - sender: "+15550003333", - is_from_me: false, - text: "@openclaw ok", - is_group: true, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).toHaveBeenCalled(); - expect(sendMock).toHaveBeenCalled(); - }); - - it("blocks group messages when groupPolicy is disabled", async () => { - const config = getConfig(); - setConfigMock({ - ...config, - channels: { - ...config.channels, - imessage: { - ...config.channels.imessage, - groupPolicy: "disabled", - }, - }, - }); - - const run = startMonitor(); - await waitForSubscribe(); - - notifyMessage({ - id: 10, - chat_id: 303, - sender: "+15550003333", - is_from_me: false, - text: "@openclaw hi", - is_group: true, - }); - - await flush(); - await closeMonitor(); - await run; - - expect(replyMock).not.toHaveBeenCalled(); - }); - - it("does not trigger unhandledRejection when aborting during shutdown", async () => { - requestMock.mockImplementation((method: string) => { - if (method === "watch.subscribe") { - return Promise.resolve({ subscription: 1 }); - } - if (method === "watch.unsubscribe") { - return Promise.reject(new Error("imsg rpc closed")); - } - return Promise.resolve({}); - }); - - const abortController = new AbortController(); - const unhandled: unknown[] = []; - const onUnhandled = (reason: unknown) => { - unhandled.push(reason); - }; - process.on("unhandledRejection", onUnhandled); - - try { - const run = monitorIMessageProvider({ - abortSignal: abortController.signal, - }); - await waitForSubscribe(); - await flush(); - - abortController.abort(); - await flush(); - - await closeMonitor(); - await run; - } finally { - process.off("unhandledRejection", onUnhandled); - } - - expect(unhandled).toHaveLength(0); - expect(stopMock).toHaveBeenCalled(); - }); -}); diff --git a/src/imessage/monitor.test-harness.ts b/src/imessage/monitor.test-harness.ts deleted file mode 100644 index 2233c21bee1..00000000000 --- a/src/imessage/monitor.test-harness.ts +++ /dev/null @@ -1,170 +0,0 @@ -import { beforeEach, vi } from "vitest"; - -type NotificationHandler = (msg: { method: string; params?: unknown }) => void; - -// Avoid exporting vitest mock types (TS2742 under pnpm + d.ts emit). -// oxlint-disable-next-line typescript/no-explicit-any -type AnyMock = any; - -const state = vi.hoisted(() => ({ - requestMock: vi.fn(), - stopMock: vi.fn(), - sendMock: vi.fn(), - replyMock: vi.fn(), - updateLastRouteMock: vi.fn(), - readAllowFromStoreMock: vi.fn(), - upsertPairingRequestMock: vi.fn(), - config: {} as Record, - notificationHandler: undefined as NotificationHandler | undefined, - closeResolve: undefined as (() => void) | undefined, -})); - -export function getRequestMock(): AnyMock { - return state.requestMock; -} - -export function getStopMock(): AnyMock { - return state.stopMock; -} - -export function getSendMock(): AnyMock { - return state.sendMock; -} - -export function getReplyMock(): AnyMock { - return state.replyMock; -} - -export function getUpdateLastRouteMock(): AnyMock { - return state.updateLastRouteMock; -} - -export function getReadAllowFromStoreMock(): AnyMock { - return state.readAllowFromStoreMock; -} - -export function getUpsertPairingRequestMock(): AnyMock { - return state.upsertPairingRequestMock; -} - -export function getNotificationHandler(): NotificationHandler | undefined { - return state.notificationHandler; -} - -export function getCloseResolve(): (() => void) | undefined { - return state.closeResolve; -} - -export function setConfigMock(next: Record) { - state.config = next; -} - -export function getConfigMock() { - return state.config; -} - -vi.mock("../config/config.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - loadConfig: () => state.config, - }; -}); - -vi.mock("../auto-reply/reply.js", () => ({ - getReplyFromConfig: (...args: unknown[]) => state.replyMock(...args), -})); - -vi.mock("./send.js", () => ({ - sendMessageIMessage: (...args: unknown[]) => state.sendMock(...args), -})); - -vi.mock("../pairing/pairing-store.js", () => ({ - readChannelAllowFromStore: (...args: unknown[]) => state.readAllowFromStoreMock(...args), - upsertChannelPairingRequest: (...args: unknown[]) => state.upsertPairingRequestMock(...args), -})); - -vi.mock("../config/sessions.js", () => ({ - resolveStorePath: vi.fn(() => "/tmp/openclaw-sessions.json"), - updateLastRoute: (...args: unknown[]) => state.updateLastRouteMock(...args), - readSessionUpdatedAt: vi.fn(() => undefined), - recordSessionMetaFromInbound: vi.fn().mockResolvedValue(undefined), -})); - -vi.mock("./client.js", () => ({ - createIMessageRpcClient: vi.fn(async (opts: { onNotification?: NotificationHandler }) => { - state.notificationHandler = opts.onNotification; - return { - request: (...args: unknown[]) => state.requestMock(...args), - waitForClose: () => - new Promise((resolve) => { - state.closeResolve = resolve; - }), - stop: (...args: unknown[]) => state.stopMock(...args), - }; - }), -})); - -vi.mock("./probe.js", () => ({ - probeIMessage: vi.fn(async () => ({ ok: true })), -})); - -export const flush = () => - new Promise((resolve) => { - if (typeof setImmediate === "function") { - setImmediate(resolve); - return; - } - setTimeout(resolve, 0); - }); - -export async function waitForSubscribe() { - for (let i = 0; i < 25; i += 1) { - if (state.requestMock.mock.calls.some((call) => call[0] === "watch.subscribe")) { - return; - } - // Prefer microtask turns over timers for speed. - await Promise.resolve(); - } - for (let i = 0; i < 5; i += 1) { - if (state.requestMock.mock.calls.some((call) => call[0] === "watch.subscribe")) { - return; - } - await flush(); - } - throw new Error("imessage test harness: watch.subscribe not observed"); -} - -export function installMonitorIMessageProviderTestHooks() { - beforeEach(() => { - state.config = { - channels: { - imessage: { - dmPolicy: "open", - allowFrom: ["*"], - groups: { "*": { requireMention: true } }, - }, - }, - session: { mainKey: "main" }, - messages: { - groupChat: { mentionPatterns: ["@openclaw"] }, - }, - }; - state.requestMock.mockReset().mockImplementation((method: string) => { - if (method === "watch.subscribe") { - return Promise.resolve({ subscription: 1 }); - } - return Promise.resolve({}); - }); - state.stopMock.mockReset().mockResolvedValue(undefined); - state.sendMock.mockReset().mockResolvedValue({ messageId: "ok" }); - state.replyMock.mockReset().mockResolvedValue({ text: "ok" }); - state.updateLastRouteMock.mockReset(); - state.readAllowFromStoreMock.mockReset().mockResolvedValue([]); - state.upsertPairingRequestMock - .mockReset() - .mockResolvedValue({ code: "PAIRCODE", created: true }); - state.notificationHandler = undefined; - state.closeResolve = undefined; - }); -} diff --git a/src/imessage/monitor/abort-handler.ts b/src/imessage/monitor/abort-handler.ts new file mode 100644 index 00000000000..bd5388260df --- /dev/null +++ b/src/imessage/monitor/abort-handler.ts @@ -0,0 +1,34 @@ +export type IMessageMonitorClient = { + request: (method: string, params?: Record) => Promise; + stop: () => Promise; +}; + +export function attachIMessageMonitorAbortHandler(params: { + abortSignal?: AbortSignal; + client: IMessageMonitorClient; + getSubscriptionId: () => number | null; +}): () => void { + const abort = params.abortSignal; + if (!abort) { + return () => {}; + } + + const onAbort = () => { + const subscriptionId = params.getSubscriptionId(); + if (subscriptionId) { + void params.client + .request("watch.unsubscribe", { + subscription: subscriptionId, + }) + .catch(() => { + // Ignore disconnect errors during shutdown. + }); + } + void params.client.stop().catch(() => { + // Ignore disconnect errors during shutdown. + }); + }; + + abort.addEventListener("abort", onAbort, { once: true }); + return () => abort.removeEventListener("abort", onAbort); +} diff --git a/src/imessage/monitor/inbound-processing.ts b/src/imessage/monitor/inbound-processing.ts new file mode 100644 index 00000000000..df8549c0734 --- /dev/null +++ b/src/imessage/monitor/inbound-processing.ts @@ -0,0 +1,483 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import type { MonitorIMessageOpts, IMessagePayload } from "./types.js"; +import { hasControlCommand } from "../../auto-reply/command-detection.js"; +import { + formatInboundEnvelope, + formatInboundFromLabel, + resolveEnvelopeFormatOptions, + type EnvelopeFormatOptions, +} from "../../auto-reply/envelope.js"; +import { + buildPendingHistoryContextFromMap, + recordPendingHistoryEntryIfEnabled, + type HistoryEntry, +} from "../../auto-reply/reply/history.js"; +import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; +import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js"; +import { resolveControlCommandGate } from "../../channels/command-gating.js"; +import { logInboundDrop } from "../../channels/logging.js"; +import { + resolveChannelGroupPolicy, + resolveChannelGroupRequireMention, +} from "../../config/group-policy.js"; +import { resolveAgentRoute } from "../../routing/resolve-route.js"; +import { truncateUtf16Safe } from "../../utils.js"; +import { + formatIMessageChatTarget, + isAllowedIMessageSender, + normalizeIMessageHandle, +} from "../targets.js"; + +type IMessageReplyContext = { + id?: string; + body: string; + sender?: string; +}; + +function normalizeReplyField(value: unknown): string | undefined { + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; + } + if (typeof value === "number") { + return String(value); + } + return undefined; +} + +function describeReplyContext(message: IMessagePayload): IMessageReplyContext | null { + const body = normalizeReplyField(message.reply_to_text); + if (!body) { + return null; + } + const id = normalizeReplyField(message.reply_to_id); + const sender = normalizeReplyField(message.reply_to_sender); + return { body, id, sender }; +} + +export type IMessageInboundDispatchDecision = { + kind: "dispatch"; + isGroup: boolean; + chatId?: number; + chatGuid?: string; + chatIdentifier?: string; + groupId?: string; + historyKey?: string; + sender: string; + senderNormalized: string; + route: ReturnType; + bodyText: string; + createdAt?: number; + replyContext: IMessageReplyContext | null; + effectiveWasMentioned: boolean; + commandAuthorized: boolean; + // Used for allowlist checks for control commands. + effectiveDmAllowFrom: string[]; + effectiveGroupAllowFrom: string[]; +}; + +export type IMessageInboundDecision = + | { kind: "drop"; reason: string } + | { kind: "pairing"; senderId: string } + | IMessageInboundDispatchDecision; + +export function resolveIMessageInboundDecision(params: { + cfg: OpenClawConfig; + accountId: string; + message: IMessagePayload; + opts?: Pick; + messageText: string; + bodyText: string; + allowFrom: string[]; + groupAllowFrom: string[]; + groupPolicy: string; + dmPolicy: string; + storeAllowFrom: string[]; + historyLimit: number; + groupHistories: Map; + echoCache?: { has: (scope: string, text: string) => boolean }; + logVerbose?: (msg: string) => void; +}): IMessageInboundDecision { + const senderRaw = params.message.sender ?? ""; + const sender = senderRaw.trim(); + if (!sender) { + return { kind: "drop", reason: "missing sender" }; + } + const senderNormalized = normalizeIMessageHandle(sender); + if (params.message.is_from_me) { + return { kind: "drop", reason: "from me" }; + } + + const chatId = params.message.chat_id ?? undefined; + const chatGuid = params.message.chat_guid ?? undefined; + const chatIdentifier = params.message.chat_identifier ?? undefined; + + const groupIdCandidate = chatId !== undefined ? String(chatId) : undefined; + const groupListPolicy = groupIdCandidate + ? resolveChannelGroupPolicy({ + cfg: params.cfg, + channel: "imessage", + accountId: params.accountId, + groupId: groupIdCandidate, + }) + : { + allowlistEnabled: false, + allowed: true, + groupConfig: undefined, + defaultConfig: undefined, + }; + + // If the owner explicitly configures a chat_id under imessage.groups, treat that thread as a + // "group" for permission gating + session isolation, even when is_group=false. + const treatAsGroupByConfig = Boolean( + groupIdCandidate && groupListPolicy.allowlistEnabled && groupListPolicy.groupConfig, + ); + const isGroup = Boolean(params.message.is_group) || treatAsGroupByConfig; + if (isGroup && !chatId) { + return { kind: "drop", reason: "group without chat_id" }; + } + + const groupId = isGroup ? groupIdCandidate : undefined; + const effectiveDmAllowFrom = Array.from(new Set([...params.allowFrom, ...params.storeAllowFrom])) + .map((v) => String(v).trim()) + .filter(Boolean); + // Keep DM pairing-store authorization scoped to DMs; group access must come from explicit group allowlist config. + const effectiveGroupAllowFrom = Array.from(new Set(params.groupAllowFrom)) + .map((v) => String(v).trim()) + .filter(Boolean); + + if (isGroup) { + if (params.groupPolicy === "disabled") { + params.logVerbose?.("Blocked iMessage group message (groupPolicy: disabled)"); + return { kind: "drop", reason: "groupPolicy disabled" }; + } + if (params.groupPolicy === "allowlist") { + if (effectiveGroupAllowFrom.length === 0) { + params.logVerbose?.( + "Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)", + ); + return { kind: "drop", reason: "groupPolicy allowlist (empty groupAllowFrom)" }; + } + const allowed = isAllowedIMessageSender({ + allowFrom: effectiveGroupAllowFrom, + sender, + chatId, + chatGuid, + chatIdentifier, + }); + if (!allowed) { + params.logVerbose?.(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`); + return { kind: "drop", reason: "not in groupAllowFrom" }; + } + } + if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) { + params.logVerbose?.( + `imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`, + ); + return { kind: "drop", reason: "group id not in allowlist" }; + } + } + + const dmHasWildcard = effectiveDmAllowFrom.includes("*"); + const dmAuthorized = + params.dmPolicy === "open" + ? true + : dmHasWildcard || + (effectiveDmAllowFrom.length > 0 && + isAllowedIMessageSender({ + allowFrom: effectiveDmAllowFrom, + sender, + chatId, + chatGuid, + chatIdentifier, + })); + + if (!isGroup) { + if (params.dmPolicy === "disabled") { + return { kind: "drop", reason: "dmPolicy disabled" }; + } + if (!dmAuthorized) { + if (params.dmPolicy === "pairing") { + return { kind: "pairing", senderId: senderNormalized }; + } + params.logVerbose?.(`Blocked iMessage sender ${sender} (dmPolicy=${params.dmPolicy})`); + return { kind: "drop", reason: "dmPolicy blocked" }; + } + } + + const route = resolveAgentRoute({ + cfg: params.cfg, + channel: "imessage", + accountId: params.accountId, + peer: { + kind: isGroup ? "group" : "direct", + id: isGroup ? String(chatId ?? "unknown") : senderNormalized, + }, + }); + const mentionRegexes = buildMentionRegexes(params.cfg, route.agentId); + const messageText = params.messageText.trim(); + const bodyText = params.bodyText.trim(); + if (!bodyText) { + return { kind: "drop", reason: "empty body" }; + } + + // Echo detection: check if the received message matches a recently sent message (within 5 seconds). + // Scope by conversation so same text in different chats is not conflated. + if (params.echoCache && messageText) { + const echoScope = buildIMessageEchoScope({ + accountId: params.accountId, + isGroup, + chatId, + sender, + }); + if (params.echoCache.has(echoScope, messageText)) { + params.logVerbose?.(describeIMessageEchoDropLog({ messageText })); + return { kind: "drop", reason: "echo" }; + } + } + + const replyContext = describeReplyContext(params.message); + const createdAt = params.message.created_at ? Date.parse(params.message.created_at) : undefined; + const historyKey = isGroup + ? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown") + : undefined; + + const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true; + const requireMention = resolveChannelGroupRequireMention({ + cfg: params.cfg, + channel: "imessage", + accountId: params.accountId, + groupId, + requireMentionOverride: params.opts?.requireMention, + overrideOrder: "before-config", + }); + const canDetectMention = mentionRegexes.length > 0; + + const useAccessGroups = params.cfg.commands?.useAccessGroups !== false; + const ownerAllowedForCommands = + effectiveDmAllowFrom.length > 0 + ? isAllowedIMessageSender({ + allowFrom: effectiveDmAllowFrom, + sender, + chatId, + chatGuid, + chatIdentifier, + }) + : false; + const groupAllowedForCommands = + effectiveGroupAllowFrom.length > 0 + ? isAllowedIMessageSender({ + allowFrom: effectiveGroupAllowFrom, + sender, + chatId, + chatGuid, + chatIdentifier, + }) + : false; + const hasControlCommandInMessage = hasControlCommand(messageText, params.cfg); + const commandGate = resolveControlCommandGate({ + useAccessGroups, + authorizers: [ + { configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands }, + { configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands }, + ], + allowTextCommands: true, + hasControlCommand: hasControlCommandInMessage, + }); + const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAuthorized; + if (isGroup && commandGate.shouldBlock) { + if (params.logVerbose) { + logInboundDrop({ + log: params.logVerbose, + channel: "imessage", + reason: "control command (unauthorized)", + target: sender, + }); + } + return { kind: "drop", reason: "control command (unauthorized)" }; + } + + const shouldBypassMention = + isGroup && requireMention && !mentioned && commandAuthorized && hasControlCommandInMessage; + const effectiveWasMentioned = mentioned || shouldBypassMention; + if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) { + params.logVerbose?.(`imessage: skipping group message (no mention)`); + recordPendingHistoryEntryIfEnabled({ + historyMap: params.groupHistories, + historyKey: historyKey ?? "", + limit: params.historyLimit, + entry: historyKey + ? { + sender: senderNormalized, + body: bodyText, + timestamp: createdAt, + messageId: params.message.id ? String(params.message.id) : undefined, + } + : null, + }); + return { kind: "drop", reason: "no mention" }; + } + + return { + kind: "dispatch", + isGroup, + chatId, + chatGuid, + chatIdentifier, + groupId, + historyKey, + sender, + senderNormalized, + route, + bodyText, + createdAt, + replyContext, + effectiveWasMentioned, + commandAuthorized, + effectiveDmAllowFrom, + effectiveGroupAllowFrom, + }; +} + +export function buildIMessageInboundContext(params: { + cfg: OpenClawConfig; + decision: IMessageInboundDispatchDecision; + message: IMessagePayload; + envelopeOptions?: EnvelopeFormatOptions; + previousTimestamp?: number; + remoteHost?: string; + media?: { + path?: string; + type?: string; + paths?: string[]; + types?: Array; + }; + historyLimit: number; + groupHistories: Map; +}): { + ctxPayload: ReturnType; + fromLabel: string; + chatTarget?: string; + imessageTo: string; + inboundHistory?: Array<{ sender: string; body: string; timestamp?: number }>; +} { + const envelopeOptions = params.envelopeOptions ?? resolveEnvelopeFormatOptions(params.cfg); + const { decision } = params; + const chatId = decision.chatId; + const chatTarget = + decision.isGroup && chatId != null ? formatIMessageChatTarget(chatId) : undefined; + + const replySuffix = decision.replyContext + ? `\n\n[Replying to ${decision.replyContext.sender ?? "unknown sender"}${ + decision.replyContext.id ? ` id:${decision.replyContext.id}` : "" + }]\n${decision.replyContext.body}\n[/Replying]` + : ""; + + const fromLabel = formatInboundFromLabel({ + isGroup: decision.isGroup, + groupLabel: params.message.chat_name ?? undefined, + groupId: chatId !== undefined ? String(chatId) : "unknown", + groupFallback: "Group", + directLabel: decision.senderNormalized, + directId: decision.sender, + }); + + const body = formatInboundEnvelope({ + channel: "iMessage", + from: fromLabel, + timestamp: decision.createdAt, + body: `${decision.bodyText}${replySuffix}`, + chatType: decision.isGroup ? "group" : "direct", + sender: { name: decision.senderNormalized, id: decision.sender }, + previousTimestamp: params.previousTimestamp, + envelope: envelopeOptions, + }); + + let combinedBody = body; + if (decision.isGroup && decision.historyKey) { + combinedBody = buildPendingHistoryContextFromMap({ + historyMap: params.groupHistories, + historyKey: decision.historyKey, + limit: params.historyLimit, + currentMessage: combinedBody, + formatEntry: (entry) => + formatInboundEnvelope({ + channel: "iMessage", + from: fromLabel, + timestamp: entry.timestamp, + body: `${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`, + chatType: "group", + senderLabel: entry.sender, + envelope: envelopeOptions, + }), + }); + } + + const imessageTo = (decision.isGroup ? chatTarget : undefined) || `imessage:${decision.sender}`; + const inboundHistory = + decision.isGroup && decision.historyKey && params.historyLimit > 0 + ? (params.groupHistories.get(decision.historyKey) ?? []).map((entry) => ({ + sender: entry.sender, + body: entry.body, + timestamp: entry.timestamp, + })) + : undefined; + + const ctxPayload = finalizeInboundContext({ + Body: combinedBody, + BodyForAgent: decision.bodyText, + InboundHistory: inboundHistory, + RawBody: decision.bodyText, + CommandBody: decision.bodyText, + From: decision.isGroup + ? `imessage:group:${chatId ?? "unknown"}` + : `imessage:${decision.sender}`, + To: imessageTo, + SessionKey: decision.route.sessionKey, + AccountId: decision.route.accountId, + ChatType: decision.isGroup ? "group" : "direct", + ConversationLabel: fromLabel, + GroupSubject: decision.isGroup ? (params.message.chat_name ?? undefined) : undefined, + GroupMembers: decision.isGroup + ? (params.message.participants ?? []).filter(Boolean).join(", ") + : undefined, + SenderName: decision.senderNormalized, + SenderId: decision.sender, + Provider: "imessage", + Surface: "imessage", + MessageSid: params.message.id ? String(params.message.id) : undefined, + ReplyToId: decision.replyContext?.id, + ReplyToBody: decision.replyContext?.body, + ReplyToSender: decision.replyContext?.sender, + Timestamp: decision.createdAt, + MediaPath: params.media?.path, + MediaType: params.media?.type, + MediaUrl: params.media?.path, + MediaPaths: + params.media?.paths && params.media.paths.length > 0 ? params.media.paths : undefined, + MediaTypes: + params.media?.types && params.media.types.length > 0 ? params.media.types : undefined, + MediaUrls: + params.media?.paths && params.media.paths.length > 0 ? params.media.paths : undefined, + MediaRemoteHost: params.remoteHost, + WasMentioned: decision.effectiveWasMentioned, + CommandAuthorized: decision.commandAuthorized, + OriginatingChannel: "imessage" as const, + OriginatingTo: imessageTo, + }); + + return { ctxPayload, fromLabel, chatTarget, imessageTo, inboundHistory }; +} + +export function buildIMessageEchoScope(params: { + accountId: string; + isGroup: boolean; + chatId?: number; + sender: string; +}): string { + return `${params.accountId}:${params.isGroup ? formatIMessageChatTarget(params.chatId) : `imessage:${params.sender}`}`; +} + +export function describeIMessageEchoDropLog(params: { messageText: string }): string { + return `imessage: skipping echo message (matches recently sent text within 5s): "${truncateUtf16Safe(params.messageText, 50)}"`; +} diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index f918bf6a170..bf7dc90681f 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -4,34 +4,19 @@ import { resolveHumanDelayConfig } from "../../agents/identity.js"; import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; -import { - formatInboundEnvelope, - formatInboundFromLabel, - resolveEnvelopeFormatOptions, -} from "../../auto-reply/envelope.js"; import { createInboundDebouncer, resolveInboundDebounceMs, } from "../../auto-reply/inbound-debounce.js"; import { - buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, DEFAULT_GROUP_HISTORY_LIMIT, - recordPendingHistoryEntryIfEnabled, type HistoryEntry, } from "../../auto-reply/reply/history.js"; -import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; -import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js"; import { createReplyDispatcher } from "../../auto-reply/reply/reply-dispatcher.js"; -import { resolveControlCommandGate } from "../../channels/command-gating.js"; -import { logInboundDrop } from "../../channels/logging.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { recordInboundSession } from "../../channels/session.js"; import { loadConfig } from "../../config/config.js"; -import { - resolveChannelGroupPolicy, - resolveChannelGroupRequireMention, -} from "../../config/group-policy.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; import { waitForTransportReady } from "../../infra/transport-ready.js"; @@ -41,19 +26,18 @@ import { readChannelAllowFromStore, upsertChannelPairingRequest, } from "../../pairing/pairing-store.js"; -import { resolveAgentRoute } from "../../routing/resolve-route.js"; import { truncateUtf16Safe } from "../../utils.js"; import { resolveIMessageAccount } from "../accounts.js"; import { createIMessageRpcClient } from "../client.js"; import { DEFAULT_IMESSAGE_PROBE_TIMEOUT_MS } from "../constants.js"; import { probeIMessage } from "../probe.js"; import { sendMessageIMessage } from "../send.js"; -import { - formatIMessageChatTarget, - isAllowedIMessageSender, - normalizeIMessageHandle, -} from "../targets.js"; +import { attachIMessageMonitorAbortHandler } from "./abort-handler.js"; import { deliverReplies } from "./deliver.js"; +import { + buildIMessageInboundContext, + resolveIMessageInboundDecision, +} from "./inbound-processing.js"; import { parseIMessageNotification } from "./parse-notification.js"; import { normalizeAllowList, resolveRuntime } from "./runtime.js"; @@ -85,33 +69,6 @@ async function detectRemoteHostFromCliPath(cliPath: string): Promise []); - const effectiveDmAllowFrom = Array.from(new Set([...allowFrom, ...storeAllowFrom])) - .map((v) => String(v).trim()) - .filter(Boolean); - // Keep DM pairing-store authorization scoped to DMs; group access must come - // from explicit group allowlist config. - const effectiveGroupAllowFrom = Array.from(new Set(groupAllowFrom)) - .map((v) => String(v).trim()) - .filter(Boolean); - - if (isGroup) { - if (groupPolicy === "disabled") { - logVerbose("Blocked iMessage group message (groupPolicy: disabled)"); - return; - } - if (groupPolicy === "allowlist") { - if (effectiveGroupAllowFrom.length === 0) { - logVerbose("Blocked iMessage group message (groupPolicy: allowlist, no groupAllowFrom)"); - return; - } - const allowed = isAllowedIMessageSender({ - allowFrom: effectiveGroupAllowFrom, - sender, - chatId: chatId ?? undefined, - chatGuid, - chatIdentifier, - }); - if (!allowed) { - logVerbose(`Blocked iMessage sender ${sender} (not in groupAllowFrom)`); - return; - } - } - if (groupListPolicy.allowlistEnabled && !groupListPolicy.allowed) { - logVerbose(`imessage: skipping group message (${groupId ?? "unknown"}) not in allowlist`); - return; - } - } - - const dmHasWildcard = effectiveDmAllowFrom.includes("*"); - const dmAuthorized = - dmPolicy === "open" - ? true - : dmHasWildcard || - (effectiveDmAllowFrom.length > 0 && - isAllowedIMessageSender({ - allowFrom: effectiveDmAllowFrom, - sender, - chatId: chatId ?? undefined, - chatGuid, - chatIdentifier, - })); - if (!isGroup) { - if (dmPolicy === "disabled") { - return; - } - if (!dmAuthorized) { - if (dmPolicy === "pairing") { - const senderId = normalizeIMessageHandle(sender); - const { code, created } = await upsertChannelPairingRequest({ - channel: "imessage", - id: senderId, - meta: { - sender: senderId, - chatId: chatId ? String(chatId) : undefined, - }, - }); - if (created) { - logVerbose(`imessage pairing request sender=${senderId}`); - try { - await sendMessageIMessage( - sender, - buildPairingReply({ - channel: "imessage", - idLine: `Your iMessage sender id: ${senderId}`, - code, - }), - { - client, - maxBytes: mediaMaxBytes, - accountId: accountInfo.accountId, - ...(chatId ? { chatId } : {}), - }, - ); - } catch (err) { - logVerbose(`imessage pairing reply failed for ${senderId}: ${String(err)}`); - } - } - } else { - logVerbose(`Blocked iMessage sender ${sender} (dmPolicy=${dmPolicy})`); - } - return; - } - } - - const route = resolveAgentRoute({ - cfg, - channel: "imessage", - accountId: accountInfo.accountId, - peer: { - kind: isGroup ? "group" : "direct", - id: isGroup ? String(chatId ?? "unknown") : normalizeIMessageHandle(sender), - }, - }); - const mentionRegexes = buildMentionRegexes(cfg, route.agentId); const messageText = (message.text ?? "").trim(); - // Echo detection: check if the received message matches a recently sent message (within 5 seconds). - // Scope by conversation so same text in different chats is not conflated. - const echoScope = `${accountInfo.accountId}:${isGroup ? formatIMessageChatTarget(chatId) : `imessage:${sender}`}`; - if (messageText && sentMessageCache.has(echoScope, messageText)) { - logVerbose( - `imessage: skipping echo message (matches recently sent text within 5s): "${truncateUtf16Safe(messageText, 50)}"`, - ); - return; - } - const attachments = includeAttachments ? (message.attachments ?? []) : []; // Filter to valid attachments with paths const validAttachments = attachments.filter((entry) => entry?.original_path && !entry?.missing); @@ -419,196 +219,103 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P const kind = mediaKindFromMime(mediaType ?? undefined); const placeholder = kind ? `` : attachments?.length ? "" : ""; const bodyText = messageText || placeholder; - if (!bodyText) { - return; - } - const replyContext = describeReplyContext(message); - const createdAt = message.created_at ? Date.parse(message.created_at) : undefined; - const historyKey = isGroup - ? String(chatId ?? chatGuid ?? chatIdentifier ?? "unknown") - : undefined; - const mentioned = isGroup ? matchesMentionPatterns(messageText, mentionRegexes) : true; - const requireMention = resolveChannelGroupRequireMention({ + + const storeAllowFrom = await readChannelAllowFromStore("imessage").catch(() => []); + const decision = resolveIMessageInboundDecision({ cfg, - channel: "imessage", accountId: accountInfo.accountId, - groupId, - requireMentionOverride: opts.requireMention, - overrideOrder: "before-config", + message, + opts, + messageText, + bodyText, + allowFrom, + groupAllowFrom, + groupPolicy, + dmPolicy, + storeAllowFrom, + historyLimit, + groupHistories, + echoCache: sentMessageCache, + logVerbose, }); - const canDetectMention = mentionRegexes.length > 0; - const useAccessGroups = cfg.commands?.useAccessGroups !== false; - const ownerAllowedForCommands = - effectiveDmAllowFrom.length > 0 - ? isAllowedIMessageSender({ - allowFrom: effectiveDmAllowFrom, - sender, - chatId: chatId ?? undefined, - chatGuid, - chatIdentifier, - }) - : false; - const groupAllowedForCommands = - effectiveGroupAllowFrom.length > 0 - ? isAllowedIMessageSender({ - allowFrom: effectiveGroupAllowFrom, - sender, - chatId: chatId ?? undefined, - chatGuid, - chatIdentifier, - }) - : false; - const hasControlCommandInMessage = hasControlCommand(messageText, cfg); - const commandGate = resolveControlCommandGate({ - useAccessGroups, - authorizers: [ - { configured: effectiveDmAllowFrom.length > 0, allowed: ownerAllowedForCommands }, - { configured: effectiveGroupAllowFrom.length > 0, allowed: groupAllowedForCommands }, - ], - allowTextCommands: true, - hasControlCommand: hasControlCommandInMessage, - }); - const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAuthorized; - if (isGroup && commandGate.shouldBlock) { - logInboundDrop({ - log: logVerbose, - channel: "imessage", - reason: "control command (unauthorized)", - target: sender, - }); - return; - } - const shouldBypassMention = - isGroup && requireMention && !mentioned && commandAuthorized && hasControlCommandInMessage; - const effectiveWasMentioned = mentioned || shouldBypassMention; - if (isGroup && requireMention && canDetectMention && !mentioned && !shouldBypassMention) { - logVerbose(`imessage: skipping group message (no mention)`); - recordPendingHistoryEntryIfEnabled({ - historyMap: groupHistories, - historyKey: historyKey ?? "", - limit: historyLimit, - entry: historyKey - ? { - sender: senderNormalized, - body: bodyText, - timestamp: createdAt, - messageId: message.id ? String(message.id) : undefined, - } - : null, - }); + + if (decision.kind === "drop") { + return; + } + + const chatId = message.chat_id ?? undefined; + if (decision.kind === "pairing") { + const sender = (message.sender ?? "").trim(); + if (!sender) { + return; + } + const { code, created } = await upsertChannelPairingRequest({ + channel: "imessage", + id: decision.senderId, + meta: { + sender: decision.senderId, + chatId: chatId ? String(chatId) : undefined, + }, + }); + if (created) { + logVerbose(`imessage pairing request sender=${decision.senderId}`); + try { + await sendMessageIMessage( + sender, + buildPairingReply({ + channel: "imessage", + idLine: `Your iMessage sender id: ${decision.senderId}`, + code, + }), + { + client, + maxBytes: mediaMaxBytes, + accountId: accountInfo.accountId, + ...(chatId ? { chatId } : {}), + }, + ); + } catch (err) { + logVerbose(`imessage pairing reply failed for ${decision.senderId}: ${String(err)}`); + } + } return; } - const chatTarget = formatIMessageChatTarget(chatId); - const fromLabel = formatInboundFromLabel({ - isGroup, - groupLabel: message.chat_name ?? undefined, - groupId: chatId !== undefined ? String(chatId) : "unknown", - groupFallback: "Group", - directLabel: senderNormalized, - directId: sender, - }); const storePath = resolveStorePath(cfg.session?.store, { - agentId: route.agentId, + agentId: decision.route.agentId, }); - const envelopeOptions = resolveEnvelopeFormatOptions(cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey: decision.route.sessionKey, }); - const replySuffix = replyContext - ? `\n\n[Replying to ${replyContext.sender ?? "unknown sender"}${ - replyContext.id ? ` id:${replyContext.id}` : "" - }]\n${replyContext.body}\n[/Replying]` - : ""; - const body = formatInboundEnvelope({ - channel: "iMessage", - from: fromLabel, - timestamp: createdAt, - body: `${bodyText}${replySuffix}`, - chatType: isGroup ? "group" : "direct", - sender: { name: senderNormalized, id: sender }, + const { ctxPayload, chatTarget } = buildIMessageInboundContext({ + cfg, + decision, + message, previousTimestamp, - envelope: envelopeOptions, - }); - let combinedBody = body; - if (isGroup && historyKey) { - combinedBody = buildPendingHistoryContextFromMap({ - historyMap: groupHistories, - historyKey, - limit: historyLimit, - currentMessage: combinedBody, - formatEntry: (entry) => - formatInboundEnvelope({ - channel: "iMessage", - from: fromLabel, - timestamp: entry.timestamp, - body: `${entry.body}${entry.messageId ? ` [id:${entry.messageId}]` : ""}`, - chatType: "group", - senderLabel: entry.sender, - envelope: envelopeOptions, - }), - }); - } - - const imessageTo = (isGroup ? chatTarget : undefined) || `imessage:${sender}`; - const inboundHistory = - isGroup && historyKey && historyLimit > 0 - ? (groupHistories.get(historyKey) ?? []).map((entry) => ({ - sender: entry.sender, - body: entry.body, - timestamp: entry.timestamp, - })) - : undefined; - const ctxPayload = finalizeInboundContext({ - Body: combinedBody, - BodyForAgent: bodyText, - InboundHistory: inboundHistory, - RawBody: bodyText, - CommandBody: bodyText, - From: isGroup ? `imessage:group:${chatId ?? "unknown"}` : `imessage:${sender}`, - To: imessageTo, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - ConversationLabel: fromLabel, - GroupSubject: isGroup ? (message.chat_name ?? undefined) : undefined, - GroupMembers: isGroup ? (message.participants ?? []).filter(Boolean).join(", ") : undefined, - SenderName: senderNormalized, - SenderId: sender, - Provider: "imessage", - Surface: "imessage", - MessageSid: message.id ? String(message.id) : undefined, - ReplyToId: replyContext?.id, - ReplyToBody: replyContext?.body, - ReplyToSender: replyContext?.sender, - Timestamp: createdAt, - MediaPath: mediaPath, - MediaType: mediaType, - MediaUrl: mediaPath, - MediaPaths: mediaPaths.length > 0 ? mediaPaths : undefined, - MediaTypes: mediaTypes.length > 0 ? mediaTypes : undefined, - MediaUrls: mediaPaths.length > 0 ? mediaPaths : undefined, - MediaRemoteHost: remoteHost, - WasMentioned: effectiveWasMentioned, - CommandAuthorized: commandAuthorized, - // Originating channel for reply routing. - OriginatingChannel: "imessage" as const, - OriginatingTo: imessageTo, + remoteHost, + historyLimit, + groupHistories, + media: { + path: mediaPath, + type: mediaType, + paths: mediaPaths, + types: mediaTypes, + }, }); - const updateTarget = (isGroup ? chatTarget : undefined) || sender; + const updateTarget = chatTarget || decision.sender; await recordInboundSession({ storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + sessionKey: ctxPayload.SessionKey ?? decision.route.sessionKey, ctx: ctxPayload, updateLastRoute: - !isGroup && updateTarget + !decision.isGroup && updateTarget ? { - sessionKey: route.mainSessionKey, + sessionKey: decision.route.mainSessionKey, channel: "imessage", to: updateTarget, - accountId: route.accountId, + accountId: decision.route.accountId, } : undefined, onRecordError: (err) => { @@ -617,26 +324,33 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }); if (shouldLogVerbose()) { - const preview = truncateUtf16Safe(body, 200).replace(/\n/g, "\\n"); + const preview = truncateUtf16Safe(String(ctxPayload.Body ?? ""), 200).replace(/\n/g, "\\n"); logVerbose( - `imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${body.length} preview="${preview}"`, + `imessage inbound: chatId=${chatId ?? "unknown"} from=${ctxPayload.From} len=${ + String(ctxPayload.Body ?? "").length + } preview="${preview}"`, ); } const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, - agentId: route.agentId, + agentId: decision.route.agentId, channel: "imessage", - accountId: route.accountId, + accountId: decision.route.accountId, }); const dispatcher = createReplyDispatcher({ ...prefixOptions, - humanDelay: resolveHumanDelayConfig(cfg, route.agentId), + humanDelay: resolveHumanDelayConfig(cfg, decision.route.agentId), deliver: async (payload) => { + const target = ctxPayload.To; + if (!target) { + runtime.error?.(danger("imessage: missing delivery target")); + return; + } await deliverReplies({ replies: [payload], - target: ctxPayload.To, + target, client, accountId: accountInfo.accountId, runtime, @@ -664,17 +378,21 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }); if (!queuedFinal) { - if (isGroup && historyKey) { + if (decision.isGroup && decision.historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, - historyKey, + historyKey: decision.historyKey, limit: historyLimit, }); } return; } - if (isGroup && historyKey) { - clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); + if (decision.isGroup && decision.historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: groupHistories, + historyKey: decision.historyKey, + limit: historyLimit, + }); } } @@ -728,21 +446,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P let subscriptionId: number | null = null; const abort = opts.abortSignal; - const onAbort = () => { - if (subscriptionId) { - void client - .request("watch.unsubscribe", { - subscription: subscriptionId, - }) - .catch(() => { - // Ignore disconnect errors during shutdown. - }); - } - void client.stop().catch(() => { - // Ignore disconnect errors during shutdown. - }); - }; - abort?.addEventListener("abort", onAbort, { once: true }); + const detachAbortHandler = attachIMessageMonitorAbortHandler({ + abortSignal: abort, + client, + getSubscriptionId: () => subscriptionId, + }); try { const result = await client.request<{ subscription?: number }>("watch.subscribe", { @@ -757,7 +465,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`)); throw err; } finally { - abort?.removeEventListener("abort", onAbort); + detachAbortHandler(); await client.stop(); } }