Matrix: harden thread binding persistence

This commit is contained in:
Gustavo Madeira Santana
2026-03-09 04:50:02 -04:00
parent de4d36dc22
commit 61260f0b73
2 changed files with 188 additions and 50 deletions

View File

@@ -8,9 +8,12 @@ import {
__testing, __testing,
} from "../../../../src/infra/outbound/session-binding-service.js"; } from "../../../../src/infra/outbound/session-binding-service.js";
import { setMatrixRuntime } from "../runtime.js"; import { setMatrixRuntime } from "../runtime.js";
import { resolveMatrixStoragePaths } from "./client/storage.js";
import { import {
createMatrixThreadBindingManager, createMatrixThreadBindingManager,
resetMatrixThreadBindingsForTests, resetMatrixThreadBindingsForTests,
setMatrixThreadBindingIdleTimeoutBySessionKey,
setMatrixThreadBindingMaxAgeBySessionKey,
} from "./thread-bindings.js"; } from "./thread-bindings.js";
const sendMessageMatrixMock = vi.hoisted(() => const sendMessageMatrixMock = vi.hoisted(() =>
@@ -30,6 +33,12 @@ vi.mock("./send.js", async () => {
describe("matrix thread bindings", () => { describe("matrix thread bindings", () => {
let stateDir: string; let stateDir: string;
const auth = {
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
} as const;
beforeEach(async () => { beforeEach(async () => {
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-")); stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "matrix-thread-bindings-"));
@@ -46,12 +55,7 @@ describe("matrix thread bindings", () => {
it("creates child Matrix thread bindings from a top-level room context", async () => { it("creates child Matrix thread bindings from a top-level room context", async () => {
await createMatrixThreadBindingManager({ await createMatrixThreadBindingManager({
accountId: "ops", accountId: "ops",
auth: { auth,
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never, client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000, idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0, maxAgeMs: 0,
@@ -87,12 +91,7 @@ describe("matrix thread bindings", () => {
it("posts intro messages inside existing Matrix threads for current placement", async () => { it("posts intro messages inside existing Matrix threads for current placement", async () => {
await createMatrixThreadBindingManager({ await createMatrixThreadBindingManager({
accountId: "ops", accountId: "ops",
auth: { auth,
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never, client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000, idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0, maxAgeMs: 0,
@@ -138,12 +137,7 @@ describe("matrix thread bindings", () => {
try { try {
await createMatrixThreadBindingManager({ await createMatrixThreadBindingManager({
accountId: "ops", accountId: "ops",
auth: { auth,
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never, client: {} as never,
idleTimeoutMs: 1_000, idleTimeoutMs: 1_000,
maxAgeMs: 0, maxAgeMs: 0,
@@ -184,12 +178,7 @@ describe("matrix thread bindings", () => {
it("sends threaded farewell messages when bindings are unbound", async () => { it("sends threaded farewell messages when bindings are unbound", async () => {
await createMatrixThreadBindingManager({ await createMatrixThreadBindingManager({
accountId: "ops", accountId: "ops",
auth: { auth,
accountId: "ops",
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
accessToken: "token",
},
client: {} as never, client: {} as never,
idleTimeoutMs: 1_000, idleTimeoutMs: 1_000,
maxAgeMs: 0, maxAgeMs: 0,
@@ -226,4 +215,110 @@ describe("matrix thread bindings", () => {
}), }),
); );
}); });
it("updates lifecycle windows by session key and refreshes activity", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
try {
const manager = await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
});
const original = manager.listBySessionKey("agent:ops:subagent:child")[0];
expect(original).toBeDefined();
const idleUpdated = setMatrixThreadBindingIdleTimeoutBySessionKey({
accountId: "ops",
targetSessionKey: "agent:ops:subagent:child",
idleTimeoutMs: 2 * 60 * 60 * 1000,
});
vi.setSystemTime(new Date("2026-03-06T12:00:00.000Z"));
const maxAgeUpdated = setMatrixThreadBindingMaxAgeBySessionKey({
accountId: "ops",
targetSessionKey: "agent:ops:subagent:child",
maxAgeMs: 6 * 60 * 60 * 1000,
});
expect(idleUpdated).toHaveLength(1);
expect(idleUpdated[0]?.metadata?.idleTimeoutMs).toBe(2 * 60 * 60 * 1000);
expect(maxAgeUpdated).toHaveLength(1);
expect(maxAgeUpdated[0]?.metadata?.maxAgeMs).toBe(6 * 60 * 60 * 1000);
expect(maxAgeUpdated[0]?.boundAt).toBe(original?.boundAt);
expect(maxAgeUpdated[0]?.metadata?.lastActivityAt).toBe(
Date.parse("2026-03-06T12:00:00.000Z"),
);
expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.maxAgeMs).toBe(
6 * 60 * 60 * 1000,
);
expect(manager.listBySessionKey("agent:ops:subagent:child")[0]?.lastActivityAt).toBe(
Date.parse("2026-03-06T12:00:00.000Z"),
);
} finally {
vi.useRealTimers();
}
});
it("flushes pending touch persistence on stop", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
try {
const manager = await createMatrixThreadBindingManager({
accountId: "ops",
auth,
client: {} as never,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
enableSweeper: false,
});
const binding = await getSessionBindingService().bind({
targetSessionKey: "agent:ops:subagent:child",
targetKind: "subagent",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "$thread",
parentConversationId: "!room:example",
},
placement: "current",
});
const touchedAt = Date.parse("2026-03-06T12:00:00.000Z");
getSessionBindingService().touch(binding.bindingId, touchedAt);
manager.stop();
vi.useRealTimers();
const bindingsPath = path.join(
resolveMatrixStoragePaths({
...auth,
env: process.env,
}).rootDir,
"thread-bindings.json",
);
await vi.waitFor(async () => {
const raw = await fs.readFile(bindingsPath, "utf-8");
const parsed = JSON.parse(raw) as {
bindings?: Array<{ lastActivityAt?: number }>;
};
expect(parsed.bindings?.[0]?.lastActivityAt).toBe(touchedAt);
});
} finally {
vi.useRealTimers();
}
});
}); });

View File

@@ -232,14 +232,27 @@ async function loadBindingsFromDisk(filePath: string, accountId: string) {
return loaded; return loaded;
} }
async function persistBindings(filePath: string, accountId: string): Promise<void> { function toStoredBindingsState(
const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()] bindings: MatrixThreadBindingRecord[],
.filter((entry) => entry.accountId === accountId) ): StoredMatrixThreadBindingState {
.sort((a, b) => a.boundAt - b.boundAt); return {
await writeJsonFileAtomically(filePath, {
version: STORE_VERSION, version: STORE_VERSION,
bindings, bindings: [...bindings].sort((a, b) => a.boundAt - b.boundAt),
} satisfies StoredMatrixThreadBindingState); };
}
async function persistBindingsSnapshot(
filePath: string,
bindings: MatrixThreadBindingRecord[],
): Promise<void> {
await writeJsonFileAtomically(filePath, toStoredBindingsState(bindings));
}
async function persistBindings(filePath: string, accountId: string): Promise<void> {
await persistBindingsSnapshot(
filePath,
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId),
);
} }
function setBindingRecord(record: MatrixThreadBindingRecord): void { function setBindingRecord(record: MatrixThreadBindingRecord): void {
@@ -360,6 +373,16 @@ export async function createMatrixThreadBindingManager(params: {
} }
const persist = async () => await persistBindings(filePath, params.accountId); const persist = async () => await persistBindings(filePath, params.accountId);
const persistSafely = (reason: string, bindings?: MatrixThreadBindingRecord[]) => {
void persistBindingsSnapshot(
filePath,
bindings ?? listBindingsForAccount(params.accountId),
).catch((err) => {
params.logVerboseMessage?.(
`matrix: failed persisting thread bindings account=${params.accountId} action=${reason}: ${String(err)}`,
);
});
};
const defaults = { const defaults = {
idleTimeoutMs: params.idleTimeoutMs, idleTimeoutMs: params.idleTimeoutMs,
maxAgeMs: params.maxAgeMs, maxAgeMs: params.maxAgeMs,
@@ -371,10 +394,32 @@ export async function createMatrixThreadBindingManager(params: {
} }
persistTimer = setTimeout(() => { persistTimer = setTimeout(() => {
persistTimer = null; persistTimer = null;
void persist(); persistSafely("delayed-touch");
}, delayMs); }, delayMs);
persistTimer.unref?.(); persistTimer.unref?.();
}; };
const updateBindingsBySessionKey = (input: {
targetSessionKey: string;
update: (entry: MatrixThreadBindingRecord, now: number) => MatrixThreadBindingRecord;
persistReason: string;
}): MatrixThreadBindingRecord[] => {
const targetSessionKey = input.targetSessionKey.trim();
if (!targetSessionKey) {
return [];
}
const now = Date.now();
const nextBindings = listBindingsForAccount(params.accountId)
.filter((entry) => entry.targetSessionKey === targetSessionKey)
.map((entry) => input.update(entry, now));
if (nextBindings.length === 0) {
return [];
}
for (const entry of nextBindings) {
setBindingRecord(entry);
}
persistSafely(input.persistReason);
return nextBindings;
};
const manager: MatrixThreadBindingManager = { const manager: MatrixThreadBindingManager = {
accountId: params.accountId, accountId: params.accountId,
@@ -414,30 +459,26 @@ export async function createMatrixThreadBindingManager(params: {
return nextRecord; return nextRecord;
}, },
setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => { setIdleTimeoutBySessionKey: ({ targetSessionKey, idleTimeoutMs }) => {
const nextBindings = listBindingsForAccount(params.accountId) return updateBindingsBySessionKey({
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) targetSessionKey,
.map((entry) => ({ persistReason: "idle-timeout-update",
update: (entry, now) => ({
...entry, ...entry,
idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)), idleTimeoutMs: Math.max(0, Math.floor(idleTimeoutMs)),
})); lastActivityAt: now,
for (const entry of nextBindings) { }),
setBindingRecord(entry); });
}
void persist();
return nextBindings;
}, },
setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => { setMaxAgeBySessionKey: ({ targetSessionKey, maxAgeMs }) => {
const nextBindings = listBindingsForAccount(params.accountId) return updateBindingsBySessionKey({
.filter((entry) => entry.targetSessionKey === targetSessionKey.trim()) targetSessionKey,
.map((entry) => ({ persistReason: "max-age-update",
update: (entry, now) => ({
...entry, ...entry,
maxAgeMs: Math.max(0, Math.floor(maxAgeMs)), maxAgeMs: Math.max(0, Math.floor(maxAgeMs)),
})); lastActivityAt: now,
for (const entry of nextBindings) { }),
setBindingRecord(entry); });
}
void persist();
return nextBindings;
}, },
stop: () => { stop: () => {
if (sweepTimer) { if (sweepTimer) {
@@ -446,6 +487,7 @@ export async function createMatrixThreadBindingManager(params: {
if (persistTimer) { if (persistTimer) {
clearTimeout(persistTimer); clearTimeout(persistTimer);
persistTimer = null; persistTimer = null;
persistSafely("shutdown-flush");
} }
unregisterSessionBindingAdapter({ unregisterSessionBindingAdapter({
channel: "matrix", channel: "matrix",
@@ -631,6 +673,7 @@ export async function createMatrixThreadBindingManager(params: {
}), }),
); );
}, THREAD_BINDINGS_SWEEP_INTERVAL_MS); }, THREAD_BINDINGS_SWEEP_INTERVAL_MS);
sweepTimer.unref?.();
} }
MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager); MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager);