Matrix: reuse shared clients for encrypted actions

This commit is contained in:
Gustavo Madeira Santana
2026-03-12 20:01:54 +00:00
parent f35fcb89b4
commit 9d17de6bdb
7 changed files with 189 additions and 148 deletions

View File

@@ -11,9 +11,8 @@ const {
loadConfigMock,
getMatrixRuntimeMock,
getActiveMatrixClientMock,
createMatrixClientMock,
resolveSharedMatrixClientMock,
isBunRuntimeMock,
resolveMatrixAuthMock,
resolveMatrixAuthContextMock,
} = matrixClientResolverMocks;
@@ -26,9 +25,8 @@ vi.mock("../active-client.js", () => ({
}));
vi.mock("../client.js", () => ({
createMatrixClient: createMatrixClientMock,
resolveSharedMatrixClient: resolveSharedMatrixClientMock,
isBunRuntime: () => isBunRuntimeMock(),
resolveMatrixAuth: resolveMatrixAuthMock,
resolveMatrixAuthContext: resolveMatrixAuthContextMock,
}));
@@ -56,42 +54,41 @@ describe("action client helpers", () => {
vi.unstubAllEnvs();
});
it("creates a one-off client even when OPENCLAW_GATEWAY_PORT is set", async () => {
it("reuses the shared client pool when no active monitor client is registered", async () => {
vi.stubEnv("OPENCLAW_GATEWAY_PORT", "18799");
const result = await withResolvedActionClient({ accountId: "default" }, async () => "ok");
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default");
expect(resolveMatrixAuthMock).toHaveBeenCalledTimes(1);
expect(createMatrixClientMock).toHaveBeenCalledTimes(1);
expect(createMatrixClientMock).toHaveBeenCalledWith(
expect.objectContaining({
autoBootstrapCrypto: false,
}),
);
const oneOffClient = await createMatrixClientMock.mock.results[0]?.value;
expect(oneOffClient.prepareForOneOff).toHaveBeenCalledTimes(1);
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1);
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: {},
timeoutMs: undefined,
accountId: "default",
});
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
expect(sharedClient.stop).not.toHaveBeenCalled();
expect(result).toBe("ok");
});
it("skips one-off room preparation when readiness is disabled", async () => {
await withResolvedActionClient({ accountId: "default", readiness: "none" }, async () => {});
const oneOffClient = await createMatrixClientMock.mock.results[0]?.value;
expect(oneOffClient.prepareForOneOff).not.toHaveBeenCalled();
expect(oneOffClient.start).not.toHaveBeenCalled();
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled();
expect(sharedClient.start).not.toHaveBeenCalled();
expect(sharedClient.stop).not.toHaveBeenCalled();
});
it("starts one-off clients when started readiness is required", async () => {
await withStartedActionClient({ accountId: "default" }, async () => {});
const oneOffClient = await createMatrixClientMock.mock.results[0]?.value;
expect(oneOffClient.start).toHaveBeenCalledTimes(1);
expect(oneOffClient.prepareForOneOff).not.toHaveBeenCalled();
expect(oneOffClient.stop).not.toHaveBeenCalled();
expect(oneOffClient.stopAndPersist).toHaveBeenCalledTimes(1);
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
expect(sharedClient.start).toHaveBeenCalledTimes(1);
expect(sharedClient.prepareForOneOff).not.toHaveBeenCalled();
expect(sharedClient.stop).not.toHaveBeenCalled();
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
});
it("reuses active monitor client when available", async () => {
@@ -104,8 +101,7 @@ describe("action client helpers", () => {
});
expect(result).toBe("ok");
expect(resolveMatrixAuthMock).not.toHaveBeenCalled();
expect(createMatrixClientMock).not.toHaveBeenCalled();
expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled();
expect(activeClient.stop).not.toHaveBeenCalled();
});
@@ -149,30 +145,14 @@ describe("action client helpers", () => {
encryption: true,
},
});
resolveMatrixAuthMock.mockResolvedValue({
accountId: "ops",
homeserver: "https://ops.example.org",
userId: "@ops:example.org",
accessToken: "ops-token",
password: undefined,
deviceId: "OPSDEVICE",
encryption: true,
});
await withResolvedActionClient({}, async () => {});
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops");
expect(resolveMatrixAuthMock).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "ops",
}),
);
expect(createMatrixClientMock).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "ops",
homeserver: "https://ops.example.org",
}),
);
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: loadConfigMock(),
timeoutMs: undefined,
accountId: "ops",
});
});
it("uses explicit cfg instead of loading runtime config", async () => {
@@ -191,29 +171,30 @@ describe("action client helpers", () => {
cfg: explicitCfg,
accountId: "ops",
});
expect(resolveMatrixAuthMock).toHaveBeenCalledWith({
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: explicitCfg,
timeoutMs: undefined,
accountId: "ops",
});
});
it("stops one-off action clients after wrapped calls succeed", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);
it("does not stop shared action clients after wrapped calls succeed", async () => {
const sharedClient = createMockMatrixClient();
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
const result = await withResolvedActionClient({ accountId: "default" }, async (client) => {
expect(client).toBe(oneOffClient);
expect(client).toBe(sharedClient);
return "ok";
});
expect(result).toBe("ok");
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(oneOffClient.stopAndPersist).not.toHaveBeenCalled();
expect(sharedClient.stop).not.toHaveBeenCalled();
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
});
it("still stops one-off action clients when the wrapped call throws", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);
it("keeps shared action clients alive when the wrapped call throws", async () => {
const sharedClient = createMockMatrixClient();
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
await expect(
withResolvedActionClient({ accountId: "default" }, async () => {
@@ -221,26 +202,26 @@ describe("action client helpers", () => {
}),
).rejects.toThrow("boom");
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(oneOffClient.stopAndPersist).not.toHaveBeenCalled();
expect(sharedClient.stop).not.toHaveBeenCalled();
expect(sharedClient.stopAndPersist).not.toHaveBeenCalled();
});
it("resolves room ids before running wrapped room actions", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);
const sharedClient = createMockMatrixClient();
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
resolveMatrixRoomIdMock.mockResolvedValue("!room:example.org");
const result = await withResolvedRoomAction(
"room:#ops:example.org",
{ accountId: "default" },
async (client, resolvedRoom) => {
expect(client).toBe(oneOffClient);
expect(client).toBe(sharedClient);
return resolvedRoom;
},
);
expect(resolveMatrixRoomIdMock).toHaveBeenCalledWith(oneOffClient, "room:#ops:example.org");
expect(resolveMatrixRoomIdMock).toHaveBeenCalledWith(sharedClient, "room:#ops:example.org");
expect(result).toBe("!room:example.org");
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(sharedClient.stop).not.toHaveBeenCalled();
});
});

