fix: harden thread-binding lifecycle persistence

This commit is contained in:
Onur Solmaz
2026-02-26 22:03:12 +01:00
parent 57dcd7ec08
commit 3e2118caa8
7 changed files with 258 additions and 6 deletions

View File

@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { validateConfigObjectRaw } from "./validation.js";
describe("thread binding config keys", () => {
it("rejects legacy session.threadBindings.ttlHours", () => {
const result = validateConfigObjectRaw({
session: {
threadBindings: {
ttlHours: 24,
},
},
});
expect(result.ok).toBe(false);
if (result.ok) {
return;
}
expect(result.issues).toContainEqual(
expect.objectContaining({
path: "session.threadBindings",
message: expect.stringContaining("ttlHours"),
}),
);
});
it("rejects legacy channels.discord.threadBindings.ttlHours", () => {
const result = validateConfigObjectRaw({
channels: {
discord: {
threadBindings: {
ttlHours: 24,
},
},
},
});
expect(result.ok).toBe(false);
if (result.ok) {
return;
}
expect(result.issues).toContainEqual(
expect.objectContaining({
path: "channels.discord.threadBindings",
message: expect.stringContaining("ttlHours"),
}),
);
});
});

View File

@@ -118,7 +118,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
}
if (ctx.threadBinding?.threadId) {
threadBindings.touchThread({ threadId: ctx.threadBinding.threadId, persist: false });
threadBindings.touchThread({ threadId: ctx.threadBinding.threadId });
}
const ackReaction = resolveAckReaction(cfg, route.agentId, {
channel: "discord",

View File

@@ -272,6 +272,6 @@ export async function deliverDiscordReply(params: {
}
if (binding && deliveredAny) {
params.threadBindings?.touchThread({ threadId: binding.threadId, persist: false });
params.threadBindings?.touchThread({ threadId: binding.threadId });
}
}

View File

@@ -392,6 +392,66 @@ describe("thread binding lifecycle", () => {
}
});
it("persists touched activity timestamps across restart when persistence is enabled", async () => {
vi.useFakeTimers();
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
__testing.resetThreadBindingsForTests();
vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z"));
const manager = createThreadBindingManager({
accountId: "default",
persist: true,
enableSweeper: false,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
});
const touchedAt = new Date("2026-02-20T00:00:30.000Z").getTime();
vi.setSystemTime(touchedAt);
manager.touchThread({ threadId: "thread-1" });
__testing.resetThreadBindingsForTests();
const reloaded = createThreadBindingManager({
accountId: "default",
persist: true,
enableSweeper: false,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
const record = reloaded.getByThreadId("thread-1");
expect(record).toBeDefined();
expect(record?.lastActivityAt).toBe(touchedAt);
expect(
resolveThreadBindingInactivityExpiresAt({
record: record!,
defaultIdleTimeoutMs: reloaded.getIdleTimeoutMs(),
}),
).toBe(new Date("2026-02-20T00:01:30.000Z").getTime());
} finally {
__testing.resetThreadBindingsForTests();
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(stateDir, { recursive: true, force: true });
vi.useRealTimers();
}
});
it("reuses webhook credentials after unbind when rebinding in the same channel", async () => {
const manager = createThreadBindingManager({
accountId: "default",
@@ -746,6 +806,104 @@ describe("thread binding lifecycle", () => {
expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined();
});
it("migrates legacy expiresAt bindings to idle/max-age semantics", () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
__testing.resetThreadBindingsForTests();
const bindingsPath = __testing.resolveThreadBindingsPath();
fs.mkdirSync(path.dirname(bindingsPath), { recursive: true });
const boundAt = Date.now() - 10_000;
const expiresAt = boundAt + 60_000;
fs.writeFileSync(
bindingsPath,
JSON.stringify(
{
version: 1,
bindings: {
"thread-legacy-active": {
accountId: "default",
channelId: "parent-1",
threadId: "thread-legacy-active",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:legacy-active",
agentId: "main",
boundBy: "system",
boundAt,
expiresAt,
},
"thread-legacy-disabled": {
accountId: "default",
channelId: "parent-1",
threadId: "thread-legacy-disabled",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:legacy-disabled",
agentId: "main",
boundBy: "system",
boundAt,
expiresAt: 0,
},
},
},
null,
2,
),
"utf-8",
);
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const active = manager.getByThreadId("thread-legacy-active");
expect(active).toBeDefined();
expect(active?.idleTimeoutMs).toBe(0);
expect(active?.maxAgeMs).toBe(expiresAt - boundAt);
expect(
resolveThreadBindingMaxAgeExpiresAt({
record: active!,
defaultMaxAgeMs: manager.getMaxAgeMs(),
}),
).toBe(expiresAt);
expect(
resolveThreadBindingInactivityExpiresAt({
record: active!,
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBeUndefined();
const disabled = manager.getByThreadId("thread-legacy-disabled");
expect(disabled).toBeDefined();
expect(disabled?.idleTimeoutMs).toBe(0);
expect(disabled?.maxAgeMs).toBe(0);
expect(
resolveThreadBindingMaxAgeExpiresAt({
record: disabled!,
defaultMaxAgeMs: manager.getMaxAgeMs(),
}),
).toBeUndefined();
expect(
resolveThreadBindingInactivityExpiresAt({
record: disabled!,
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBeUndefined();
} finally {
__testing.resetThreadBindingsForTests();
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(stateDir, { recursive: true, force: true });
}
});
it("persists unbinds even when no manager is active", () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));

View File

@@ -44,6 +44,7 @@ import {
resolveThreadBindingsPath,
saveBindingsToDisk,
setBindingRecord,
THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS,
shouldDefaultPersist,
resetThreadBindingsForTests,
} from "./thread-bindings.state.js";
@@ -262,7 +263,9 @@ export function createThreadBindingManager(
};
setBindingRecord(nextRecord);
if (touchParams.persist ?? persist) {
saveBindingsToDisk();
saveBindingsToDisk({
minIntervalMs: THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS,
});
}
return nextRecord;
},

View File

@@ -24,6 +24,7 @@ type ThreadBindingsGlobalState = {
reusableWebhooksByAccountChannel: Map<string, { webhookId: string; webhookToken: string }>;
persistByAccountId: Map<string, boolean>;
loadedBindings: boolean;
lastPersistedAtMs: number;
};
// Plugin hooks can load this module via Jiti while core imports it via ESM.
@@ -46,6 +47,7 @@ function createThreadBindingsGlobalState(): ThreadBindingsGlobalState {
>(),
persistByAccountId: new Map<string, boolean>(),
loadedBindings: false,
lastPersistedAtMs: 0,
};
}
@@ -70,6 +72,7 @@ export const RECENT_UNBOUND_WEBHOOK_ECHOES_BY_BINDING_KEY =
export const REUSABLE_WEBHOOKS_BY_ACCOUNT_CHANNEL =
THREAD_BINDINGS_STATE.reusableWebhooksByAccountChannel;
export const PERSIST_BY_ACCOUNT_ID = THREAD_BINDINGS_STATE.persistByAccountId;
export const THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS = 15_000;
export function rememberThreadBindingToken(params: { accountId?: string; token?: string }) {
const normalizedAccountId = normalizeAccountId(params.accountId);
@@ -177,6 +180,29 @@ function normalizePersistedBinding(threadIdKey: string, raw: unknown): ThreadBin
typeof value.maxAgeMs === "number" && Number.isFinite(value.maxAgeMs)
? Math.max(0, Math.floor(value.maxAgeMs))
: undefined;
const legacyExpiresAt =
typeof (value as { expiresAt?: unknown }).expiresAt === "number" &&
Number.isFinite((value as { expiresAt?: unknown }).expiresAt)
? Math.max(0, Math.floor((value as { expiresAt?: number }).expiresAt ?? 0))
: undefined;
let migratedIdleTimeoutMs = idleTimeoutMs;
let migratedMaxAgeMs = maxAgeMs;
if (
migratedIdleTimeoutMs === undefined &&
migratedMaxAgeMs === undefined &&
legacyExpiresAt != null
) {
if (legacyExpiresAt <= 0) {
migratedIdleTimeoutMs = 0;
migratedMaxAgeMs = 0;
} else {
const baseBoundAt = boundAt > 0 ? boundAt : lastActivityAt;
// Legacy expiresAt represented an absolute timestamp; map it to max-age and disable idle timeout.
migratedIdleTimeoutMs = 0;
migratedMaxAgeMs = Math.max(1, legacyExpiresAt - Math.max(0, baseBoundAt));
}
}
return {
accountId,
@@ -191,8 +217,8 @@ function normalizePersistedBinding(threadIdKey: string, raw: unknown): ThreadBin
boundBy,
boundAt,
lastActivityAt,
idleTimeoutMs,
maxAgeMs,
idleTimeoutMs: migratedIdleTimeoutMs,
maxAgeMs: migratedMaxAgeMs,
};
}
@@ -405,10 +431,23 @@ export function shouldPersistBindingMutations(): boolean {
return fs.existsSync(resolveThreadBindingsPath());
}
export function saveBindingsToDisk(params: { force?: boolean } = {}) {
export function saveBindingsToDisk(params: { force?: boolean; minIntervalMs?: number } = {}) {
if (!params.force && !shouldPersistAnyBindingState()) {
return;
}
const minIntervalMs =
typeof params.minIntervalMs === "number" && Number.isFinite(params.minIntervalMs)
? Math.max(0, Math.floor(params.minIntervalMs))
: 0;
const now = Date.now();
if (
!params.force &&
minIntervalMs > 0 &&
THREAD_BINDINGS_STATE.lastPersistedAtMs > 0 &&
now - THREAD_BINDINGS_STATE.lastPersistedAtMs < minIntervalMs
) {
return;
}
const bindings: Record<string, PersistedThreadBindingRecord> = {};
for (const [bindingKey, record] of BINDINGS_BY_THREAD_ID.entries()) {
bindings[bindingKey] = { ...record };
@@ -418,6 +457,7 @@ export function saveBindingsToDisk(params: { force?: boolean } = {}) {
bindings,
};
saveJsonFile(resolveThreadBindingsPath(), payload);
THREAD_BINDINGS_STATE.lastPersistedAtMs = now;
}
export function ensureBindingsLoaded() {
@@ -496,4 +536,5 @@ export function resetThreadBindingsForTests() {
TOKENS_BY_ACCOUNT_ID.clear();
PERSIST_BY_ACCOUNT_ID.clear();
THREAD_BINDINGS_STATE.loadedBindings = false;
THREAD_BINDINGS_STATE.lastPersistedAtMs = 0;
}

View File

@@ -21,6 +21,8 @@ export type ThreadBindingRecord = {
export type PersistedThreadBindingRecord = ThreadBindingRecord & {
sessionKey?: string;
/** @deprecated Legacy absolute expiry timestamp; migrated on load. */
expiresAt?: number;
};
export type PersistedThreadBindingsPayload = {