Matrix: harden monitor startup cleanup

This commit is contained in:
Gustavo Madeira Santana
2026-03-13 01:48:24 +00:00
parent 0f15cfe21a
commit 5ede08d168
4 changed files with 165 additions and 68 deletions

View File

@@ -21,6 +21,7 @@ function createHarness(params?: {
authEncryption?: boolean;
cryptoAvailable?: boolean;
selfUserId?: string;
selfUserIdError?: Error;
joinedMembersByRoom?: Record<string, string[]>;
verifications?: Array<{
id: string;
@@ -44,13 +45,19 @@ function createHarness(params?: {
const invalidateRoom = vi.fn();
const logger = { info: vi.fn(), warn: vi.fn(), error: vi.fn() };
const formatNativeDependencyHint = vi.fn(() => "install hint");
const logVerboseMessage = vi.fn();
const client = {
on: vi.fn((eventName: string, listener: (...args: unknown[]) => void) => {
listeners.set(eventName, listener);
return client;
}),
sendMessage,
getUserId: vi.fn(async () => params?.selfUserId ?? "@bot:example.org"),
getUserId: vi.fn(async () => {
if (params?.selfUserIdError) {
throw params.selfUserIdError;
}
return params?.selfUserId ?? "@bot:example.org";
}),
getJoinedRoomMembers: vi.fn(
async (roomId: string) =>
params?.joinedMembersByRoom?.[roomId] ?? ["@bot:example.org", "@alice:example.org"],
@@ -74,7 +81,7 @@ function createHarness(params?: {
directTracker: {
invalidateRoom,
},
logVerboseMessage: vi.fn(),
logVerboseMessage,
warnedEncryptedRooms: new Set<string>(),
warnedCryptoMissingRooms: new Set<string>(),
logger,
@@ -95,6 +102,7 @@ function createHarness(params?: {
listVerifications,
logger,
formatNativeDependencyHint,
logVerboseMessage,
roomMessageListener: listeners.get("room.message") as RoomEventListener | undefined,
failedDecryptListener: listeners.get("room.failed_decryption") as
| FailedDecryptListener
@@ -592,4 +600,41 @@ describe("registerMatrixMonitorEvents verification routing", () => {
}),
);
});
it("does not throw when getUserId fails during decrypt guidance lookup", async () => {
const { logger, logVerboseMessage, failedDecryptListener } = createHarness({
accountId: "ops",
selfUserIdError: new Error("lookup failed"),
});
if (!failedDecryptListener) {
throw new Error("room.failed_decryption listener was not registered");
}
await expect(
failedDecryptListener(
"!room:example.org",
{
event_id: "$enc-lookup-fail",
sender: "@gumadeiras:matrix.example.org",
type: EventType.RoomMessageEncrypted,
origin_server_ts: Date.now(),
content: {},
},
new Error("The sender's device has not sent us the keys for this message."),
),
).resolves.toBeUndefined();
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
"Failed to decrypt message",
expect.objectContaining({
roomId: "!room:example.org",
eventId: "$enc-lookup-fail",
senderMatchesOwnUser: false,
}),
);
expect(logVerboseMessage).toHaveBeenCalledWith(
"matrix: failed resolving self user id for decrypt warning: Error: lookup failed",
);
});
});

View File

@@ -15,6 +15,21 @@ function formatMatrixSelfDecryptionHint(accountId: string): string {
);
}
async function resolveMatrixSelfUserId(
client: MatrixClient,
logVerboseMessage: (message: string) => void,
): Promise<string | null> {
if (typeof client.getUserId !== "function") {
return null;
}
try {
return (await client.getUserId()) ?? null;
} catch (err) {
logVerboseMessage(`matrix: failed resolving self user id for decrypt warning: ${String(err)}`);
return null;
}
}
export function registerMatrixMonitorEvents(params: {
cfg: CoreConfig;
client: MatrixClient;
@@ -68,8 +83,7 @@ export function registerMatrixMonitorEvents(params: {
client.on(
"room.failed_decryption",
async (roomId: string, event: MatrixRawEvent, error: Error) => {
const selfUserId =
typeof client.getUserId === "function" ? ((await client.getUserId()) ?? null) : null;
const selfUserId = await resolveMatrixSelfUserId(client, logVerboseMessage);
const sender = typeof event.sender === "string" ? event.sender : null;
const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender);
logger.warn("Failed to decrypt message", {

View File

@@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const hoisted = vi.hoisted(() => {
const callOrder: string[] = [];
const client = { id: "matrix-client" };
let startClientError: Error | null = null;
const resolveTextChunkLimit = vi.fn<
(cfg: unknown, channel: unknown, accountId?: unknown) => number
>(() => 4000);
@@ -13,11 +14,16 @@ const hoisted = vi.hoisted(() => {
debug: vi.fn(),
};
const stopThreadBindingManager = vi.fn();
const stopSharedClientForAccount = vi.fn();
const setActiveMatrixClient = vi.fn();
return {
callOrder,
client,
logger,
resolveTextChunkLimit,
setActiveMatrixClient,
startClientError,
stopSharedClientForAccount,
stopThreadBindingManager,
};
});
@@ -87,7 +93,7 @@ vi.mock("../accounts.js", () => ({
}));
vi.mock("../active-client.js", () => ({
setActiveMatrixClient: vi.fn(),
setActiveMatrixClient: hoisted.setActiveMatrixClient,
}));
vi.mock("../client.js", () => ({
@@ -111,10 +117,13 @@ vi.mock("../client.js", () => ({
if (!hoisted.callOrder.includes("create-manager")) {
throw new Error("Matrix client started before thread bindings were registered");
}
if (hoisted.startClientError) {
throw hoisted.startClientError;
}
hoisted.callOrder.push("start-client");
return hoisted.client;
}),
stopSharedClientForAccount: vi.fn(),
stopSharedClientForAccount: hoisted.stopSharedClientForAccount,
}));
vi.mock("../config-update.js", () => ({
@@ -191,7 +200,10 @@ describe("monitorMatrixProvider", () => {
beforeEach(() => {
vi.resetModules();
hoisted.callOrder.length = 0;
hoisted.startClientError = null;
hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000);
hoisted.setActiveMatrixClient.mockReset();
hoisted.stopSharedClientForAccount.mockReset();
hoisted.stopThreadBindingManager.mockReset();
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
});
@@ -225,4 +237,16 @@ describe("monitorMatrixProvider", () => {
"default",
);
});
it("cleans up thread bindings and shared clients when startup fails", async () => {
const { monitorMatrixProvider } = await import("./index.js");
hoisted.startClientError = new Error("start failed");
await expect(monitorMatrixProvider()).rejects.toThrow("start failed");
expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1);
expect(hoisted.stopSharedClientForAccount).toHaveBeenCalledTimes(1);
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default");
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default");
});
});

View File

@@ -129,6 +129,20 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
accountId: auth.accountId,
});
setActiveMatrixClient(client, auth.accountId);
let cleanedUp = false;
let threadBindingManager: { accountId: string; stop: () => void } | null = null;
const cleanup = () => {
if (cleanedUp) {
return;
}
cleanedUp = true;
try {
threadBindingManager?.stop();
} finally {
stopSharedClientForAccount(auth);
setActiveMatrixClient(null, auth.accountId);
}
};
const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg);
const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg);
@@ -200,74 +214,74 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
needsRoomAliasesForConfig,
});
const threadBindingManager = await createMatrixThreadBindingManager({
accountId: account.accountId,
auth,
client,
env: process.env,
idleTimeoutMs: threadBindingIdleTimeoutMs,
maxAgeMs: threadBindingMaxAgeMs,
logVerboseMessage,
});
logVerboseMessage(
`matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`,
);
try {
threadBindingManager = await createMatrixThreadBindingManager({
accountId: account.accountId,
auth,
client,
env: process.env,
idleTimeoutMs: threadBindingIdleTimeoutMs,
maxAgeMs: threadBindingMaxAgeMs,
logVerboseMessage,
});
logVerboseMessage(
`matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`,
);
registerMatrixMonitorEvents({
cfg,
client,
auth,
directTracker,
logVerboseMessage,
warnedEncryptedRooms,
warnedCryptoMissingRooms,
logger,
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
onRoomMessage: handleRoomMessage,
});
registerMatrixMonitorEvents({
cfg,
client,
auth,
directTracker,
logVerboseMessage,
warnedEncryptedRooms,
warnedCryptoMissingRooms,
logger,
formatNativeDependencyHint: core.system.formatNativeDependencyHint,
onRoomMessage: handleRoomMessage,
});
// Register Matrix thread bindings before the client starts syncing so threaded
// commands during startup never observe Matrix as "unavailable".
logVerboseMessage("matrix: starting client");
await resolveSharedMatrixClient({
cfg,
auth: authWithLimit,
accountId: auth.accountId,
});
logVerboseMessage("matrix: client started");
// Register Matrix thread bindings before the client starts syncing so threaded
// commands during startup never observe Matrix as "unavailable".
logVerboseMessage("matrix: starting client");
await resolveSharedMatrixClient({
cfg,
auth: authWithLimit,
accountId: auth.accountId,
});
logVerboseMessage("matrix: client started");
// Shared client is already started via resolveSharedMatrixClient.
logger.info(`matrix: logged in as ${auth.userId}`);
// Shared client is already started via resolveSharedMatrixClient.
logger.info(`matrix: logged in as ${auth.userId}`);
await runMatrixStartupMaintenance({
client,
auth,
accountId: account.accountId,
effectiveAccountId,
accountConfig,
logger,
logVerboseMessage,
loadConfig: () => core.config.loadConfig() as CoreConfig,
writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg),
loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes),
env: process.env,
});
await runMatrixStartupMaintenance({
client,
auth,
accountId: account.accountId,
effectiveAccountId,
accountConfig,
logger,
logVerboseMessage,
loadConfig: () => core.config.loadConfig() as CoreConfig,
writeConfigFile: async (nextCfg) => await core.config.writeConfigFile(nextCfg),
loadWebMedia: async (url, maxBytes) => await core.media.loadWebMedia(url, maxBytes),
env: process.env,
});
await new Promise<void>((resolve) => {
const onAbort = () => {
try {
threadBindingManager.stop();
await new Promise<void>((resolve) => {
const onAbort = () => {
logVerboseMessage("matrix: stopping client");
stopSharedClientForAccount(auth);
} finally {
setActiveMatrixClient(null, auth.accountId);
cleanup();
resolve();
};
if (opts.abortSignal?.aborted) {
onAbort();
return;
}
};
if (opts.abortSignal?.aborted) {
onAbort();
return;
}
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
});
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
});
} catch (err) {
cleanup();
throw err;
}
}