View File

@@ -1,12 +1,7 @@
import { getMatrixRuntime } from "../runtime.js";
import type { CoreConfig } from "../types.js";
import { getActiveMatrixClient } from "./active-client.js";
import {
createMatrixClient,
isBunRuntime,
resolveMatrixAuth,
resolveMatrixAuthContext,
} from "./client.js";
import { isBunRuntime, resolveMatrixAuthContext, resolveSharedMatrixClient } from "./client.js";
import type { MatrixClient } from "./sdk.js";
type ResolvedRuntimeMatrixClient = {
@@ -19,19 +14,19 @@ type ResolvedRuntimeMatrixClientStopMode = "stop" | "persist";
type MatrixResolvedClientHook = (
client: MatrixClient,
context: { createdForOneOff: boolean },
context: { preparedByDefault: boolean },
) => Promise<void> | void;
async function ensureResolvedClientReadiness(params: {
client: MatrixClient;
readiness?: MatrixRuntimeClientReadiness;
createdForOneOff: boolean;
preparedByDefault: boolean;
}): Promise<void> {
if (params.readiness === "started") {
await params.client.start();
return;
}
if (params.readiness === "prepared" || (!params.readiness && params.createdForOneOff)) {
if (params.readiness === "prepared" || (!params.readiness && params.preparedByDefault)) {
await params.client.prepareForOneOff();
}
}
@@ -51,7 +46,7 @@ async function resolveRuntimeMatrixClient(opts: {
}): Promise<ResolvedRuntimeMatrixClient> {
ensureMatrixNodeRuntime();
if (opts.client) {
await opts.onResolved?.(opts.client, { createdForOneOff: false });
await opts.onResolved?.(opts.client, { preparedByDefault: false });
return { client: opts.client, stopOnDone: false };
}
@@ -62,27 +57,17 @@ async function resolveRuntimeMatrixClient(opts: {
});
const active = getActiveMatrixClient(authContext.accountId);
if (active) {
await opts.onResolved?.(active, { createdForOneOff: false });
await opts.onResolved?.(active, { preparedByDefault: false });
return { client: active, stopOnDone: false };
}
const auth = await resolveMatrixAuth({
const client = await resolveSharedMatrixClient({
cfg,
timeoutMs: opts.timeoutMs,
accountId: authContext.accountId,
});
const client = await createMatrixClient({
homeserver: auth.homeserver,
userId: auth.userId,
accessToken: auth.accessToken,
password: auth.password,
deviceId: auth.deviceId,
encryption: auth.encryption,
localTimeoutMs: opts.timeoutMs,
accountId: auth.accountId,
autoBootstrapCrypto: false,
});
await opts.onResolved?.(client, { createdForOneOff: true });
return { client, stopOnDone: true };
await opts.onResolved?.(client, { preparedByDefault: true });
return { client, stopOnDone: false };
}
export async function resolveRuntimeMatrixClientWithReadiness(opts: {
@@ -101,7 +86,7 @@ export async function resolveRuntimeMatrixClientWithReadiness(opts: {
await ensureResolvedClientReadiness({
client,
readiness: opts.readiness,
createdForOneOff: context.createdForOneOff,
preparedByDefault: context.preparedByDefault,
});
},
});

View File

@@ -5,9 +5,8 @@ type MatrixClientResolverMocks = {
loadConfigMock: Mock<() => unknown>;
getMatrixRuntimeMock: Mock<() => unknown>;
getActiveMatrixClientMock: Mock<(...args: unknown[]) => MatrixClient | null>;
createMatrixClientMock: Mock<(...args: unknown[]) => Promise<MatrixClient>>;
resolveSharedMatrixClientMock: Mock<(...args: unknown[]) => Promise<MatrixClient>>;
isBunRuntimeMock: Mock<() => boolean>;
resolveMatrixAuthMock: Mock<(...args: unknown[]) => Promise<unknown>>;
resolveMatrixAuthContextMock: Mock<
(params: { cfg: unknown; accountId?: string | null }) => unknown
>;
@@ -17,9 +16,8 @@ export const matrixClientResolverMocks: MatrixClientResolverMocks = {
loadConfigMock: vi.fn(() => ({})),
getMatrixRuntimeMock: vi.fn(),
getActiveMatrixClientMock: vi.fn(),
createMatrixClientMock: vi.fn(),
resolveSharedMatrixClientMock: vi.fn(),
isBunRuntimeMock: vi.fn(() => false),
resolveMatrixAuthMock: vi.fn(),
resolveMatrixAuthContextMock: vi.fn(),
};
@@ -43,9 +41,8 @@ export function primeMatrixClientResolverMocks(params?: {
loadConfigMock,
getMatrixRuntimeMock,
getActiveMatrixClientMock,
createMatrixClientMock,
resolveSharedMatrixClientMock,
isBunRuntimeMock,
resolveMatrixAuthMock,
resolveMatrixAuthContextMock,
} = matrixClientResolverMocks;
@@ -59,15 +56,6 @@ export function primeMatrixClientResolverMocks(params?: {
deviceId: "DEVICE123",
encryption: false,
};
const defaultAuth = {
accountId,
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
password: undefined,
deviceId: "DEVICE123",
encryption: false,
};
const client = params?.client ?? createMockMatrixClient();
vi.clearAllMocks();
@@ -96,11 +84,7 @@ export function primeMatrixClientResolverMocks(params?: {
},
}),
);
resolveMatrixAuthMock.mockResolvedValue({
...defaultAuth,
...params?.auth,
});
createMatrixClientMock.mockResolvedValue(client);
resolveSharedMatrixClientMock.mockResolvedValue(client);
return client;
}

View File

@@ -7,6 +7,7 @@ import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js";
type RoomEventListener = (roomId: string, event: MatrixRawEvent) => void;
type FailedDecryptListener = (roomId: string, event: MatrixRawEvent, error: Error) => Promise<void>;
function getSentNoticeBody(sendMessage: ReturnType<typeof vi.fn>, index = 0): string {
const calls = sendMessage.mock.calls as unknown[][];
@@ -92,6 +93,9 @@ function createHarness(params?: {
logger,
formatNativeDependencyHint,
roomMessageListener: listeners.get("room.message") as RoomEventListener | undefined,
failedDecryptListener: listeners.get("room.failed_decryption") as
| FailedDecryptListener
| undefined,
};
}
@@ -399,4 +403,79 @@ describe("registerMatrixMonitorEvents verification routing", () => {
{ roomId: "!room:example.org" },
);
});
it("adds self-device guidance when decrypt failures come from the same Matrix user", async () => {
const { logger, failedDecryptListener } = createHarness({
accountId: "ops",
selfUserId: "@gumadeiras:matrix.example.org",
});
if (!failedDecryptListener) {
throw new Error("room.failed_decryption listener was not registered");
}
await failedDecryptListener(
"!room:example.org",
{
event_id: "$enc-self",
sender: "@gumadeiras:matrix.example.org",
type: EventType.RoomMessageEncrypted,
origin_server_ts: Date.now(),
content: {},
},
new Error("The sender's device has not sent us the keys for this message."),
);
expect(logger.warn).toHaveBeenNthCalledWith(
1,
"Failed to decrypt message",
expect.objectContaining({
roomId: "!room:example.org",
eventId: "$enc-self",
sender: "@gumadeiras:matrix.example.org",
senderMatchesOwnUser: true,
}),
);
expect(logger.warn).toHaveBeenNthCalledWith(
2,
"matrix: failed to decrypt a message from this same Matrix user. This usually means another Matrix device did not share the room key, or another OpenClaw runtime is using the same account. Check 'openclaw matrix verify status --verbose --account ops' and 'openclaw matrix devices list --account ops'.",
{
roomId: "!room:example.org",
eventId: "$enc-self",
sender: "@gumadeiras:matrix.example.org",
},
);
});
it("does not add self-device guidance for decrypt failures from another sender", async () => {
const { logger, failedDecryptListener } = createHarness({
accountId: "ops",
selfUserId: "@gumadeiras:matrix.example.org",
});
if (!failedDecryptListener) {
throw new Error("room.failed_decryption listener was not registered");
}
await failedDecryptListener(
"!room:example.org",
{
event_id: "$enc-other",
sender: "@alice:matrix.example.org",
type: EventType.RoomMessageEncrypted,
origin_server_ts: Date.now(),
content: {},
},
new Error("The sender's device has not sent us the keys for this message."),
);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
"Failed to decrypt message",
expect.objectContaining({
roomId: "!room:example.org",
eventId: "$enc-other",
sender: "@alice:matrix.example.org",
senderMatchesOwnUser: false,
}),
);
});
});

View File

@@ -7,6 +7,14 @@ import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js";
import { createMatrixVerificationEventRouter } from "./verification-events.js";
function formatMatrixSelfDecryptionHint(accountId: string): string {
return (
"matrix: failed to decrypt a message from this same Matrix user. " +
"This usually means another Matrix device did not share the room key, or another OpenClaw runtime is using the same account. " +
`Check 'openclaw matrix verify status --verbose --account ${accountId}' and 'openclaw matrix devices list --account ${accountId}'.`
);
}
export function registerMatrixMonitorEvents(params: {
cfg: CoreConfig;
client: MatrixClient;
@@ -60,11 +68,24 @@ export function registerMatrixMonitorEvents(params: {
client.on(
"room.failed_decryption",
async (roomId: string, event: MatrixRawEvent, error: Error) => {
const selfUserId =
typeof client.getUserId === "function" ? ((await client.getUserId()) ?? null) : null;
const sender = typeof event.sender === "string" ? event.sender : null;
const senderMatchesOwnUser = Boolean(selfUserId && sender && selfUserId === sender);
logger.warn("Failed to decrypt message", {
roomId,
eventId: event.event_id,
sender,
senderMatchesOwnUser,
error: error.message,
});
if (senderMatchesOwnUser) {
logger.warn(formatMatrixSelfDecryptionHint(auth.accountId), {
roomId,
eventId: event.event_id,
sender,
});
}
logVerboseMessage(
`matrix: failed decrypt room=${roomId} id=${event.event_id ?? "unknown"} error=${error.message}`,
);

View File

@@ -54,6 +54,10 @@ const makeClient = () => {
getEvent,
uploadContent,
getUserId: vi.fn().mockResolvedValue("@bot:example.org"),
prepareForOneOff: vi.fn(async () => undefined),
start: vi.fn(async () => undefined),
stop: vi.fn(() => undefined),
stopAndPersist: vi.fn(async () => undefined),
} as unknown as import("./sdk.js").MatrixClient;
return { client, sendMessage, sendEvent, getEvent, uploadContent };
};
@@ -464,6 +468,10 @@ describe("sendTypingMatrix", () => {
const setTyping = vi.fn().mockResolvedValue(undefined);
const client = {
setTyping,
prepareForOneOff: vi.fn(async () => undefined),
start: vi.fn(async () => undefined),
stop: vi.fn(() => undefined),
stopAndPersist: vi.fn(async () => undefined),
} as unknown as import("./sdk.js").MatrixClient;
await sendTypingMatrix("room:!room:example", true, undefined, client);

View File

@@ -8,9 +8,8 @@ import {
const {
getMatrixRuntimeMock,
getActiveMatrixClientMock,
createMatrixClientMock,
resolveSharedMatrixClientMock,
isBunRuntimeMock,
resolveMatrixAuthMock,
resolveMatrixAuthContextMock,
} = matrixClientResolverMocks;
@@ -19,9 +18,8 @@ vi.mock("../active-client.js", () => ({
}));
vi.mock("../client.js", () => ({
createMatrixClient: (...args: unknown[]) => createMatrixClientMock(...args),
resolveSharedMatrixClient: (...args: unknown[]) => resolveSharedMatrixClientMock(...args),
isBunRuntime: () => isBunRuntimeMock(),
resolveMatrixAuth: (...args: unknown[]) => resolveMatrixAuthMock(...args),
resolveMatrixAuthContext: resolveMatrixAuthContextMock,
}));
@@ -45,22 +43,21 @@ describe("withResolvedMatrixClient", () => {
vi.unstubAllEnvs();
});
it("creates a one-off client even when OPENCLAW_GATEWAY_PORT is set", async () => {
it("reuses the shared client pool when no active monitor client is registered", async () => {
vi.stubEnv("OPENCLAW_GATEWAY_PORT", "18799");
const result = await withResolvedMatrixClient({ accountId: "default" }, async () => "ok");
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("default");
expect(resolveMatrixAuthMock).toHaveBeenCalledTimes(1);
expect(createMatrixClientMock).toHaveBeenCalledTimes(1);
expect(createMatrixClientMock).toHaveBeenCalledWith(
expect.objectContaining({
autoBootstrapCrypto: false,
}),
);
const oneOffClient = await createMatrixClientMock.mock.results[0]?.value;
expect(oneOffClient.prepareForOneOff).toHaveBeenCalledTimes(1);
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(resolveSharedMatrixClientMock).toHaveBeenCalledTimes(1);
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: {},
timeoutMs: undefined,
accountId: "default",
});
const sharedClient = await resolveSharedMatrixClientMock.mock.results[0]?.value;
expect(sharedClient.prepareForOneOff).toHaveBeenCalledTimes(1);
expect(sharedClient.stop).not.toHaveBeenCalled();
expect(result).toBe("ok");
});
@@ -74,8 +71,7 @@ describe("withResolvedMatrixClient", () => {
});
expect(result).toBe("ok");
expect(resolveMatrixAuthMock).not.toHaveBeenCalled();
expect(createMatrixClientMock).not.toHaveBeenCalled();
expect(resolveSharedMatrixClientMock).not.toHaveBeenCalled();
expect(activeClient.stop).not.toHaveBeenCalled();
});
@@ -86,28 +82,14 @@ describe("withResolvedMatrixClient", () => {
accountId: "ops",
resolved: {},
});
resolveMatrixAuthMock.mockResolvedValue({
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
password: undefined,
deviceId: "DEVICE123",
encryption: false,
});
await withResolvedMatrixClient({}, async () => {});
expect(getActiveMatrixClientMock).toHaveBeenCalledWith("ops");
expect(resolveMatrixAuthMock).toHaveBeenCalledWith({
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: {},
timeoutMs: undefined,
accountId: "ops",
});
expect(createMatrixClientMock).toHaveBeenCalledWith(
expect.objectContaining({
accountId: "ops",
}),
);
});
it("uses explicit cfg instead of loading runtime config", async () => {
@@ -126,15 +108,16 @@ describe("withResolvedMatrixClient", () => {
cfg: explicitCfg,
accountId: "ops",
});
expect(resolveMatrixAuthMock).toHaveBeenCalledWith({
expect(resolveSharedMatrixClientMock).toHaveBeenCalledWith({
cfg: explicitCfg,
timeoutMs: undefined,
accountId: "ops",
});
});
it("still stops one-off matrix clients when wrapped sends fail", async () => {
const oneOffClient = createMockMatrixClient();
createMatrixClientMock.mockResolvedValue(oneOffClient);
it("keeps shared matrix clients alive when wrapped sends fail", async () => {
const sharedClient = createMockMatrixClient();
resolveSharedMatrixClientMock.mockResolvedValue(sharedClient);
await expect(
withResolvedMatrixClient({ accountId: "default" }, async () => {
@@ -142,6 +125,6 @@ describe("withResolvedMatrixClient", () => {
}),
).rejects.toThrow("boom");
expect(oneOffClient.stop).toHaveBeenCalledTimes(1);
expect(sharedClient.stop).not.toHaveBeenCalled();
});
});