From 5ede08d16876d724b3ebf21f1d88e11a82db4740 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Fri, 13 Mar 2026 01:48:24 +0000 Subject: [PATCH] Matrix: harden monitor startup cleanup --- .../matrix/src/matrix/monitor/events.test.ts | 49 ++++++- .../matrix/src/matrix/monitor/events.ts | 18 ++- .../matrix/src/matrix/monitor/index.test.ts | 28 +++- extensions/matrix/src/matrix/monitor/index.ts | 138 ++++++++++-------- 4 files changed, 165 insertions(+), 68 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/events.test.ts b/extensions/matrix/src/matrix/monitor/events.test.ts index 18eaebc2154..c6d78064ef4 100644 --- a/extensions/matrix/src/matrix/monitor/events.test.ts +++ b/extensions/matrix/src/matrix/monitor/events.test.ts @@ -21,6 +21,7 @@ function createHarness(params?: { authEncryption?: boolean; cryptoAvailable?: boolean; selfUserId?: string; + selfUserIdError?: Error; joinedMembersByRoom?: Record; verifications?: Array<{ id: string; @@ -44,13 +45,19 @@ function createHarness(params?: { const invalidateRoom = vi.fn(); const logger = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const formatNativeDependencyHint = vi.fn(() => "install hint"); + const logVerboseMessage = vi.fn(); const client = { on: vi.fn((eventName: string, listener: (...args: unknown[]) => void) => { listeners.set(eventName, listener); return client; }), sendMessage, - getUserId: vi.fn(async () => params?.selfUserId ?? "@bot:example.org"), + getUserId: vi.fn(async () => { + if (params?.selfUserIdError) { + throw params.selfUserIdError; + } + return params?.selfUserId ?? "@bot:example.org"; + }), getJoinedRoomMembers: vi.fn( async (roomId: string) => params?.joinedMembersByRoom?.[roomId] ?? ["@bot:example.org", "@alice:example.org"], @@ -74,7 +81,7 @@ function createHarness(params?: { directTracker: { invalidateRoom, }, - logVerboseMessage: vi.fn(), + logVerboseMessage, warnedEncryptedRooms: new Set(), warnedCryptoMissingRooms: new Set(), logger, @@ -95,6 +102,7 @@ function createHarness(params?: { listVerifications, logger, formatNativeDependencyHint, + logVerboseMessage, roomMessageListener: listeners.get("room.message") as RoomEventListener | undefined, failedDecryptListener: listeners.get("room.failed_decryption") as | FailedDecryptListener @@ -592,4 +600,41 @@ describe("registerMatrixMonitorEvents verification routing", () => { }), ); }); + + it("does not throw when getUserId fails during decrypt guidance lookup", async () => { + const { logger, logVerboseMessage, failedDecryptListener } = createHarness({ + accountId: "ops", + selfUserIdError: new Error("lookup failed"), + }); + if (!failedDecryptListener) { + throw new Error("room.failed_decryption listener was not registered"); + } + + await expect( + failedDecryptListener( + "!room:example.org", + { + event_id: "$enc-lookup-fail", + sender: "@gumadeiras:matrix.example.org", + type: EventType.RoomMessageEncrypted, + origin_server_ts: Date.now(), + content: {}, + }, + new Error("The sender's device has not sent us the keys for this message."), + ), + ).resolves.toBeUndefined(); + + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.warn).toHaveBeenCalledWith( + "Failed to decrypt message", + expect.objectContaining({ + roomId: "!room:example.org", + eventId: "$enc-lookup-fail", + senderMatchesOwnUser: false, + }), + ); + expect(logVerboseMessage).toHaveBeenCalledWith( + "matrix: failed resolving self user id for decrypt warning: Error: lookup failed", + ); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 09f88220a82..c767034036f 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -15,6 +15,21 @@ function formatMatrixSelfDecryptionHint(accountId: string): string { ); } +async function resolveMatrixSelfUserId( + client: MatrixClient, + logVerboseMessage: (message: string) => void, +): Promise { + if (typeof client.getUserId !== "function") { + return null; + } + try { + return (await client.getUserId()) ?? null; + } catch (err) { + logVerboseMessage(`matrix: failed resolving self user id for decrypt warning: ${String(err)}`); + return null; + } +} + export function registerMatrixMonitorEvents(params: { cfg: CoreConfig; client: MatrixClient; @@ -68,8 +83,7 @@ export function registerMatrixMonitorEvents(params: { client.on( "room.failed_decryption", async (roomId: string, event: MatrixRawEvent, error: Error) => { - const selfUserId = - typeof client.getUserId === "function" ? ((await client.getUserId()) ?? null) : null; + const selfUserId = await resolveMatrixSelfUserId(client, logVerboseMessage); const sender = typeof event.sender === "string" ? event.sender : null; const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender); logger.warn("Failed to decrypt message", { diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index e7c77a3fab3..4824c8ebd12 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const hoisted = vi.hoisted(() => { const callOrder: string[] = []; const client = { id: "matrix-client" }; + let startClientError: Error | null = null; const resolveTextChunkLimit = vi.fn< (cfg: unknown, channel: unknown, accountId?: unknown) => number >(() => 4000); @@ -13,11 +14,16 @@ const hoisted = vi.hoisted(() => { debug: vi.fn(), }; const stopThreadBindingManager = vi.fn(); + const stopSharedClientForAccount = vi.fn(); + const setActiveMatrixClient = vi.fn(); return { callOrder, client, logger, resolveTextChunkLimit, + setActiveMatrixClient, + startClientError, + stopSharedClientForAccount, stopThreadBindingManager, }; }); @@ -87,7 +93,7 @@ vi.mock("../accounts.js", () => ({ })); vi.mock("../active-client.js", () => ({ - setActiveMatrixClient: vi.fn(), + setActiveMatrixClient: hoisted.setActiveMatrixClient, })); vi.mock("../client.js", () => ({ @@ -111,10 +117,13 @@ vi.mock("../client.js", () => ({ if (!hoisted.callOrder.includes("create-manager")) { throw new Error("Matrix client started before thread bindings were registered"); } + if (hoisted.startClientError) { + throw hoisted.startClientError; + } hoisted.callOrder.push("start-client"); return hoisted.client; }), - stopSharedClientForAccount: vi.fn(), + stopSharedClientForAccount: hoisted.stopSharedClientForAccount, })); vi.mock("../config-update.js", () => ({ @@ -191,7 +200,10 @@ describe("monitorMatrixProvider", () => { beforeEach(() => { vi.resetModules(); hoisted.callOrder.length = 0; + hoisted.startClientError = null; hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000); + hoisted.setActiveMatrixClient.mockReset(); + hoisted.stopSharedClientForAccount.mockReset(); hoisted.stopThreadBindingManager.mockReset(); Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); }); @@ -225,4 +237,16 @@ describe("monitorMatrixProvider", () => { "default", ); }); + + it("cleans up thread bindings and shared clients when startup fails", async () => { + const { monitorMatrixProvider } = await import("./index.js"); + hoisted.startClientError = new Error("start failed"); + + await expect(monitorMatrixProvider()).rejects.toThrow("start failed"); + + expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1); + expect(hoisted.stopSharedClientForAccount).toHaveBeenCalledTimes(1); + expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default"); + expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default"); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 1a24c92afb5..c05270a7ed2 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -129,6 +129,20 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi accountId: auth.accountId, }); setActiveMatrixClient(client, auth.accountId); + let cleanedUp = false; + let threadBindingManager: { accountId: string; stop: () => void } | null = null; + const cleanup = () => { + if (cleanedUp) { + return; + } + cleanedUp = true; + try { + threadBindingManager?.stop(); + } finally { + stopSharedClientForAccount(auth); + setActiveMatrixClient(null, auth.accountId); + } + }; const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg); const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg); @@ -200,74 +214,74 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi needsRoomAliasesForConfig, }); - const threadBindingManager = await createMatrixThreadBindingManager({ - accountId: account.accountId, - auth, - client, - env: process.env, - idleTimeoutMs: threadBindingIdleTimeoutMs, - maxAgeMs: threadBindingMaxAgeMs, - logVerboseMessage, - }); - logVerboseMessage( - `matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`, - ); + try { + threadBindingManager = await createMatrixThreadBindingManager({ + accountId: account.accountId, + auth, + client, + env: process.env, + idleTimeoutMs: threadBindingIdleTimeoutMs, + maxAgeMs: threadBindingMaxAgeMs, + logVerboseMessage, + }); + logVerboseMessage( + `matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`, + ); - registerMatrixMonitorEvents({ - cfg, - client, - auth, - directTracker, - logVerboseMessage, - warnedEncryptedRooms, - warnedCryptoMissingRooms, - logger, - formatNativeDependencyHint: core.system.formatNativeDependencyHint, - onRoomMessage: handleRoomMessage, - }); + registerMatrixMonitorEvents({ + cfg, + client, + auth, + directTracker, + logVerboseMessage, + warnedEncryptedRooms, + warnedCryptoMissingRooms, + logger, + formatNativeDependencyHint: core.system.formatNativeDependencyHint, + onRoomMessage: handleRoomMessage, + }); - // Register Matrix thread bindings before the client starts syncing so threaded - // commands during startup never observe Matrix as "unavailable". - logVerboseMessage("matrix: starting client"); - await resolveSharedMatrixClient({ - cfg, - auth: authWithLimit, - accountId: auth.accountId, - }); - logVerboseMessage("matrix: client started"); + // Register Matrix thread bindings before the client starts syncing so threaded + // commands during startup never observe Matrix as "unavailable". + logVerboseMessage("matrix: starting client"); + await resolveSharedMatrixClient({ + cfg, + auth: authWithLimit, + accountId: auth.accountId, + }); + logVerboseMessage("matrix: client started"); - // Shared client is already started via resolveSharedMatrixClient. - logger.info(`matrix: logged in as ${auth.userId}`); + // Shared client is already started via resolveSharedMatrixClient. + logger.info(`matrix: logged in as ${auth.userId}`); - await runMatrixStartupMaintenance({ - client, - auth, - accountId: account.accountId, - effectiveAccountId, - accountConfig, - logger, - logVerboseMessage, - loadConfig: () => core.config.loadConfig() as CoreConfig, - writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg), - loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes), - env: process.env, - }); + await runMatrixStartupMaintenance({ + client, + auth, + accountId: account.accountId, + effectiveAccountId, + accountConfig, + logger, + logVerboseMessage, + loadConfig: () => core.config.loadConfig() as CoreConfig, + writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg), + loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes), + env: process.env, + }); - await new Promise((resolve) => { - const onAbort = () => { - try { - threadBindingManager.stop(); + await new Promise((resolve) => { + const onAbort = () => { logVerboseMessage("matrix: stopping client"); - stopSharedClientForAccount(auth); - } finally { - setActiveMatrixClient(null, auth.accountId); + cleanup(); resolve(); + }; + if (opts.abortSignal?.aborted) { + onAbort(); + return; } - }; - if (opts.abortSignal?.aborted) { - onAbort(); - return; - } - opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); - }); + opts.abortSignal?.addEventListener("abort", onAbort, { once: true }); + }); + } catch (err) { + cleanup(); + throw err; + } }