Discord: thread bindings idle + max-age lifecycle (#27845) (thanks @osolmaz)

* refactor discord thread bindings to idle and max-age lifecycle

* fix: migrate legacy thread binding expiry and reduce hot-path disk writes

* refactor: remove remaining thread-binding ttl legacy paths

* fix: harden thread-binding lifecycle persistence

* Discord: fix thread binding types in message/reply paths

* Infra: handle win32 unknown inode in file identity checks

* Infra: relax win32 guarded-open identity checks

* Config: migrate threadBindings ttlHours to idleHours

* Revert "Infra: relax win32 guarded-open identity checks"

This reverts commit de94126771.

* Revert "Infra: handle win32 unknown inode in file identity checks"

This reverts commit 96fc5ddfb3.

* Discord: re-read live binding state before sweep unbind

* fix: add changelog note for thread binding lifecycle update (#27845) (thanks @osolmaz)

---------

Co-authored-by: Onur Solmaz <onur@textcortex.com>
This commit is contained in:
Onur Solmaz
2026-02-27 10:02:39 +01:00
committed by GitHub
parent 0fb7add7d6
commit a7929abad8
45 changed files with 1656 additions and 402 deletions

View File

@@ -113,9 +113,14 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
mediaList.push(...forwardedMediaList);
const text = messageText;
if (!text) {
logVerbose(`discord: drop message ${message.id} (empty content)`);
logVerbose("discord: drop message " + message.id + " (empty content)");
return;
}
const boundThreadId = ctx.threadBinding?.conversation?.conversationId?.trim();
if (boundThreadId && typeof threadBindings.touchThread === "function") {
threadBindings.touchThread({ threadId: boundThreadId });
}
const ackReaction = resolveAckReaction(cfg, route.agentId, {
channel: "discord",
accountId,

View File

@@ -179,7 +179,8 @@ function createBoundThreadBindingManager(params: {
}): ThreadBindingManager {
return {
accountId: params.accountId,
getSessionTtlMs: () => 24 * 60 * 60 * 1000,
getIdleTimeoutMs: () => 24 * 60 * 60 * 1000,
getMaxAgeMs: () => 0,
getByThreadId: (threadId: string) =>
threadId === params.threadId
? {
@@ -191,11 +192,15 @@ function createBoundThreadBindingManager(params: {
agentId: params.agentId,
boundBy: "system",
boundAt: Date.now(),
lastActivityAt: Date.now(),
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
}
: undefined,
getBySessionKey: () => undefined,
listBySessionKey: () => [],
listBindings: () => [],
touchThread: () => null,
bindTarget: async () => null,
unbindThread: () => null,
unbindBySessionKey: () => [],

View File

@@ -18,8 +18,6 @@ const {
resolveDiscordAllowlistConfigMock,
resolveNativeCommandsEnabledMock,
resolveNativeSkillsEnabledMock,
resolveThreadBindingSessionTtlMsMock,
resolveThreadBindingsEnabledMock,
} = vi.hoisted(() => {
const createdBindingManagers: Array<{ stop: ReturnType<typeof vi.fn> }> = [];
return {
@@ -63,8 +61,6 @@ const {
})),
resolveNativeCommandsEnabledMock: vi.fn(() => true),
resolveNativeSkillsEnabledMock: vi.fn(() => false),
resolveThreadBindingSessionTtlMsMock: vi.fn(() => undefined),
resolveThreadBindingsEnabledMock: vi.fn(() => true),
};
});
@@ -235,8 +231,6 @@ vi.mock("./thread-bindings.js", () => ({
createNoopThreadBindingManager: createNoopThreadBindingManagerMock,
createThreadBindingManager: createThreadBindingManagerMock,
reconcileAcpThreadBindingsOnStartup: reconcileAcpThreadBindingsOnStartupMock,
resolveThreadBindingSessionTtlMs: resolveThreadBindingSessionTtlMsMock,
resolveThreadBindingsEnabled: resolveThreadBindingsEnabledMock,
}));
describe("monitorDiscordProvider", () => {
@@ -283,8 +277,6 @@ describe("monitorDiscordProvider", () => {
});
resolveNativeCommandsEnabledMock.mockClear().mockReturnValue(true);
resolveNativeSkillsEnabledMock.mockClear().mockReturnValue(false);
resolveThreadBindingSessionTtlMsMock.mockClear().mockReturnValue(undefined);
resolveThreadBindingsEnabledMock.mockClear().mockReturnValue(true);
});
it("stops thread bindings when startup fails before lifecycle begins", async () => {

View File

@@ -74,11 +74,9 @@ import { resolveDiscordRestFetch } from "./rest-fetch.js";
import {
createNoopThreadBindingManager,
createThreadBindingManager,
resolveThreadBindingSessionTtlMs,
resolveThreadBindingsEnabled,
reconcileAcpThreadBindingsOnStartup,
} from "./thread-bindings.js";
import { formatThreadBindingTtlLabel } from "./thread-bindings.messages.js";
import { formatThreadBindingDurationLabel } from "./thread-bindings.messages.js";
export type MonitorDiscordOpts = {
token?: string;
@@ -110,8 +108,61 @@ function summarizeGuilds(entries?: Record<string, unknown>) {
return `${sample.join(", ")}${suffix}`;
}
function formatThreadBindingSessionTtlLabel(ttlMs: number): string {
const label = formatThreadBindingTtlLabel(ttlMs);
const DEFAULT_THREAD_BINDING_IDLE_HOURS = 24;
const DEFAULT_THREAD_BINDING_MAX_AGE_HOURS = 0;
function normalizeThreadBindingHours(raw: unknown): number | undefined {
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return undefined;
}
if (raw < 0) {
return undefined;
}
return raw;
}
function resolveThreadBindingIdleTimeoutMs(params: {
channelIdleHoursRaw: unknown;
sessionIdleHoursRaw: unknown;
}): number {
const idleHours =
normalizeThreadBindingHours(params.channelIdleHoursRaw) ??
normalizeThreadBindingHours(params.sessionIdleHoursRaw) ??
DEFAULT_THREAD_BINDING_IDLE_HOURS;
return Math.floor(idleHours * 60 * 60 * 1000);
}
function resolveThreadBindingMaxAgeMs(params: {
channelMaxAgeHoursRaw: unknown;
sessionMaxAgeHoursRaw: unknown;
}): number {
const maxAgeHours =
normalizeThreadBindingHours(params.channelMaxAgeHoursRaw) ??
normalizeThreadBindingHours(params.sessionMaxAgeHoursRaw) ??
DEFAULT_THREAD_BINDING_MAX_AGE_HOURS;
return Math.floor(maxAgeHours * 60 * 60 * 1000);
}
function normalizeThreadBindingsEnabled(raw: unknown): boolean | undefined {
if (typeof raw !== "boolean") {
return undefined;
}
return raw;
}
function resolveThreadBindingsEnabled(params: {
channelEnabledRaw: unknown;
sessionEnabledRaw: unknown;
}): boolean {
return (
normalizeThreadBindingsEnabled(params.channelEnabledRaw) ??
normalizeThreadBindingsEnabled(params.sessionEnabledRaw) ??
true
);
}
function formatThreadBindingDurationForConfigLabel(durationMs: number): string {
const label = formatThreadBindingDurationLabel(durationMs);
return label === "disabled" ? "off" : label;
}
@@ -245,10 +296,15 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
const replyToMode = opts.replyToMode ?? discordCfg.replyToMode ?? "off";
const dmEnabled = dmConfig?.enabled ?? true;
const dmPolicy = discordCfg.dmPolicy ?? dmConfig?.policy ?? "pairing";
const threadBindingSessionTtlMs = resolveThreadBindingSessionTtlMs({
channelTtlHoursRaw:
discordAccountThreadBindings?.ttlHours ?? discordRootThreadBindings?.ttlHours,
sessionTtlHoursRaw: cfg.session?.threadBindings?.ttlHours,
const threadBindingIdleTimeoutMs = resolveThreadBindingIdleTimeoutMs({
channelIdleHoursRaw:
discordAccountThreadBindings?.idleHours ?? discordRootThreadBindings?.idleHours,
sessionIdleHoursRaw: cfg.session?.threadBindings?.idleHours,
});
const threadBindingMaxAgeMs = resolveThreadBindingMaxAgeMs({
channelMaxAgeHoursRaw:
discordAccountThreadBindings?.maxAgeHours ?? discordRootThreadBindings?.maxAgeHours,
sessionMaxAgeHoursRaw: cfg.session?.threadBindings?.maxAgeHours,
});
const threadBindingsEnabled = resolveThreadBindingsEnabled({
channelEnabledRaw: discordAccountThreadBindings?.enabled ?? discordRootThreadBindings?.enabled,
@@ -288,7 +344,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
if (shouldLogVerbose()) {
logVerbose(
`discord: config dm=${dmEnabled ? "on" : "off"} dmPolicy=${dmPolicy} allowFrom=${summarizeAllowList(allowFrom)} groupDm=${groupDmEnabled ? "on" : "off"} groupDmChannels=${summarizeAllowList(groupDmChannels)} groupPolicy=${groupPolicy} guilds=${summarizeGuilds(guildEntries)} historyLimit=${historyLimit} mediaMaxMb=${Math.round(mediaMaxBytes / (1024 * 1024))} native=${nativeEnabled ? "on" : "off"} nativeSkills=${nativeSkillsEnabled ? "on" : "off"} accessGroups=${useAccessGroups ? "on" : "off"} threadBindings=${threadBindingsEnabled ? "on" : "off"} threadSessionTtl=${formatThreadBindingSessionTtlLabel(threadBindingSessionTtlMs)}`,
`discord: config dm=${dmEnabled ? "on" : "off"} dmPolicy=${dmPolicy} allowFrom=${summarizeAllowList(allowFrom)} groupDm=${groupDmEnabled ? "on" : "off"} groupDmChannels=${summarizeAllowList(groupDmChannels)} groupPolicy=${groupPolicy} guilds=${summarizeGuilds(guildEntries)} historyLimit=${historyLimit} mediaMaxMb=${Math.round(mediaMaxBytes / (1024 * 1024))} native=${nativeEnabled ? "on" : "off"} nativeSkills=${nativeSkillsEnabled ? "on" : "off"} accessGroups=${useAccessGroups ? "on" : "off"} threadBindings=${threadBindingsEnabled ? "on" : "off"} threadIdleTimeout=${formatThreadBindingDurationForConfigLabel(threadBindingIdleTimeoutMs)} threadMaxAge=${formatThreadBindingDurationForConfigLabel(threadBindingMaxAgeMs)}`,
);
}
@@ -327,7 +383,8 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
? createThreadBindingManager({
accountId: account.accountId,
token,
sessionTtlMs: threadBindingSessionTtlMs,
idleTimeoutMs: threadBindingIdleTimeoutMs,
maxAgeMs: threadBindingMaxAgeMs,
})
: createNoopThreadBindingManager(account.accountId);
if (threadBindingsEnabled) {

View File

@@ -210,6 +210,31 @@ describe("deliverDiscordReply", () => {
expect(sendMessageDiscordMock).not.toHaveBeenCalled();
});
it("touches bound-thread activity after outbound delivery", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z"));
const threadBindings = await createBoundThreadBindings();
vi.setSystemTime(new Date("2026-02-20T00:02:00.000Z"));
await deliverDiscordReply({
replies: [{ text: "Activity ping" }],
target: "channel:thread-1",
token: "token",
runtime,
textLimit: 2000,
sessionKey: "agent:main:subagent:child",
threadBindings,
});
expect(threadBindings.getByThreadId("thread-1")?.lastActivityAt).toBe(
new Date("2026-02-20T00:02:00.000Z").getTime(),
);
} finally {
vi.useRealTimers();
}
});
it("falls back to bot send when webhook delivery fails", async () => {
const threadBindings = await createBoundThreadBindings();
sendWebhookMessageDiscordMock.mockRejectedValueOnce(new Error("rate limited"));

View File

@@ -20,6 +20,7 @@ export type DiscordThreadBindingLookupRecord = {
export type DiscordThreadBindingLookup = {
listBySessionKey: (targetSessionKey: string) => DiscordThreadBindingLookupRecord[];
touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown;
};
function resolveTargetChannelId(target: string): string | undefined {
@@ -173,6 +174,7 @@ export async function deliverDiscordReply(params: {
target: params.target,
});
const persona = resolveBindingPersona(binding);
let deliveredAny = false;
for (const payload of params.replies) {
const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
const rawText = payload.text ?? "";
@@ -207,6 +209,7 @@ export async function deliverDiscordReply(params: {
username: persona.username,
avatarUrl: persona.avatarUrl,
});
deliveredAny = true;
}
continue;
}
@@ -225,6 +228,7 @@ export async function deliverDiscordReply(params: {
accountId: params.accountId,
replyTo,
});
deliveredAny = true;
// Voice messages cannot include text; send remaining text separately if present.
await sendDiscordChunkWithFallback({
target: params.target,
@@ -257,6 +261,7 @@ export async function deliverDiscordReply(params: {
accountId: params.accountId,
replyTo,
});
deliveredAny = true;
await sendAdditionalDiscordMedia({
target: params.target,
token: params.token,
@@ -266,4 +271,8 @@ export async function deliverDiscordReply(params: {
resolveReplyTo,
});
}
if (binding && deliveredAny) {
params.threadBindings?.touchThread?.({ threadId: binding.threadId });
}
}

View File

@@ -1,21 +1,39 @@
import {
resolveThreadBindingSessionTtlMs,
resolveThreadBindingIdleTimeoutMs,
resolveThreadBindingMaxAgeMs,
resolveThreadBindingsEnabled,
} from "../../channels/thread-bindings-policy.js";
import type { OpenClawConfig } from "../../config/config.js";
import { normalizeAccountId } from "../../routing/session-key.js";
export { resolveThreadBindingSessionTtlMs, resolveThreadBindingsEnabled };
export {
resolveThreadBindingIdleTimeoutMs,
resolveThreadBindingMaxAgeMs,
resolveThreadBindingsEnabled,
};
export function resolveDiscordThreadBindingSessionTtlMs(params: {
export function resolveDiscordThreadBindingIdleTimeoutMs(params: {
cfg: OpenClawConfig;
accountId?: string;
}): number {
const accountId = normalizeAccountId(params.accountId);
const root = params.cfg.channels?.discord?.threadBindings;
const account = params.cfg.channels?.discord?.accounts?.[accountId]?.threadBindings;
return resolveThreadBindingSessionTtlMs({
channelTtlHoursRaw: account?.ttlHours ?? root?.ttlHours,
sessionTtlHoursRaw: params.cfg.session?.threadBindings?.ttlHours,
return resolveThreadBindingIdleTimeoutMs({
channelIdleHoursRaw: account?.idleHours ?? root?.idleHours,
sessionIdleHoursRaw: params.cfg.session?.threadBindings?.idleHours,
});
}
export function resolveDiscordThreadBindingMaxAgeMs(params: {
cfg: OpenClawConfig;
accountId?: string;
}): number {
const accountId = normalizeAccountId(params.accountId);
const root = params.cfg.channels?.discord?.threadBindings;
const account = params.cfg.channels?.discord?.accounts?.[accountId]?.threadBindings;
return resolveThreadBindingMaxAgeMs({
channelMaxAgeHoursRaw: account?.maxAgeHours ?? root?.maxAgeHours,
sessionMaxAgeHoursRaw: params.cfg.session?.threadBindings?.maxAgeHours,
});
}

View File

@@ -57,12 +57,15 @@ const {
autoBindSpawnedDiscordSubagent,
createThreadBindingManager,
reconcileAcpThreadBindingsOnStartup,
resolveThreadBindingInactivityExpiresAt,
resolveThreadBindingIntroText,
setThreadBindingTtlBySessionKey,
resolveThreadBindingMaxAgeExpiresAt,
setThreadBindingIdleTimeoutBySessionKey,
setThreadBindingMaxAgeBySessionKey,
unbindThreadBindingsBySessionKey,
} = await import("./thread-bindings.js");
describe("thread binding ttl", () => {
describe("thread binding lifecycle", () => {
beforeEach(() => {
__testing.resetThreadBindingsForTests();
hoisted.sendMessageDiscord.mockClear();
@@ -80,7 +83,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: true,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const bindDefaultThreadTarget = async (
@@ -97,33 +101,36 @@ describe("thread binding ttl", () => {
});
};
it("includes ttl in intro text", () => {
it("includes idle and max-age details in intro text", () => {
const intro = resolveThreadBindingIntroText({
agentId: "main",
label: "worker",
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 48 * 60 * 60 * 1000,
});
expect(intro).toContain("auto-unfocus in 24h");
expect(intro).toContain("idle auto-unfocus after 24h inactivity");
expect(intro).toContain("max age 48h");
});
it("includes cwd near the top of intro text", () => {
const intro = resolveThreadBindingIntroText({
agentId: "codex",
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
sessionCwd: "/home/bob/clawd",
sessionDetails: ["session ids: pending (available after the first reply)"],
});
expect(intro).toContain("\ncwd: /home/bob/clawd\nsession ids: pending");
});
it("auto-unfocuses expired bindings and sends a ttl-expired message", async () => {
it("auto-unfocuses idle-expired bindings and sends inactivity message", async () => {
vi.useFakeTimers();
try {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: true,
sessionTtlMs: 60_000,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
const binding = await manager.bindTarget({
@@ -147,7 +154,41 @@ describe("thread binding ttl", () => {
expect(hoisted.sendWebhookMessageDiscord).not.toHaveBeenCalled();
expect(hoisted.sendMessageDiscord).toHaveBeenCalledTimes(1);
const farewell = hoisted.sendMessageDiscord.mock.calls[0]?.[1] as string | undefined;
expect(farewell).toContain("Session ended automatically after 1m");
expect(farewell).toContain("after 1m of inactivity");
} finally {
vi.useRealTimers();
}
});
it("auto-unfocuses max-age-expired bindings and sends max-age message", async () => {
vi.useFakeTimers();
try {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: true,
idleTimeoutMs: 0,
maxAgeMs: 60_000,
});
const binding = await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
});
expect(binding).not.toBeNull();
hoisted.sendMessageDiscord.mockClear();
await vi.advanceTimersByTimeAsync(120_000);
expect(manager.getByThreadId("thread-1")).toBeUndefined();
expect(hoisted.sendMessageDiscord).toHaveBeenCalledTimes(1);
const farewell = hoisted.sendMessageDiscord.mock.calls[0]?.[1] as string | undefined;
expect(farewell).toContain("max age of 1m");
} finally {
vi.useRealTimers();
}
@@ -190,7 +231,7 @@ describe("thread binding ttl", () => {
}
});
it("updates ttl by target session key", async () => {
it("updates idle timeout by target session key", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(new Date("2026-02-20T23:00:00.000Z"));
@@ -198,7 +239,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
@@ -210,33 +252,80 @@ describe("thread binding ttl", () => {
webhookId: "wh-1",
webhookToken: "tok-1",
});
const boundAt = manager.getByThreadId("thread-1")?.boundAt;
vi.setSystemTime(new Date("2026-02-20T23:15:00.000Z"));
const updated = setThreadBindingTtlBySessionKey({
const updated = setThreadBindingIdleTimeoutBySessionKey({
accountId: "default",
targetSessionKey: "agent:main:subagent:child",
ttlMs: 2 * 60 * 60 * 1000,
idleTimeoutMs: 2 * 60 * 60 * 1000,
});
expect(updated).toHaveLength(1);
expect(updated[0]?.boundAt).toBe(new Date("2026-02-20T23:15:00.000Z").getTime());
expect(updated[0]?.expiresAt).toBe(new Date("2026-02-21T01:15:00.000Z").getTime());
expect(manager.getByThreadId("thread-1")?.expiresAt).toBe(
new Date("2026-02-21T01:15:00.000Z").getTime(),
);
expect(updated[0]?.lastActivityAt).toBe(new Date("2026-02-20T23:15:00.000Z").getTime());
expect(updated[0]?.boundAt).toBe(boundAt);
expect(
resolveThreadBindingInactivityExpiresAt({
record: updated[0],
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBe(new Date("2026-02-21T01:15:00.000Z").getTime());
} finally {
vi.useRealTimers();
}
});
it("keeps binding when ttl is disabled per session key", async () => {
it("updates max age by target session key", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(new Date("2026-02-20T10:00:00.000Z"));
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
});
vi.setSystemTime(new Date("2026-02-20T10:30:00.000Z"));
const updated = setThreadBindingMaxAgeBySessionKey({
accountId: "default",
targetSessionKey: "agent:main:subagent:child",
maxAgeMs: 3 * 60 * 60 * 1000,
});
expect(updated).toHaveLength(1);
expect(updated[0]?.boundAt).toBe(new Date("2026-02-20T10:30:00.000Z").getTime());
expect(updated[0]?.lastActivityAt).toBe(new Date("2026-02-20T10:30:00.000Z").getTime());
expect(
resolveThreadBindingMaxAgeExpiresAt({
record: updated[0],
defaultMaxAgeMs: manager.getMaxAgeMs(),
}),
).toBe(new Date("2026-02-20T13:30:00.000Z").getTime());
} finally {
vi.useRealTimers();
}
});
it("keeps binding when idle timeout is disabled per session key", async () => {
vi.useFakeTimers();
try {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: true,
sessionTtlMs: 60_000,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
await manager.bindTarget({
@@ -249,30 +338,187 @@ describe("thread binding ttl", () => {
webhookToken: "tok-1",
});
const updated = setThreadBindingTtlBySessionKey({
const updated = setThreadBindingIdleTimeoutBySessionKey({
accountId: "default",
targetSessionKey: "agent:main:subagent:child",
ttlMs: 0,
idleTimeoutMs: 0,
});
expect(updated).toHaveLength(1);
expect(updated[0]?.expiresAt).toBe(0);
hoisted.sendWebhookMessageDiscord.mockClear();
expect(updated[0]?.idleTimeoutMs).toBe(0);
await vi.advanceTimersByTimeAsync(240_000);
expect(manager.getByThreadId("thread-1")).toBeDefined();
expect(hoisted.sendWebhookMessageDiscord).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("keeps a binding when activity is touched during the same sweep pass", async () => {
vi.useFakeTimers();
try {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: true,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:first",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
});
await manager.bindTarget({
threadId: "thread-2",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:second",
agentId: "main",
webhookId: "wh-2",
webhookToken: "tok-2",
});
// Keep the first binding off the idle-expire path so the sweep performs
// an awaited probe and gives a window for in-pass touches.
setThreadBindingIdleTimeoutBySessionKey({
accountId: "default",
targetSessionKey: "agent:main:subagent:first",
idleTimeoutMs: 0,
});
hoisted.restGet.mockImplementation(async (...args: unknown[]) => {
const route = typeof args[0] === "string" ? args[0] : "";
if (route.includes("thread-1")) {
manager.touchThread({ threadId: "thread-2", persist: false });
}
return {
id: route.split("/").at(-1) ?? "thread-1",
type: 11,
parent_id: "parent-1",
};
});
hoisted.sendMessageDiscord.mockClear();
await vi.advanceTimersByTimeAsync(120_000);
expect(manager.getByThreadId("thread-2")).toBeDefined();
expect(hoisted.sendMessageDiscord).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("refreshes inactivity window when thread activity is touched", async () => {
vi.useFakeTimers();
try {
vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z"));
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
});
vi.setSystemTime(new Date("2026-02-20T00:00:30.000Z"));
const touched = manager.touchThread({ threadId: "thread-1", persist: false });
expect(touched).not.toBeNull();
const record = manager.getByThreadId("thread-1");
expect(record).toBeDefined();
expect(record?.lastActivityAt).toBe(new Date("2026-02-20T00:00:30.000Z").getTime());
expect(
resolveThreadBindingInactivityExpiresAt({
record: record!,
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBe(new Date("2026-02-20T00:01:30.000Z").getTime());
} finally {
vi.useRealTimers();
}
});
it("persists touched activity timestamps across restart when persistence is enabled", async () => {
vi.useFakeTimers();
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
__testing.resetThreadBindingsForTests();
vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z"));
const manager = createThreadBindingManager({
accountId: "default",
persist: true,
enableSweeper: false,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-1",
channelId: "parent-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
});
const touchedAt = new Date("2026-02-20T00:00:30.000Z").getTime();
vi.setSystemTime(touchedAt);
manager.touchThread({ threadId: "thread-1" });
__testing.resetThreadBindingsForTests();
const reloaded = createThreadBindingManager({
accountId: "default",
persist: true,
enableSweeper: false,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
});
const record = reloaded.getByThreadId("thread-1");
expect(record).toBeDefined();
expect(record?.lastActivityAt).toBe(touchedAt);
expect(
resolveThreadBindingInactivityExpiresAt({
record: record!,
defaultIdleTimeoutMs: reloaded.getIdleTimeoutMs(),
}),
).toBe(new Date("2026-02-20T00:01:30.000Z").getTime());
} finally {
__testing.resetThreadBindingsForTests();
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(stateDir, { recursive: true, force: true });
vi.useRealTimers();
}
});
it("reuses webhook credentials after unbind when rebinding in the same channel", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const first = await manager.bindTarget({
@@ -308,7 +554,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
@@ -348,7 +595,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
hoisted.restGet.mockClear();
@@ -384,7 +632,8 @@ describe("thread binding ttl", () => {
token: "runtime-token",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
hoisted.createDiscordRestClient.mockClear();
@@ -421,14 +670,16 @@ describe("thread binding ttl", () => {
token: "token-old",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const manager = createThreadBindingManager({
accountId: "runtime",
token: "token-new",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
hoisted.createThreadDiscord.mockClear();
@@ -460,13 +711,15 @@ describe("thread binding ttl", () => {
accountId: "a",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const b = createThreadBindingManager({
accountId: "b",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const aBinding = await a.bindTarget({
@@ -503,7 +756,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
@@ -577,7 +831,8 @@ describe("thread binding ttl", () => {
accountId: "default",
persist: false,
enableSweeper: false,
sessionTtlMs: 24 * 60 * 60 * 1000,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
@@ -611,6 +866,104 @@ describe("thread binding ttl", () => {
expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined();
});
it("migrates legacy expiresAt bindings to idle/max-age semantics", () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDir;
try {
__testing.resetThreadBindingsForTests();
const bindingsPath = __testing.resolveThreadBindingsPath();
fs.mkdirSync(path.dirname(bindingsPath), { recursive: true });
const boundAt = Date.now() - 10_000;
const expiresAt = boundAt + 60_000;
fs.writeFileSync(
bindingsPath,
JSON.stringify(
{
version: 1,
bindings: {
"thread-legacy-active": {
accountId: "default",
channelId: "parent-1",
threadId: "thread-legacy-active",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:legacy-active",
agentId: "main",
boundBy: "system",
boundAt,
expiresAt,
},
"thread-legacy-disabled": {
accountId: "default",
channelId: "parent-1",
threadId: "thread-legacy-disabled",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:legacy-disabled",
agentId: "main",
boundBy: "system",
boundAt,
expiresAt: 0,
},
},
},
null,
2,
),
"utf-8",
);
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
const active = manager.getByThreadId("thread-legacy-active");
expect(active).toBeDefined();
expect(active?.idleTimeoutMs).toBe(0);
expect(active?.maxAgeMs).toBe(expiresAt - boundAt);
expect(
resolveThreadBindingMaxAgeExpiresAt({
record: active!,
defaultMaxAgeMs: manager.getMaxAgeMs(),
}),
).toBe(expiresAt);
expect(
resolveThreadBindingInactivityExpiresAt({
record: active!,
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBeUndefined();
const disabled = manager.getByThreadId("thread-legacy-disabled");
expect(disabled).toBeDefined();
expect(disabled?.idleTimeoutMs).toBe(0);
expect(disabled?.maxAgeMs).toBe(0);
expect(
resolveThreadBindingMaxAgeExpiresAt({
record: disabled!,
defaultMaxAgeMs: manager.getMaxAgeMs(),
}),
).toBeUndefined();
expect(
resolveThreadBindingInactivityExpiresAt({
record: disabled!,
defaultIdleTimeoutMs: manager.getIdleTimeoutMs(),
}),
).toBeUndefined();
} finally {
__testing.resetThreadBindingsForTests();
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
fs.rmSync(stateDir, { recursive: true, force: true });
}
});
it("persists unbinds even when no manager is active", () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));
@@ -635,7 +988,9 @@ describe("thread binding ttl", () => {
agentId: "main",
boundBy: "system",
boundAt: now,
expiresAt: now + 60_000,
lastActivityAt: now,
idleTimeoutMs: 60_000,
maxAgeMs: 0,
},
},
},

View File

@@ -13,7 +13,6 @@ import {
MANAGERS_BY_ACCOUNT_ID,
ensureBindingsLoaded,
getThreadBindingToken,
normalizeThreadBindingTtlMs,
normalizeThreadId,
rememberRecentUnboundWebhookEcho,
removeBindingRecord,
@@ -30,6 +29,13 @@ export type AcpThreadBindingReconciliationResult = {
staleSessionKeys: string[];
};
function normalizeNonNegativeMs(raw: number): number {
if (!Number.isFinite(raw)) {
return 0;
}
return Math.max(0, Math.floor(raw));
}
function resolveBindingIdsForTargetSession(params: {
targetSessionKey: string;
accountId?: string;
@@ -139,7 +145,8 @@ export async function autoBindSpawnedDiscordSubagent(params: {
introText: resolveThreadBindingIntroText({
agentId: params.agentId,
label: params.label,
sessionTtlMs: manager.getSessionTtlMs(),
idleTimeoutMs: manager.getIdleTimeoutMs(),
maxAgeMs: manager.getMaxAgeMs(),
}),
});
}
@@ -189,18 +196,17 @@ export function unbindThreadBindingsBySessionKey(params: {
return removed;
}
export function setThreadBindingTtlBySessionKey(params: {
export function setThreadBindingIdleTimeoutBySessionKey(params: {
targetSessionKey: string;
accountId?: string;
ttlMs: number;
idleTimeoutMs: number;
}): ThreadBindingRecord[] {
const ids = resolveBindingIdsForTargetSession(params);
if (ids.length === 0) {
return [];
}
const ttlMs = normalizeThreadBindingTtlMs(params.ttlMs);
const idleTimeoutMs = normalizeNonNegativeMs(params.idleTimeoutMs);
const now = Date.now();
const expiresAt = ttlMs > 0 ? now + ttlMs : 0;
const updated: ThreadBindingRecord[] = [];
for (const bindingKey of ids) {
const existing = BINDINGS_BY_THREAD_ID.get(bindingKey);
@@ -209,8 +215,40 @@ export function setThreadBindingTtlBySessionKey(params: {
}
const nextRecord: ThreadBindingRecord = {
...existing,
idleTimeoutMs,
lastActivityAt: now,
};
setBindingRecord(nextRecord);
updated.push(nextRecord);
}
if (updated.length > 0 && shouldPersistBindingMutations()) {
saveBindingsToDisk({ force: true });
}
return updated;
}
export function setThreadBindingMaxAgeBySessionKey(params: {
targetSessionKey: string;
accountId?: string;
maxAgeMs: number;
}): ThreadBindingRecord[] {
const ids = resolveBindingIdsForTargetSession(params);
if (ids.length === 0) {
return [];
}
const maxAgeMs = normalizeNonNegativeMs(params.maxAgeMs);
const now = Date.now();
const updated: ThreadBindingRecord[] = [];
for (const bindingKey of ids) {
const existing = BINDINGS_BY_THREAD_ID.get(bindingKey);
if (!existing) {
continue;
}
const nextRecord: ThreadBindingRecord = {
...existing,
maxAgeMs,
boundAt: now,
expiresAt,
lastActivityAt: now,
};
setBindingRecord(nextRecord);
updated.push(nextRecord);

View File

@@ -31,21 +31,26 @@ import {
ensureBindingsLoaded,
rememberThreadBindingToken,
normalizeTargetKind,
normalizeThreadBindingTtlMs,
normalizeThreadBindingDurationMs,
normalizeThreadId,
rememberRecentUnboundWebhookEcho,
removeBindingRecord,
resolveBindingIdsForSession,
resolveBindingRecordKey,
resolveThreadBindingExpiresAt,
resolveThreadBindingIdleTimeoutMs,
resolveThreadBindingInactivityExpiresAt,
resolveThreadBindingMaxAgeExpiresAt,
resolveThreadBindingMaxAgeMs,
resolveThreadBindingsPath,
saveBindingsToDisk,
setBindingRecord,
THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS,
shouldDefaultPersist,
resetThreadBindingsForTests,
} from "./thread-bindings.state.js";
import {
DEFAULT_THREAD_BINDING_TTL_MS,
DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
DEFAULT_THREAD_BINDING_MAX_AGE_MS,
THREAD_BINDINGS_SWEEP_INTERVAL_MS,
type ThreadBindingManager,
type ThreadBindingRecord,
@@ -62,15 +67,36 @@ function unregisterManager(accountId: string, manager: ThreadBindingManager) {
}
}
function resolveEffectiveBindingExpiresAt(params: {
record: ThreadBindingRecord;
defaultIdleTimeoutMs: number;
defaultMaxAgeMs: number;
}): number | undefined {
const inactivityExpiresAt = resolveThreadBindingInactivityExpiresAt({
record: params.record,
defaultIdleTimeoutMs: params.defaultIdleTimeoutMs,
});
const maxAgeExpiresAt = resolveThreadBindingMaxAgeExpiresAt({
record: params.record,
defaultMaxAgeMs: params.defaultMaxAgeMs,
});
if (inactivityExpiresAt != null && maxAgeExpiresAt != null) {
return Math.min(inactivityExpiresAt, maxAgeExpiresAt);
}
return inactivityExpiresAt ?? maxAgeExpiresAt;
}
function createNoopManager(accountIdRaw?: string): ThreadBindingManager {
const accountId = normalizeAccountId(accountIdRaw);
return {
accountId,
getSessionTtlMs: () => DEFAULT_THREAD_BINDING_TTL_MS,
getIdleTimeoutMs: () => DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
getMaxAgeMs: () => DEFAULT_THREAD_BINDING_MAX_AGE_MS,
getByThreadId: () => undefined,
getBySessionKey: () => undefined,
listBySessionKey: () => [],
listBindings: () => [],
touchThread: () => null,
bindTarget: async () => null,
unbindThread: () => null,
unbindBySessionKey: () => [],
@@ -86,7 +112,10 @@ function toThreadBindingTargetKind(raw: BindingTargetKind): "subagent" | "acp" {
return raw === "subagent" ? "subagent" : "acp";
}
function toSessionBindingRecord(record: ThreadBindingRecord): SessionBindingRecord {
function toSessionBindingRecord(
record: ThreadBindingRecord,
defaults: { idleTimeoutMs: number; maxAgeMs: number },
): SessionBindingRecord {
const bindingId =
resolveBindingRecordKey({
accountId: record.accountId,
@@ -104,13 +133,26 @@ function toSessionBindingRecord(record: ThreadBindingRecord): SessionBindingReco
},
status: "active",
boundAt: record.boundAt,
expiresAt: record.expiresAt,
expiresAt: resolveEffectiveBindingExpiresAt({
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
defaultMaxAgeMs: defaults.maxAgeMs,
}),
metadata: {
agentId: record.agentId,
label: record.label,
webhookId: record.webhookId,
webhookToken: record.webhookToken,
boundBy: record.boundBy,
lastActivityAt: record.lastActivityAt,
idleTimeoutMs: resolveThreadBindingIdleTimeoutMs({
record,
defaultIdleTimeoutMs: defaults.idleTimeoutMs,
}),
maxAgeMs: resolveThreadBindingMaxAgeMs({
record,
defaultMaxAgeMs: defaults.maxAgeMs,
}),
},
};
}
@@ -137,7 +179,8 @@ export function createThreadBindingManager(
token?: string;
persist?: boolean;
enableSweeper?: boolean;
sessionTtlMs?: number;
idleTimeoutMs?: number;
maxAgeMs?: number;
} = {},
): ThreadBindingManager {
ensureBindingsLoaded();
@@ -152,14 +195,22 @@ export function createThreadBindingManager(
const persist = params.persist ?? shouldDefaultPersist();
PERSIST_BY_ACCOUNT_ID.set(accountId, persist);
const sessionTtlMs = normalizeThreadBindingTtlMs(params.sessionTtlMs);
const idleTimeoutMs = normalizeThreadBindingDurationMs(
params.idleTimeoutMs,
DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
);
const maxAgeMs = normalizeThreadBindingDurationMs(
params.maxAgeMs,
DEFAULT_THREAD_BINDING_MAX_AGE_MS,
);
const resolveCurrentToken = () => getThreadBindingToken(accountId) ?? params.token;
let sweepTimer: NodeJS.Timeout | null = null;
const manager: ThreadBindingManager = {
accountId,
getSessionTtlMs: () => sessionTtlMs,
getIdleTimeoutMs: () => idleTimeoutMs,
getMaxAgeMs: () => maxAgeMs,
getByThreadId: (threadId) => {
const key = resolveBindingRecordKey({
accountId,
@@ -189,6 +240,35 @@ export function createThreadBindingManager(
},
listBindings: () =>
[...BINDINGS_BY_THREAD_ID.values()].filter((entry) => entry.accountId === accountId),
touchThread: (touchParams) => {
const key = resolveBindingRecordKey({
accountId,
threadId: touchParams.threadId,
});
if (!key) {
return null;
}
const existing = BINDINGS_BY_THREAD_ID.get(key);
if (!existing || existing.accountId !== accountId) {
return null;
}
const now = Date.now();
const at =
typeof touchParams.at === "number" && Number.isFinite(touchParams.at)
? Math.max(0, Math.floor(touchParams.at))
: now;
const nextRecord: ThreadBindingRecord = {
...existing,
lastActivityAt: Math.max(existing.lastActivityAt || 0, at),
};
setBindingRecord(nextRecord);
if (touchParams.persist ?? persist) {
saveBindingsToDisk({
minIntervalMs: THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS,
});
}
return nextRecord;
},
bindTarget: async (bindParams) => {
let threadId = normalizeThreadId(bindParams.threadId);
let channelId = bindParams.channelId?.trim() || "";
@@ -250,7 +330,7 @@ export function createThreadBindingManager(
webhookToken = createdWebhook.webhookToken ?? "";
}
const boundAt = Date.now();
const now = Date.now();
const record: ThreadBindingRecord = {
accountId,
channelId,
@@ -262,8 +342,10 @@ export function createThreadBindingManager(
webhookId: webhookId || undefined,
webhookToken: webhookToken || undefined,
boundBy: bindParams.boundBy?.trim() || "system",
boundAt,
expiresAt: sessionTtlMs > 0 ? boundAt + sessionTtlMs : undefined,
boundAt: now,
lastActivityAt: now,
idleTimeoutMs,
maxAgeMs,
};
setBindingRecord(record);
@@ -301,7 +383,14 @@ export function createThreadBindingManager(
const farewell = resolveThreadBindingFarewellText({
reason: unbindParams.reason,
farewellText: unbindParams.farewellText,
sessionTtlMs,
idleTimeoutMs: resolveThreadBindingIdleTimeoutMs({
record: removed,
defaultIdleTimeoutMs: idleTimeoutMs,
}),
maxAgeMs: resolveThreadBindingMaxAgeMs({
record: removed,
defaultMaxAgeMs: maxAgeMs,
}),
});
// Use bot send path for farewell messages so unbound threads don't process
// webhook echoes as fresh inbound turns when allowBots is enabled.
@@ -366,20 +455,50 @@ export function createThreadBindingManager(
} catch {
return;
}
for (const binding of bindings) {
const expiresAt = resolveThreadBindingExpiresAt({
for (const snapshotBinding of bindings) {
// Re-read live state after any awaited work from earlier iterations.
// This avoids unbinding based on stale snapshot data when activity touches
// happen while the sweeper loop is in-flight.
const binding = manager.getByThreadId(snapshotBinding.threadId);
if (!binding) {
continue;
}
const now = Date.now();
const inactivityExpiresAt = resolveThreadBindingInactivityExpiresAt({
record: binding,
sessionTtlMs,
defaultIdleTimeoutMs: idleTimeoutMs,
});
if (expiresAt != null && Date.now() >= expiresAt) {
const ttlFromBinding = Math.max(0, expiresAt - binding.boundAt);
const maxAgeExpiresAt = resolveThreadBindingMaxAgeExpiresAt({
record: binding,
defaultMaxAgeMs: maxAgeMs,
});
const expirationCandidates: Array<{
reason: "idle-expired" | "max-age-expired";
at: number;
}> = [];
if (inactivityExpiresAt != null && now >= inactivityExpiresAt) {
expirationCandidates.push({ reason: "idle-expired", at: inactivityExpiresAt });
}
if (maxAgeExpiresAt != null && now >= maxAgeExpiresAt) {
expirationCandidates.push({ reason: "max-age-expired", at: maxAgeExpiresAt });
}
if (expirationCandidates.length > 0) {
expirationCandidates.sort((a, b) => a.at - b.at);
const reason = expirationCandidates[0]?.reason ?? "idle-expired";
manager.unbindThread({
threadId: binding.threadId,
reason: "ttl-expired",
reason,
sendFarewell: true,
farewellText: resolveThreadBindingFarewellText({
reason: "ttl-expired",
sessionTtlMs: ttlFromBinding,
reason,
idleTimeoutMs: resolveThreadBindingIdleTimeoutMs({
record: binding,
defaultIdleTimeoutMs: idleTimeoutMs,
}),
maxAgeMs: resolveThreadBindingMaxAgeMs({
record: binding,
defaultMaxAgeMs: maxAgeMs,
}),
}),
});
continue;
@@ -479,19 +598,30 @@ export function createThreadBindingManager(
boundBy,
introText,
});
return bound ? toSessionBindingRecord(bound) : null;
return bound
? toSessionBindingRecord(bound, {
idleTimeoutMs,
maxAgeMs,
})
: null;
},
listBySession: (targetSessionKey) =>
manager.listBySessionKey(targetSessionKey).map(toSessionBindingRecord),
manager
.listBySessionKey(targetSessionKey)
.map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })),
resolveByConversation: (ref) => {
if (ref.channel !== "discord") {
return null;
}
const binding = manager.getByThreadId(ref.conversationId);
return binding ? toSessionBindingRecord(binding) : null;
return binding ? toSessionBindingRecord(binding, { idleTimeoutMs, maxAgeMs }) : null;
},
touch: () => {
// Thread bindings are activity-touched by inbound/outbound message flows.
touch: (bindingId, at) => {
const threadId = resolveThreadIdFromBindingId({ accountId, bindingId });
if (!threadId) {
return;
}
manager.touchThread({ threadId, at, persist: true });
},
unbind: async (input) => {
if (input.targetSessionKey?.trim()) {
@@ -499,7 +629,7 @@ export function createThreadBindingManager(
targetSessionKey: input.targetSessionKey,
reason: input.reason,
});
return removed.map(toSessionBindingRecord);
return removed.map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs }));
}
const threadId = resolveThreadIdFromBindingId({
accountId,
@@ -512,7 +642,7 @@ export function createThreadBindingManager(
threadId,
reason: input.reason,
});
return removed ? [toSessionBindingRecord(removed)] : [];
return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : [];
},
});

View File

@@ -1,5 +1,5 @@
export {
formatThreadBindingTtlLabel,
formatThreadBindingDurationLabel,
resolveThreadBindingFarewellText,
resolveThreadBindingIntroText,
resolveThreadBindingThreadName,

View File

@@ -26,6 +26,7 @@ describe("thread binding persona", () => {
agentId: "codex",
boundBy: "system",
boundAt: Date.now(),
lastActivityAt: Date.now(),
label: "codex-thread",
} satisfies ThreadBindingRecord;
expect(resolveThreadBindingPersonaFromRecord(record)).toBe("⚙️ codex-thread");

View File

@@ -4,8 +4,9 @@ import { resolveStateDir } from "../../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../../infra/json-file.js";
import { normalizeAccountId, resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import {
DEFAULT_THREAD_BINDING_TTL_MS,
RECENT_UNBOUND_WEBHOOK_ECHO_TTL_MS,
DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
DEFAULT_THREAD_BINDING_MAX_AGE_MS,
RECENT_UNBOUND_WEBHOOK_ECHO_WINDOW_MS,
THREAD_BINDINGS_VERSION,
type PersistedThreadBindingRecord,
type PersistedThreadBindingsPayload,
@@ -23,6 +24,7 @@ type ThreadBindingsGlobalState = {
reusableWebhooksByAccountChannel: Map<string, { webhookId: string; webhookToken: string }>;
persistByAccountId: Map<string, boolean>;
loadedBindings: boolean;
lastPersistedAtMs: number;
};
// Plugin hooks can load this module via Jiti while core imports it via ESM.
@@ -45,6 +47,7 @@ function createThreadBindingsGlobalState(): ThreadBindingsGlobalState {
>(),
persistByAccountId: new Map<string, boolean>(),
loadedBindings: false,
lastPersistedAtMs: 0,
};
}
@@ -69,6 +72,7 @@ export const RECENT_UNBOUND_WEBHOOK_ECHOES_BY_BINDING_KEY =
export const REUSABLE_WEBHOOKS_BY_ACCOUNT_CHANNEL =
THREAD_BINDINGS_STATE.reusableWebhooksByAccountChannel;
export const PERSIST_BY_ACCOUNT_ID = THREAD_BINDINGS_STATE.persistByAccountId;
export const THREAD_BINDING_TOUCH_PERSIST_MIN_INTERVAL_MS = 15_000;
export function rememberThreadBindingToken(params: { accountId?: string; token?: string }) {
const normalizedAccountId = normalizeAccountId(params.accountId);
@@ -164,10 +168,42 @@ function normalizePersistedBinding(threadIdKey: string, raw: unknown): ThreadBin
typeof value.boundAt === "number" && Number.isFinite(value.boundAt)
? Math.floor(value.boundAt)
: Date.now();
const expiresAt =
typeof value.expiresAt === "number" && Number.isFinite(value.expiresAt)
? Math.max(0, Math.floor(value.expiresAt))
const lastActivityAt =
typeof value.lastActivityAt === "number" && Number.isFinite(value.lastActivityAt)
? Math.max(0, Math.floor(value.lastActivityAt))
: boundAt;
const idleTimeoutMs =
typeof value.idleTimeoutMs === "number" && Number.isFinite(value.idleTimeoutMs)
? Math.max(0, Math.floor(value.idleTimeoutMs))
: undefined;
const maxAgeMs =
typeof value.maxAgeMs === "number" && Number.isFinite(value.maxAgeMs)
? Math.max(0, Math.floor(value.maxAgeMs))
: undefined;
const legacyExpiresAt =
typeof (value as { expiresAt?: unknown }).expiresAt === "number" &&
Number.isFinite((value as { expiresAt?: unknown }).expiresAt)
? Math.max(0, Math.floor((value as { expiresAt?: number }).expiresAt ?? 0))
: undefined;
let migratedIdleTimeoutMs = idleTimeoutMs;
let migratedMaxAgeMs = maxAgeMs;
if (
migratedIdleTimeoutMs === undefined &&
migratedMaxAgeMs === undefined &&
legacyExpiresAt != null
) {
if (legacyExpiresAt <= 0) {
migratedIdleTimeoutMs = 0;
migratedMaxAgeMs = 0;
} else {
const baseBoundAt = boundAt > 0 ? boundAt : lastActivityAt;
// Legacy expiresAt represented an absolute timestamp; map it to max-age and disable idle timeout.
migratedIdleTimeoutMs = 0;
migratedMaxAgeMs = Math.max(1, legacyExpiresAt - Math.max(0, baseBoundAt));
}
}
return {
accountId,
channelId,
@@ -180,41 +216,79 @@ function normalizePersistedBinding(threadIdKey: string, raw: unknown): ThreadBin
webhookToken,
boundBy,
boundAt,
expiresAt,
lastActivityAt,
idleTimeoutMs: migratedIdleTimeoutMs,
maxAgeMs: migratedMaxAgeMs,
};
}
export function normalizeThreadBindingTtlMs(raw: unknown): number {
export function normalizeThreadBindingDurationMs(raw: unknown, defaultsTo: number): number {
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return DEFAULT_THREAD_BINDING_TTL_MS;
return defaultsTo;
}
const ttlMs = Math.floor(raw);
if (ttlMs < 0) {
return DEFAULT_THREAD_BINDING_TTL_MS;
const durationMs = Math.floor(raw);
if (durationMs < 0) {
return defaultsTo;
}
return ttlMs;
return durationMs;
}
export function resolveThreadBindingExpiresAt(params: {
record: Pick<ThreadBindingRecord, "boundAt" | "expiresAt">;
sessionTtlMs: number;
}): number | undefined {
if (typeof params.record.expiresAt === "number" && Number.isFinite(params.record.expiresAt)) {
const explicitExpiresAt = Math.floor(params.record.expiresAt);
if (explicitExpiresAt <= 0) {
// 0 is an explicit per-binding TTL disable sentinel.
return undefined;
}
return explicitExpiresAt;
export function resolveThreadBindingIdleTimeoutMs(params: {
record: Pick<ThreadBindingRecord, "idleTimeoutMs">;
defaultIdleTimeoutMs: number;
}): number {
const explicit = params.record.idleTimeoutMs;
if (typeof explicit === "number" && Number.isFinite(explicit)) {
return Math.max(0, Math.floor(explicit));
}
if (params.sessionTtlMs <= 0) {
return Math.max(0, Math.floor(params.defaultIdleTimeoutMs));
}
export function resolveThreadBindingMaxAgeMs(params: {
record: Pick<ThreadBindingRecord, "maxAgeMs">;
defaultMaxAgeMs: number;
}): number {
const explicit = params.record.maxAgeMs;
if (typeof explicit === "number" && Number.isFinite(explicit)) {
return Math.max(0, Math.floor(explicit));
}
return Math.max(0, Math.floor(params.defaultMaxAgeMs));
}
export function resolveThreadBindingInactivityExpiresAt(params: {
record: Pick<ThreadBindingRecord, "lastActivityAt" | "idleTimeoutMs">;
defaultIdleTimeoutMs: number;
}): number | undefined {
const idleTimeoutMs = resolveThreadBindingIdleTimeoutMs({
record: params.record,
defaultIdleTimeoutMs: params.defaultIdleTimeoutMs,
});
if (idleTimeoutMs <= 0) {
return undefined;
}
const lastActivityAt = Math.floor(params.record.lastActivityAt);
if (!Number.isFinite(lastActivityAt) || lastActivityAt <= 0) {
return undefined;
}
return lastActivityAt + idleTimeoutMs;
}
export function resolveThreadBindingMaxAgeExpiresAt(params: {
record: Pick<ThreadBindingRecord, "boundAt" | "maxAgeMs">;
defaultMaxAgeMs: number;
}): number | undefined {
const maxAgeMs = resolveThreadBindingMaxAgeMs({
record: params.record,
defaultMaxAgeMs: params.defaultMaxAgeMs,
});
if (maxAgeMs <= 0) {
return undefined;
}
const boundAt = Math.floor(params.record.boundAt);
if (!Number.isFinite(boundAt) || boundAt <= 0) {
return undefined;
}
return boundAt + params.sessionTtlMs;
return boundAt + maxAgeMs;
}
function linkSessionBinding(targetSessionKey: string, bindingKey: string) {
@@ -273,7 +347,7 @@ export function rememberRecentUnboundWebhookEcho(record: ThreadBindingRecord) {
}
RECENT_UNBOUND_WEBHOOK_ECHOES_BY_BINDING_KEY.set(bindingKey, {
webhookId,
expiresAt: Date.now() + RECENT_UNBOUND_WEBHOOK_ECHO_TTL_MS,
expiresAt: Date.now() + RECENT_UNBOUND_WEBHOOK_ECHO_WINDOW_MS,
});
}
@@ -357,10 +431,23 @@ export function shouldPersistBindingMutations(): boolean {
return fs.existsSync(resolveThreadBindingsPath());
}
export function saveBindingsToDisk(params: { force?: boolean } = {}) {
export function saveBindingsToDisk(params: { force?: boolean; minIntervalMs?: number } = {}) {
if (!params.force && !shouldPersistAnyBindingState()) {
return;
}
const minIntervalMs =
typeof params.minIntervalMs === "number" && Number.isFinite(params.minIntervalMs)
? Math.max(0, Math.floor(params.minIntervalMs))
: 0;
const now = Date.now();
if (
!params.force &&
minIntervalMs > 0 &&
THREAD_BINDINGS_STATE.lastPersistedAtMs > 0 &&
now - THREAD_BINDINGS_STATE.lastPersistedAtMs < minIntervalMs
) {
return;
}
const bindings: Record<string, PersistedThreadBindingRecord> = {};
for (const [bindingKey, record] of BINDINGS_BY_THREAD_ID.entries()) {
bindings[bindingKey] = { ...record };
@@ -370,6 +457,7 @@ export function saveBindingsToDisk(params: { force?: boolean } = {}) {
bindings,
};
saveJsonFile(resolveThreadBindingsPath(), payload);
THREAD_BINDINGS_STATE.lastPersistedAtMs = now;
}
export function ensureBindingsLoaded() {
@@ -429,6 +517,13 @@ export function resolveBindingIdsForSession(params: {
return out;
}
export function resolveDefaultThreadBindingDurations() {
return {
defaultIdleTimeoutMs: DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS,
defaultMaxAgeMs: DEFAULT_THREAD_BINDING_MAX_AGE_MS,
};
}
export function resetThreadBindingsForTests() {
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
manager.stop();
@@ -441,4 +536,5 @@ export function resetThreadBindingsForTests() {
TOKENS_BY_ACCOUNT_ID.clear();
PERSIST_BY_ACCOUNT_ID.clear();
THREAD_BINDINGS_STATE.loadedBindings = false;
THREAD_BINDINGS_STATE.lastPersistedAtMs = 0;
}

View File

@@ -5,7 +5,7 @@ export type {
} from "./thread-bindings.types.js";
export {
formatThreadBindingTtlLabel,
formatThreadBindingDurationLabel,
resolveThreadBindingIntroText,
resolveThreadBindingThreadName,
} from "./thread-bindings.messages.js";
@@ -15,19 +15,26 @@ export {
} from "./thread-bindings.persona.js";
export {
resolveDiscordThreadBindingSessionTtlMs,
resolveThreadBindingSessionTtlMs,
resolveDiscordThreadBindingIdleTimeoutMs,
resolveDiscordThreadBindingMaxAgeMs,
resolveThreadBindingsEnabled,
} from "./thread-bindings.config.js";
export { isRecentlyUnboundThreadWebhookMessage } from "./thread-bindings.state.js";
export {
isRecentlyUnboundThreadWebhookMessage,
resolveThreadBindingIdleTimeoutMs,
resolveThreadBindingInactivityExpiresAt,
resolveThreadBindingMaxAgeExpiresAt,
resolveThreadBindingMaxAgeMs,
} from "./thread-bindings.state.js";
export {
autoBindSpawnedDiscordSubagent,
listThreadBindingsBySessionKey,
listThreadBindingsForAccount,
reconcileAcpThreadBindingsOnStartup,
setThreadBindingTtlBySessionKey,
setThreadBindingIdleTimeoutBySessionKey,
setThreadBindingMaxAgeBySessionKey,
unbindThreadBindingsBySessionKey,
} from "./thread-bindings.lifecycle.js";

View File

@@ -12,11 +12,17 @@ export type ThreadBindingRecord = {
webhookToken?: string;
boundBy: string;
boundAt: number;
expiresAt?: number;
lastActivityAt: number;
/** Inactivity timeout window in milliseconds (0 disables inactivity auto-unfocus). */
idleTimeoutMs?: number;
/** Hard max-age window in milliseconds from bind time (0 disables hard cap). */
maxAgeMs?: number;
};
export type PersistedThreadBindingRecord = ThreadBindingRecord & {
sessionKey?: string;
/** @deprecated Legacy absolute expiry timestamp; migrated on load. */
expiresAt?: number;
};
export type PersistedThreadBindingsPayload = {
@@ -26,11 +32,17 @@ export type PersistedThreadBindingsPayload = {
export type ThreadBindingManager = {
accountId: string;
getSessionTtlMs: () => number;
getIdleTimeoutMs: () => number;
getMaxAgeMs: () => number;
getByThreadId: (threadId: string) => ThreadBindingRecord | undefined;
getBySessionKey: (targetSessionKey: string) => ThreadBindingRecord | undefined;
listBySessionKey: (targetSessionKey: string) => ThreadBindingRecord[];
listBindings: () => ThreadBindingRecord[];
touchThread: (params: {
threadId: string;
at?: number;
persist?: boolean;
}) => ThreadBindingRecord | null;
bindTarget: (params: {
threadId?: string | number;
channelId?: string;
@@ -63,7 +75,8 @@ export type ThreadBindingManager = {
export const THREAD_BINDINGS_VERSION = 1 as const;
export const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 120_000;
export const DEFAULT_THREAD_BINDING_TTL_MS = 24 * 60 * 60 * 1000; // 24h
export const DEFAULT_FAREWELL_TEXT = "Session ended. Messages here will no longer be routed.";
export const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 24h
export const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; // disabled
export const DEFAULT_FAREWELL_TEXT = "Thread unfocused. Messages here will no longer be routed.";
export const DISCORD_UNKNOWN_CHANNEL_ERROR_CODE = 10_003;
export const RECENT_UNBOUND_WEBHOOK_ECHO_TTL_MS = 30_000;
export const RECENT_UNBOUND_WEBHOOK_ECHO_WINDOW_MS = 30_000;