fix: kill stuck ACP child processes on startup and harden sessions in discord threads (#33699)

* Gateway: resolve agent.wait for chat.send runs

* Discord: harden ACP thread binding + listener timeout

* ACPX: handle already-exited child wait

* Gateway/Discord: address PR review findings

* Discord: keep ACP error-state thread bindings on startup

* gateway: make agent.wait dedupe bridge event-driven

* discord: harden ACP probe classification and cap startup fan-out

* discord: add cooperative timeout cancellation

* discord: fix startup probe concurrency helper typing

* plugin-sdk: avoid Windows root-alias shard timeout

* plugin-sdk: keep root alias reflection path non-blocking

* discord+gateway: resolve remaining PR review findings

* gateway+discord: fix codex review regressions

* Discord/Gateway: address Codex review findings

* Gateway: keep agent.wait lifecycle active with shared run IDs

* Discord: clean up status reactions on aborted runs

* fix: add changelog note for ACP/Discord startup hardening (#33699) (thanks @dutifulbob)

---------

Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
Bob
2026-03-04 10:52:28 +01:00
committed by GitHub
parent bd25182d5a
commit 61f7cea48b
30 changed files with 2568 additions and 180 deletions

View File

@@ -197,9 +197,9 @@ describe("DiscordMessageListener", () => {
// Release the background handler and allow slow-log finalizer to run.
deferred.resolve();
await Promise.resolve();
expect(logger.warn).toHaveBeenCalled();
await vi.waitFor(() => {
expect(logger.warn).toHaveBeenCalled();
});
const warnMock = logger.warn as unknown as { mock: { calls: unknown[][] } };
const [, meta] = warnMock.mock.calls[0] ?? [];
const durationMs = (meta as { durationMs?: number } | undefined)?.durationMs;

View File

@@ -121,4 +121,110 @@ describe("DiscordMessageListener", () => {
);
});
});
// Verifies that a handler which never settles does not block later events
// queued for the same channel once the listener timeout has elapsed.
it("continues same-channel processing after handler timeout", async () => {
vi.useFakeTimers();
try {
// A promise that never settles: simulates a permanently stuck handler.
const never = new Promise<void>(() => {});
const handler = vi.fn(async () => {
// Only the first invocation hangs; later invocations return immediately.
if (handler.mock.calls.length === 1) {
await never;
return;
}
});
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
timeoutMs: 50,
});
// Two events for the same channel id ("ch-1").
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-1"), {} as never);
// Before any time passes only the first (stuck) invocation has started.
expect(handler).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(60);
// After the clock moves, the second event is expected to reach the handler.
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
} finally {
// Always restore real timers so later tests are unaffected.
vi.useRealTimers();
}
});
// Verifies that the listener hands an AbortSignal to the handler, that the
// signal fires when the timeout elapses, and that a cooperative handler can
// use it to skip work it would otherwise perform late.
it("aborts timed-out handlers and prevents late side effects", async () => {
vi.useFakeTimers();
try {
let abortReceived = false;
let lateSideEffect = false;
const handler = vi.fn(
async (
_data: unknown,
_client: unknown,
options?: {
abortSignal?: AbortSignal;
},
) => {
// Block until the supplied signal is (or already was) aborted.
await new Promise<void>((resolve) => {
if (options?.abortSignal?.aborted) {
abortReceived = true;
resolve();
return;
}
options?.abortSignal?.addEventListener(
"abort",
() => {
abortReceived = true;
resolve();
},
{ once: true },
);
});
// Cooperative cancellation: bail out before the side effect when aborted.
if (options?.abortSignal?.aborted) {
return;
}
lateSideEffect = true;
},
);
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
timeoutMs: 50,
});
// Two events for the same channel id ("ch-1").
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-1"), {} as never);
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
// The abort must have been observed and the guarded side effect skipped.
expect(abortReceived).toBe(true);
expect(lateSideEffect).toBe(false);
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
} finally {
// Always restore real timers so later tests are unaffected.
vi.useRealTimers();
}
});
// Verifies that a run which ends via the timeout path logs the timeout error
// but does not additionally log a slow-listener warning.
it("does not emit slow-listener warnings when timeout already fired", async () => {
vi.useFakeTimers();
try {
// A promise that never settles: the handler can only end via the timeout.
const never = new Promise<void>(() => {});
const handler = vi.fn(async () => {
await never;
});
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
// NOTE(review): 31s is presumably chosen to exceed the slow-listener
// threshold so a warning would be emitted if it were not suppressed — confirm.
timeoutMs: 31_000,
});
await listener.handle(fakeEvent("ch-1"), {} as never);
await vi.advanceTimersByTimeAsync(31_100);
await vi.waitFor(() => {
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
});
expect(logger.warn).not.toHaveBeenCalled();
} finally {
// Always restore real timers so later tests are unaffected.
vi.useRealTimers();
}
});
});

View File

@@ -41,7 +41,11 @@ type Logger = ReturnType<typeof import("../../logging/subsystem.js").createSubsy
export type DiscordMessageEvent = Parameters<MessageCreateListener["handle"]>[0];
export type DiscordMessageHandler = (data: DiscordMessageEvent, client: Client) => Promise<void>;
export type DiscordMessageHandler = (
data: DiscordMessageEvent,
client: Client,
options?: { abortSignal?: AbortSignal },
) => Promise<void>;
type DiscordReactionEvent = Parameters<MessageReactionAddListener["handle"]>[0];
@@ -66,13 +70,50 @@ type DiscordReactionRoutingParams = {
};
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000;
const discordEventQueueLog = createSubsystemLogger("discord/event-queue");
/**
 * Normalizes a caller-supplied listener timeout into a delay that is safe to
 * pass to `setTimeout`.
 *
 * @param raw - Requested timeout in milliseconds. May be `undefined`,
 *   non-finite, or non-positive, in which case the default is used.
 * @returns `DISCORD_DEFAULT_LISTENER_TIMEOUT_MS` when `raw` is unusable;
 *   otherwise `raw` rounded down and clamped to `[1_000, 2_147_483_647]`.
 */
function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number {
  if (typeof raw !== "number" || !Number.isFinite(raw) || raw <= 0) {
    return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS;
  }
  // Node timers accept at most a signed 32-bit delay; anything larger is
  // silently replaced with 1 ms, which would make a "very long" timeout fire
  // almost immediately. Cap the value instead.
  const maxTimerDelayMs = 2_147_483_647;
  return Math.min(maxTimerDelayMs, Math.max(1_000, Math.floor(raw)));
}
/**
 * Renders a single log-context value as text.
 *
 * Strings are trimmed and dropped when empty; numbers, booleans and bigints
 * are stringified; every other kind of value (including `null` and
 * `undefined`) yields `null`.
 */
function formatListenerContextValue(value: unknown): string | null {
  switch (typeof value) {
    case "string": {
      const text = value.trim();
      return text.length === 0 ? null : text;
    }
    case "number":
    case "boolean":
    case "bigint":
      return String(value);
    default:
      // undefined, null (typeof "object"), objects, functions, symbols.
      return null;
  }
}
/**
 * Builds a ` (key=value key=value)` suffix for log messages from a context
 * record. Entries whose value does not format to text are omitted; returns an
 * empty string when there is no context or nothing printable in it.
 */
function formatListenerContextSuffix(context?: Record<string, unknown>): string {
  if (!context) {
    return "";
  }
  const parts: string[] = [];
  for (const [label, rawValue] of Object.entries(context)) {
    const rendered = formatListenerContextValue(rawValue);
    if (rendered) {
      parts.push(`${label}=${rendered}`);
    }
  }
  return parts.length > 0 ? ` (${parts.join(" ")})` : "";
}
function logSlowDiscordListener(params: {
logger: Logger | undefined;
listener: string;
event: string;
durationMs: number;
context?: Record<string, unknown>;
}) {
if (params.durationMs < DISCORD_SLOW_LISTENER_THRESHOLD_MS) {
return;
@@ -88,7 +129,8 @@ function logSlowDiscordListener(params: {
event: params.event,
durationMs: params.durationMs,
duration,
consoleMessage: message,
...params.context,
consoleMessage: `${message}${formatListenerContextSuffix(params.context)}`,
});
}
@@ -96,12 +138,59 @@ async function runDiscordListenerWithSlowLog(params: {
logger: Logger | undefined;
listener: string;
event: string;
run: () => Promise<void>;
run: (abortSignal: AbortSignal) => Promise<void>;
timeoutMs?: number;
context?: Record<string, unknown>;
onError?: (err: unknown) => void;
}) {
const startedAt = Date.now();
const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs);
let timedOut = false;
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
const logger = params.logger ?? discordEventQueueLog;
const abortController = new AbortController();
const runPromise = params.run(abortController.signal).catch((err) => {
if (timedOut) {
const errorName =
err && typeof err === "object" && "name" in err ? String(err.name) : undefined;
if (abortController.signal.aborted && errorName === "AbortError") {
logger.warn(
`discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`,
);
return;
}
logger.error(
danger(
`discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`,
),
);
return;
}
throw err;
});
try {
await params.run();
const timeoutPromise = new Promise<"timeout">((resolve) => {
timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs);
timeoutHandle.unref?.();
});
const result = await Promise.race([
runPromise.then(() => "completed" as const),
timeoutPromise,
]);
if (result === "timeout") {
timedOut = true;
abortController.abort();
logger.error(
danger(
`discord handler timed out after ${formatDurationSeconds(timeoutMs, {
decimals: 1,
unit: "seconds",
})}${formatListenerContextSuffix(params.context)}`,
),
);
return;
}
} catch (err) {
if (params.onError) {
params.onError(err);
@@ -109,12 +198,18 @@ async function runDiscordListenerWithSlowLog(params: {
}
throw err;
} finally {
logSlowDiscordListener({
logger: params.logger,
listener: params.listener,
event: params.event,
durationMs: Date.now() - startedAt,
});
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (!timedOut) {
logSlowDiscordListener({
logger: params.logger,
listener: params.listener,
event: params.event,
durationMs: Date.now() - startedAt,
context: params.context,
});
}
}
}
@@ -128,18 +223,26 @@ export function registerDiscordListener(listeners: Array<object>, listener: obje
export class DiscordMessageListener extends MessageCreateListener {
private readonly channelQueue = new KeyedAsyncQueue();
private readonly listenerTimeoutMs: number;
constructor(
private handler: DiscordMessageHandler,
private logger?: Logger,
private onEvent?: () => void,
options?: { timeoutMs?: number },
) {
super();
this.listenerTimeoutMs = normalizeDiscordListenerTimeoutMs(options?.timeoutMs);
}
async handle(data: DiscordMessageEvent, client: Client) {
this.onEvent?.();
const channelId = data.channel_id;
const context = {
channelId,
messageId: (data as { message?: { id?: string } }).message?.id,
guildId: (data as { guild_id?: string }).guild_id,
} satisfies Record<string, unknown>;
// Serialize messages within the same channel to preserve ordering,
// but allow different channels to proceed in parallel so that
// channel-bound agents are not blocked by each other.
@@ -148,7 +251,9 @@ export class DiscordMessageListener extends MessageCreateListener {
logger: this.logger,
listener: this.constructor.name,
event: this.type,
run: () => this.handler(data, client),
timeoutMs: this.listenerTimeoutMs,
context,
run: (abortSignal) => this.handler(data, client, { abortSignal }),
onError: (err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
@@ -206,7 +311,7 @@ async function runDiscordReactionHandler(params: {
logger: params.handlerParams.logger,
listener: params.listener,
event: params.event,
run: () =>
run: async () =>
handleDiscordReactionEvent({
data: params.data,
client: params.client,

View File

@@ -68,6 +68,10 @@ export type {
const DISCORD_BOUND_THREAD_SYSTEM_PREFIXES = ["⚙️", "🤖", "🧰"];
/** Reports whether the optional abort signal is present and already aborted. */
function isPreflightAborted(abortSignal?: AbortSignal): boolean {
  if (abortSignal == null) {
    return false;
  }
  return !!abortSignal.aborted;
}
function isBoundThreadBotSystemMessage(params: {
isBoundThreadSession: boolean;
isBotAuthor: boolean;
@@ -124,6 +128,9 @@ export function shouldIgnoreBoundThreadWebhookMessage(params: {
export async function preflightDiscordMessage(
params: DiscordMessagePreflightParams,
): Promise<DiscordMessagePreflightContext | null> {
if (isPreflightAborted(params.abortSignal)) {
return null;
}
const logger = getChildLogger({ module: "discord-auto-reply" });
const message = params.data.message;
const author = params.data.author;
@@ -157,6 +164,9 @@ export async function preflightDiscordMessage(
messageId: message.id,
config: pluralkitConfig,
});
if (isPreflightAborted(params.abortSignal)) {
return null;
}
} catch (err) {
logVerbose(`discord: pluralkit lookup failed for ${message.id}: ${String(err)}`);
}
@@ -176,6 +186,9 @@ export async function preflightDiscordMessage(
const isGuildMessage = Boolean(params.data.guild_id);
const channelInfo = await resolveDiscordChannelInfo(params.client, messageChannelId);
if (isPreflightAborted(params.abortSignal)) {
return null;
}
const isDirectMessage = channelInfo?.type === ChannelType.DM;
const isGroupDm = channelInfo?.type === ChannelType.GroupDM;
logDebug(
@@ -213,6 +226,9 @@ export async function preflightDiscordMessage(
allowNameMatching,
useAccessGroups,
});
if (isPreflightAborted(params.abortSignal)) {
return null;
}
commandAuthorized = dmAccess.commandAuthorized;
if (dmAccess.decision !== "allow") {
const allowMatchMeta = formatAllowlistMatchMeta(
@@ -300,6 +316,9 @@ export async function preflightDiscordMessage(
threadChannel: earlyThreadChannel,
channelInfo,
});
if (isPreflightAborted(params.abortSignal)) {
return null;
}
earlyThreadParentId = parentInfo.id;
earlyThreadParentName = parentInfo.name;
earlyThreadParentType = parentInfo.type;
@@ -548,7 +567,11 @@ export async function preflightDiscordMessage(
shouldRequireMention,
mentionRegexes,
cfg: params.cfg,
abortSignal: params.abortSignal,
});
if (isPreflightAborted(params.abortSignal)) {
return null;
}
const mentionText = hasTypedText ? baseText : "";
const wasMentioned =
@@ -727,6 +750,7 @@ export async function preflightDiscordMessage(
token: params.token,
runtime: params.runtime,
botUserId: params.botUserId,
abortSignal: params.abortSignal,
guildHistories: params.guildHistories,
historyLimit: params.historyLimit,
mediaMaxBytes: params.mediaMaxBytes,

View File

@@ -25,6 +25,7 @@ export type DiscordMessagePreflightContext = {
token: string;
runtime: RuntimeEnv;
botUserId?: string;
abortSignal?: AbortSignal;
guildHistories: Map<string, HistoryEntry[]>;
historyLimit: number;
mediaMaxBytes: number;
@@ -95,6 +96,7 @@ export type DiscordMessagePreflightParams = {
token: string;
runtime: RuntimeEnv;
botUserId?: string;
abortSignal?: AbortSignal;
guildHistories: Map<string, HistoryEntry[]>;
historyLimit: number;
mediaMaxBytes: number;

View File

@@ -345,6 +345,32 @@ describe("processDiscordMessage ack reactions", () => {
expect(emojis).toContain("🟦");
expect(emojis).toContain("🏁");
});
// Verifies that when the run's abort signal fires during dispatch and
// `removeAckAfterReply` is enabled, the ack reaction is removed.
it("clears status reactions when dispatch aborts and removeAckAfterReply is enabled", async () => {
const abortController = new AbortController();
// Simulate an abort that happens mid-dispatch: flip the signal, then fail.
dispatchInboundMessage.mockImplementationOnce(async () => {
abortController.abort();
throw new Error("aborted");
});
const ctx = await createBaseContext({
abortSignal: abortController.signal,
cfg: {
messages: {
ackReaction: "👀",
removeAckAfterReply: true,
},
session: { store: "/tmp/openclaw-discord-process-test-sessions.json" },
},
});
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
// The removal is polled for rather than asserted immediately.
await vi.waitFor(() => {
expect(sendMocks.removeReactionDiscord).toHaveBeenCalledWith("c1", "m1", "👀", { rest: {} });
});
});
});
describe("processDiscordMessage session routing", () => {

View File

@@ -60,6 +60,10 @@ function sleep(ms: number): Promise<void> {
const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60_000;
/** Reports whether the optional abort signal is present and already aborted. */
function isProcessAborted(abortSignal?: AbortSignal): boolean {
  return abortSignal ? !!abortSignal.aborted : false;
}
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
const {
cfg,
@@ -105,16 +109,26 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
route,
commandAuthorized,
discordRestFetch,
abortSignal,
} = ctx;
if (isProcessAborted(abortSignal)) {
return;
}
const ssrfPolicy = cfg.browser?.ssrfPolicy;
const mediaList = await resolveMediaList(message, mediaMaxBytes, discordRestFetch, ssrfPolicy);
if (isProcessAborted(abortSignal)) {
return;
}
const forwardedMediaList = await resolveForwardedMediaList(
message,
mediaMaxBytes,
discordRestFetch,
ssrfPolicy,
);
if (isProcessAborted(abortSignal)) {
return;
}
mediaList.push(...forwardedMediaList);
const text = messageText;
if (!text) {
@@ -585,6 +599,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
typingCallbacks,
deliver: async (payload: ReplyPayload, info) => {
if (isProcessAborted(abortSignal)) {
return;
}
const isFinal = info.kind === "final";
if (payload.isReasoning) {
// Reasoning/thinking payloads should not be delivered to Discord.
@@ -607,6 +624,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
if (canFinalizeViaPreviewEdit) {
await draftStream.stop();
if (isProcessAborted(abortSignal)) {
return;
}
try {
await editMessageDiscord(
deliverChannelId,
@@ -627,6 +647,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
// Check if stop() flushed a message we can edit
if (!finalizedViaPreviewMessage) {
await draftStream.stop();
if (isProcessAborted(abortSignal)) {
return;
}
const messageIdAfterStop = draftStream.messageId();
if (
typeof messageIdAfterStop === "string" &&
@@ -657,6 +680,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
await draftStream.clear();
}
}
if (isProcessAborted(abortSignal)) {
return;
}
const replyToId = replyReference.use();
await deliverDiscordReply({
@@ -682,6 +708,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
await typingCallbacks.onReplyStart();
await statusReactions.setThinking();
},
@@ -689,13 +718,19 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
let dispatchResult: Awaited<ReturnType<typeof dispatchInboundMessage>> | null = null;
let dispatchError = false;
let dispatchAborted = false;
try {
if (isProcessAborted(abortSignal)) {
dispatchAborted = true;
return;
}
dispatchResult = await dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
abortSignal,
skillFilter: channelConfig?.skills,
disableBlockStreaming:
disableBlockStreamingForDraft ??
@@ -730,11 +765,22 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
await statusReactions.setThinking();
},
onToolStart: async (payload) => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setTool(payload.name);
},
},
});
if (isProcessAborted(abortSignal)) {
dispatchAborted = true;
return;
}
} catch (err) {
if (isProcessAborted(abortSignal)) {
dispatchAborted = true;
return;
}
dispatchError = true;
throw err;
} finally {
@@ -752,21 +798,32 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
markDispatchIdle();
}
if (statusReactionsEnabled) {
if (dispatchError) {
await statusReactions.setError();
if (dispatchAborted) {
if (removeAckAfterReply) {
void statusReactions.clear();
} else {
void statusReactions.restoreInitial();
}
} else {
await statusReactions.setDone();
}
if (removeAckAfterReply) {
void (async () => {
await sleep(dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs);
await statusReactions.clear();
})();
} else {
void statusReactions.restoreInitial();
if (dispatchError) {
await statusReactions.setError();
} else {
await statusReactions.setDone();
}
if (removeAckAfterReply) {
void (async () => {
await sleep(dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs);
await statusReactions.clear();
})();
} else {
void statusReactions.restoreInitial();
}
}
}
}
if (dispatchAborted) {
return;
}
if (!dispatchResult?.queuedFinal) {
if (isGuildMessage) {

View File

@@ -26,6 +26,7 @@ function createDeferred<T = void>() {
function createHandlerParams(overrides?: {
setStatus?: (patch: Record<string, unknown>) => void;
abortSignal?: AbortSignal;
listenerTimeoutMs?: number;
}) {
const cfg: OpenClawConfig = {
channels: {
@@ -64,6 +65,7 @@ function createHandlerParams(overrides?: {
threadBindings: createNoopThreadBindingManager("default"),
setStatus: overrides?.setStatus,
abortSignal: overrides?.abortSignal,
listenerTimeoutMs: overrides?.listenerTimeoutMs,
};
}
@@ -167,6 +169,55 @@ describe("createDiscordMessageHandler queue behavior", () => {
});
});
// Verifies that a queued run which only finishes when its abort signal fires
// is timed out via `listenerTimeoutMs`, so the next queued run can start.
it("applies listener timeout to queued runs so stalled runs do not block the queue", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
processDiscordMessageMock
// First run: resolves only once the context's abort signal is aborted.
.mockImplementationOnce(async (ctx: { abortSignal?: AbortSignal }) => {
await new Promise<void>((resolve) => {
if (ctx.abortSignal?.aborted) {
resolve();
return;
}
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
});
})
// Second run: completes immediately.
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createHandlerParams({ listenerTimeoutMs: 50 });
const handler = createDiscordMessageHandler(params);
// Both handler calls are expected to resolve without waiting for the runs.
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await expect(
handler(createMessageData("m-2") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
// The stalled first run must have seen its signal aborted by the timeout.
const firstCtx = processDiscordMessageMock.mock.calls[0]?.[0] as
| { abortSignal?: AbortSignal }
| undefined;
expect(firstCtx?.abortSignal?.aborted).toBe(true);
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord queued run timed out after"),
);
} finally {
// Always restore real timers so later tests are unaffected.
vi.useRealTimers();
}
});
it("refreshes run activity while active runs are in progress", async () => {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();

View File

@@ -6,6 +6,7 @@ import {
import { createRunStateMachine } from "../../channels/run-state-machine.js";
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
import { danger } from "../../globals.js";
import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts";
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
import { preflightDiscordMessage } from "./message-handler.preflight.js";
@@ -27,12 +28,142 @@ type DiscordMessageHandlerParams = Omit<
> & {
setStatus?: DiscordMonitorStatusSink;
abortSignal?: AbortSignal;
listenerTimeoutMs?: number;
};
export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
deactivate: () => void;
};
/** Timeout applied to a queued run when the caller supplies no usable value. */
const DEFAULT_DISCORD_RUN_TIMEOUT_MS = 120_000;
/** Upper bound for run timeouts (largest signed 32-bit integer). */
const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647;

/**
 * Converts an optional timeout into a whole number of milliseconds.
 *
 * Missing, non-numeric, non-finite, or non-positive input yields the default;
 * any other value is rounded down and clamped to `[1, MAX_DISCORD_TIMEOUT_MS]`.
 */
function normalizeDiscordRunTimeoutMs(timeoutMs?: number): number {
  if (typeof timeoutMs !== "number") {
    return DEFAULT_DISCORD_RUN_TIMEOUT_MS;
  }
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    return DEFAULT_DISCORD_RUN_TIMEOUT_MS;
  }
  const wholeMs = Math.floor(timeoutMs);
  if (wholeMs < 1) {
    // Fractions in (0, 1) floor to 0; the smallest permitted delay is 1 ms.
    return 1;
  }
  return wholeMs > MAX_DISCORD_TIMEOUT_MS ? MAX_DISCORD_TIMEOUT_MS : wholeMs;
}
/**
 * Reports whether `error` is a non-null object whose `name` property
 * stringifies to `"AbortError"`.
 */
function isAbortError(error: unknown): boolean {
  if (error === null || typeof error !== "object" || !("name" in error)) {
    return false;
  }
  const { name } = error as { name?: unknown };
  return String(name) === "AbortError";
}
/**
 * Builds a ` (channelId=…, messageId=…)` suffix for queued-run log messages.
 *
 * The channel id comes from `ctx.messageChannelId`, falling back to
 * `data.channel_id`; the message id comes from `data.message.id`. Values are
 * trimmed, empty ones are left out, and an empty string is returned when
 * neither id is available.
 */
function formatDiscordRunContextSuffix(ctx: DiscordMessagePreflightContext): string {
  // View the context through the raw event shape to reach the optional `data` payload.
  const eventView = ctx as {
    data?: {
      channel_id?: string;
      message?: {
        id?: string;
      };
    };
  };
  const channel = ctx.messageChannelId?.trim() || eventView.data?.channel_id?.trim();
  const message = eventView.data?.message?.id?.trim();

  const parts: string[] = [];
  if (channel) {
    parts.push(`channelId=${channel}`);
  }
  if (message) {
    parts.push(`messageId=${message}`);
  }
  return parts.length === 0 ? "" : ` (${parts.join(", ")})`;
}
/**
 * Combines several optional abort signals into one that aborts as soon as any
 * of the inputs aborts.
 *
 * @param signals - Candidate signals; `undefined` entries are ignored.
 * @returns `undefined` when no signal is supplied, the signal itself when
 *   exactly one is supplied, and otherwise a combined signal. The combined
 *   signal carries the abort `reason` of the input that triggered it, both on
 *   the native `AbortSignal.any` path and on the manual fallback.
 */
function mergeAbortSignals(signals: Array<AbortSignal | undefined>): AbortSignal | undefined {
  const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal));
  if (activeSignals.length === 0) {
    return undefined;
  }
  if (activeSignals.length === 1) {
    return activeSignals[0];
  }
  if (typeof AbortSignal.any === "function") {
    return AbortSignal.any(activeSignals);
  }

  // Fallback for runtimes without AbortSignal.any.
  const fallbackController = new AbortController();
  const alreadyAborted = activeSignals.find((signal) => signal.aborted);
  if (alreadyAborted) {
    // Forward the reason so consumers see the same value as with AbortSignal.any.
    fallbackController.abort(alreadyAborted.reason);
    return fallbackController.signal;
  }
  const abortFallback = () => {
    // Whichever input fired is the (first) one reporting `aborted`.
    const source = activeSignals.find((signal) => signal.aborted);
    fallbackController.abort(source?.reason);
    // Detach from the remaining inputs so they no longer reference this closure.
    for (const signal of activeSignals) {
      signal.removeEventListener("abort", abortFallback);
    }
  };
  // Note: these listeners stay attached until one of the inputs aborts.
  for (const signal of activeSignals) {
    signal.addEventListener("abort", abortFallback, { once: true });
  }
  return fallbackController.signal;
}
/**
 * Runs `processDiscordMessage` for a queued run, but stops waiting for it once
 * the (normalized) timeout elapses.
 *
 * On timeout the run's abort signal is fired and an error is reported through
 * `params.runtime.error`; this function then returns even though the
 * underlying `processDiscordMessage` promise may still be pending. Failures
 * that occur before the timeout propagate to the caller; failures after it are
 * only logged (abort errors are dropped silently).
 */
async function processDiscordRunWithTimeout(params: {
ctx: DiscordMessagePreflightContext;
runtime: DiscordMessagePreflightParams["runtime"];
lifecycleSignal?: AbortSignal;
timeoutMs?: number;
}) {
const timeoutMs = normalizeDiscordRunTimeoutMs(params.timeoutMs);
const timeoutAbortController = new AbortController();
// The run should stop if the event's own signal, the handler lifecycle signal,
// or this function's timeout fires.
const combinedSignal = mergeAbortSignals([
params.ctx.abortSignal,
params.lifecycleSignal,
timeoutAbortController.signal,
]);
// Only copy the context when the signal actually changed.
const processCtx =
combinedSignal && combinedSignal !== params.ctx.abortSignal
? { ...params.ctx, abortSignal: combinedSignal }
: params.ctx;
const contextSuffix = formatDiscordRunContextSuffix(params.ctx);
let timedOut = false;
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
// The catch handler is attached up front so a rejection that arrives after the
// timeout (when nobody awaits the promise any more) is still handled.
const processPromise = processDiscordMessage(processCtx).catch((error) => {
if (timedOut) {
// An AbortError after our own timeout abort is the expected outcome.
if (timeoutAbortController.signal.aborted && isAbortError(error)) {
return;
}
params.runtime.error?.(
danger(`discord queued run failed after timeout: ${String(error)}${contextSuffix}`),
);
return;
}
// Not timed out: surface the failure to the caller via the race below.
throw error;
});
try {
const timeoutPromise = new Promise<"timeout">((resolve) => {
timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs);
// Where the timer handle supports it, do not let this timer alone keep the
// process alive.
timeoutHandle.unref?.();
});
const result = await Promise.race([
processPromise.then(() => "completed" as const),
timeoutPromise,
]);
if (result === "timeout") {
// Set the flag before aborting so the catch handler above classifies any
// resulting rejection as post-timeout.
timedOut = true;
timeoutAbortController.abort();
params.runtime.error?.(
danger(
`discord queued run timed out after ${formatDurationSeconds(timeoutMs, {
decimals: 1,
unit: "seconds",
})}${contextSuffix}`,
),
);
}
} finally {
// Clear the timer on every exit path (completion, timeout, or failure).
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
}
}
function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string {
const sessionKey = ctx.route.sessionKey?.trim();
if (sessionKey) {
@@ -75,7 +206,12 @@ export function createDiscordMessageHandler(
if (!runState.isActive()) {
return;
}
await processDiscordMessage(ctx);
await processDiscordRunWithTimeout({
ctx,
runtime: params.runtime,
lifecycleSignal: params.abortSignal,
timeoutMs: params.listenerTimeoutMs,
});
} finally {
runState.onRunEnd();
}
@@ -88,6 +224,7 @@ export function createDiscordMessageHandler(
const { debouncer } = createChannelInboundDebouncer<{
data: DiscordMessageEvent;
client: Client;
abortSignal?: AbortSignal;
}>({
cfg: params.cfg,
channel: "discord",
@@ -126,11 +263,16 @@ export function createDiscordMessageHandler(
if (!last) {
return;
}
const abortSignal = last.abortSignal;
if (abortSignal?.aborted) {
return;
}
if (entries.length === 1) {
const ctx = await preflightDiscordMessage({
...params,
ackReactionScope,
groupPolicy,
abortSignal,
data: last.data,
client: last.client,
});
@@ -162,6 +304,7 @@ export function createDiscordMessageHandler(
...params,
ackReactionScope,
groupPolicy,
abortSignal,
data: syntheticData,
client: last.client,
});
@@ -188,19 +331,22 @@ export function createDiscordMessageHandler(
},
});
const handler: DiscordMessageHandlerWithLifecycle = async (data, client) => {
// Filter bot-own messages before they enter the debounce queue.
// The same check exists in preflightDiscordMessage(), but by that point
// the message has already consumed debounce capacity and blocked
// legitimate user messages. On active servers this causes cumulative
// slowdown (see #15874).
const msgAuthorId = data.message?.author?.id ?? data.author?.id;
if (params.botUserId && msgAuthorId === params.botUserId) {
return;
}
const handler: DiscordMessageHandlerWithLifecycle = async (data, client, options) => {
try {
await debouncer.enqueue({ data, client });
if (options?.abortSignal?.aborted) {
return;
}
// Filter bot-own messages before they enter the debounce queue.
// The same check exists in preflightDiscordMessage(), but by that point
// the message has already consumed debounce capacity and blocked
// legitimate user messages. On active servers this causes cumulative
// slowdown (see #15874).
const msgAuthorId = data.message?.author?.id ?? data.author?.id;
if (params.botUserId && msgAuthorId === params.botUserId) {
return;
}
await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal });
} catch (err) {
params.runtime.error?.(danger(`handler failed: ${String(err)}`));
}

View File

@@ -24,6 +24,7 @@ export async function resolveDiscordPreflightAudioMentionContext(params: {
shouldRequireMention: boolean;
mentionRegexes: RegExp[];
cfg: OpenClawConfig;
abortSignal?: AbortSignal;
}): Promise<{
hasAudioAttachment: boolean;
hasTypedText: boolean;
@@ -42,8 +43,20 @@ export async function resolveDiscordPreflightAudioMentionContext(params: {
let transcript: string | undefined;
if (needsPreflightTranscription) {
if (params.abortSignal?.aborted) {
return {
hasAudioAttachment,
hasTypedText,
};
}
try {
const { transcribeFirstAudio } = await import("../../media-understanding/audio-preflight.js");
if (params.abortSignal?.aborted) {
return {
hasAudioAttachment,
hasTypedText,
};
}
const audioUrls = audioAttachments
.map((att) => att.url)
.filter((url): url is string => typeof url === "string" && url.length > 0);
@@ -58,6 +71,9 @@ export async function resolveDiscordPreflightAudioMentionContext(params: {
cfg: params.cfg,
agentDir: undefined,
});
if (params.abortSignal?.aborted) {
transcript = undefined;
}
}
} catch (err) {
logVerbose(`discord: audio preflight transcription failed: ${String(err)}`);

View File

@@ -1,5 +1,6 @@
import { EventEmitter } from "node:events";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { AcpRuntimeError } from "../../acp/runtime/errors.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { RuntimeEnv } from "../../runtime.js";
@@ -25,6 +26,7 @@ const {
createThreadBindingManagerMock,
reconcileAcpThreadBindingsOnStartupMock,
createdBindingManagers,
getAcpSessionStatusMock,
getPluginCommandSpecsMock,
listNativeCommandSpecsForConfigMock,
listSkillCommandsForAgentsMock,
@@ -63,6 +65,11 @@ const {
staleSessionKeys: [],
})),
createdBindingManagers,
getAcpSessionStatusMock: vi.fn(
async (_params: { cfg: OpenClawConfig; sessionKey: string; signal?: AbortSignal }) => ({
state: "idle",
}),
),
getPluginCommandSpecsMock: vi.fn<() => PluginCommandSpecMock[]>(() => []),
listNativeCommandSpecsForConfigMock: vi.fn<() => NativeCommandSpecMock[]>(() => [
{ name: "cmd", description: "built-in", acceptsArgs: false },
@@ -127,6 +134,12 @@ vi.mock("../../auto-reply/chunk.js", () => ({
resolveTextChunkLimit: () => 2000,
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
getSessionStatus: getAcpSessionStatusMock,
}),
}));
vi.mock("../../auto-reply/commands-registry.js", () => ({
listNativeCommandSpecsForConfig: listNativeCommandSpecsForConfigMock,
}));
@@ -272,6 +285,21 @@ vi.mock("./thread-bindings.js", () => ({
}));
describe("monitorDiscordProvider", () => {
type ReconcileHealthProbeParams = {
cfg: OpenClawConfig;
accountId: string;
sessionKey: string;
binding: unknown;
session: unknown;
};
type ReconcileStartupParams = {
cfg: OpenClawConfig;
healthProbe?: (
params: ReconcileHealthProbeParams,
) => Promise<{ status: string; reason?: string }>;
};
const baseRuntime = (): RuntimeEnv => {
return {
log: vi.fn(),
@@ -299,6 +327,16 @@ describe("monitorDiscordProvider", () => {
return opts.eventQueue;
};
/**
 * Returns the `healthProbe` callback that was handed to the startup
 * reconciliation mock, asserting that the mock was called exactly once and
 * that a probe function was supplied.
 */
const getHealthProbe = () => {
  expect(reconcileAcpThreadBindingsOnStartupMock).toHaveBeenCalledTimes(1);
  const recordedCalls = reconcileAcpThreadBindingsOnStartupMock.mock.calls;
  const startupArgs = recordedCalls.at(0) as [ReconcileStartupParams] | undefined;
  const probe = startupArgs?.[0]?.healthProbe;
  expect(typeof probe).toBe("function");
  return probe as NonNullable<ReconcileStartupParams["healthProbe"]>;
};
beforeEach(() => {
clientConstructorOptionsMock.mockClear();
createDiscordAutoPresenceControllerMock.mockClear().mockImplementation(() => ({
@@ -318,6 +356,7 @@ describe("monitorDiscordProvider", () => {
removed: 0,
staleSessionKeys: [],
});
getAcpSessionStatusMock.mockClear().mockResolvedValue({ state: "idle" });
createdBindingManagers.length = 0;
getPluginCommandSpecsMock.mockClear().mockReturnValue([]);
listNativeCommandSpecsForConfigMock
@@ -368,6 +407,167 @@ describe("monitorDiscordProvider", () => {
expect(reconcileAcpThreadBindingsOnStartupMock).toHaveBeenCalledTimes(1);
});
it("treats ACP error status as uncertain during startup thread-binding probes", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
getAcpSessionStatusMock.mockResolvedValue({ state: "error" });
await monitorDiscordProvider({
config: baseConfig(),
runtime: baseRuntime(),
});
const probeResult = await getHealthProbe()({
cfg: baseConfig(),
accountId: "default",
sessionKey: "agent:codex:acp:error",
binding: {} as never,
session: {
acp: {
state: "error",
lastActivityAt: Date.now(),
},
} as never,
});
expect(probeResult).toEqual({
status: "uncertain",
reason: "status-error-state",
});
});
it("classifies typed ACP session init failures as stale", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
getAcpSessionStatusMock.mockRejectedValue(
new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "missing ACP metadata"),
);
await monitorDiscordProvider({
config: baseConfig(),
runtime: baseRuntime(),
});
const probeResult = await getHealthProbe()({
cfg: baseConfig(),
accountId: "default",
sessionKey: "agent:codex:acp:stale",
binding: {} as never,
session: {
acp: {
state: "idle",
lastActivityAt: Date.now(),
},
} as never,
});
expect(probeResult).toEqual({
status: "stale",
reason: "session-init-failed",
});
});
it("classifies typed non-init ACP errors as uncertain when not stale-running", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
getAcpSessionStatusMock.mockRejectedValue(
new AcpRuntimeError("ACP_BACKEND_UNAVAILABLE", "runtime unavailable"),
);
await monitorDiscordProvider({
config: baseConfig(),
runtime: baseRuntime(),
});
const probeResult = await getHealthProbe()({
cfg: baseConfig(),
accountId: "default",
sessionKey: "agent:codex:acp:uncertain",
binding: {} as never,
session: {
acp: {
state: "idle",
lastActivityAt: Date.now(),
},
} as never,
});
expect(probeResult).toEqual({
status: "uncertain",
reason: "status-error",
});
});
it("aborts timed-out ACP status probes during startup thread-binding health checks", async () => {
vi.useFakeTimers();
try {
const { monitorDiscordProvider } = await import("./provider.js");
getAcpSessionStatusMock.mockImplementation(
({ signal }: { signal?: AbortSignal }) =>
new Promise((_resolve, reject) => {
signal?.addEventListener("abort", () => reject(new Error("aborted")), { once: true });
}),
);
await monitorDiscordProvider({
config: baseConfig(),
runtime: baseRuntime(),
});
const probePromise = getHealthProbe()({
cfg: baseConfig(),
accountId: "default",
sessionKey: "agent:codex:acp:timeout",
binding: {} as never,
session: {
acp: {
state: "idle",
lastActivityAt: Date.now(),
},
} as never,
});
await vi.advanceTimersByTimeAsync(8_100);
await expect(probePromise).resolves.toEqual({
status: "uncertain",
reason: "status-timeout",
});
const firstCall = getAcpSessionStatusMock.mock.calls[0]?.[0] as
| { signal?: AbortSignal }
| undefined;
expect(firstCall?.signal).toBeDefined();
expect(firstCall?.signal?.aborted).toBe(true);
} finally {
vi.useRealTimers();
}
});
it("falls back to legacy missing-session message classification", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
getAcpSessionStatusMock.mockRejectedValue(new Error("ACP session metadata missing"));
await monitorDiscordProvider({
config: baseConfig(),
runtime: baseRuntime(),
});
const probeResult = await getHealthProbe()({
cfg: baseConfig(),
accountId: "default",
sessionKey: "agent:codex:acp:legacy",
binding: {} as never,
session: {
acp: {
state: "idle",
lastActivityAt: Date.now(),
},
} as never,
});
expect(probeResult).toEqual({
status: "stale",
reason: "session-missing",
});
});
it("captures gateway errors emitted before lifecycle wait starts", async () => {
const { monitorDiscordProvider } = await import("./provider.js");
const emitter = new EventEmitter();

View File

@@ -10,6 +10,8 @@ import {
import { GatewayCloseCodes, type GatewayPlugin } from "@buape/carbon/gateway";
import { VoicePlugin } from "@buape/carbon/voice";
import { Routes } from "discord-api-types/v10";
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import { isAcpRuntimeError } from "../../acp/runtime/errors.js";
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import type { NativeCommandSpec } from "../../auto-reply/commands-registry.js";
import { listNativeCommandSpecsForConfig } from "../../auto-reply/commands-registry.js";
@@ -175,6 +177,92 @@ function appendPluginCommandSpecs(params: {
return merged;
}
// Upper bound (ms) on how long a single ACP session status probe may run before the
// probe is treated as timed out and its AbortController is aborted.
const DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS = 8_000;
// A session stored as "running" whose last activity is at least this old (ms) is
// considered stale-running, which upgrades probe timeouts/errors from "uncertain" to "stale".
const DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS = 2 * 60 * 1000;
/**
 * Detects untyped (message-only) errors that indicate the session has no ACP backing.
 *
 * @param message Error text to inspect; matching is a case-sensitive substring check.
 * @returns `true` when the text contains one of the known missing-session phrases.
 */
function isLegacyMissingSessionError(message: string): boolean {
  const missingSessionMarkers = ["Session is not ACP-enabled", "ACP session metadata missing"];
  return missingSessionMarkers.some((marker) => message.includes(marker));
}
/**
 * Turns a failed ACP status probe into a binding health verdict.
 *
 * Checks run from strongest to weakest signal:
 * 1. a typed ACP runtime error with code `ACP_SESSION_INIT_FAILED` -> stale;
 * 2. a message matching the legacy missing-session phrases -> stale;
 * 3. any other error -> stale only if the stored session was already stale-running,
 *    otherwise uncertain.
 *
 * @param params.error The value the status probe rejected with.
 * @param params.isStaleRunning Whether the stored session is "running" with old activity.
 * @returns The verdict plus a short machine-readable reason.
 */
function classifyAcpStatusProbeError(params: { error: unknown; isStaleRunning: boolean }): {
  status: "stale" | "uncertain";
  reason: string;
} {
  const { error, isStaleRunning } = params;
  if (isAcpRuntimeError(error) && error.code === "ACP_SESSION_INIT_FAILED") {
    return { status: "stale", reason: "session-init-failed" };
  }
  const errorText = error instanceof Error ? error.message : String(error);
  if (isLegacyMissingSessionError(errorText)) {
    return { status: "stale", reason: "session-missing" };
  }
  if (isStaleRunning) {
    return { status: "stale", reason: "status-error-running-stale" };
  }
  return { status: "uncertain", reason: "status-error" };
}
/**
 * Probes the ACP session behind a Discord thread binding and classifies its health.
 *
 * The status lookup is raced against a timer of DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS.
 * When the timer wins, the lookup's AbortSignal is aborted so it can stop cooperatively.
 *
 * @param params.cfg Configuration forwarded to the ACP session manager.
 * @param params.sessionKey Session to query.
 * @param params.storedState Last persisted ACP state, used only for staleness.
 * @param params.lastActivityAt Last persisted activity timestamp (ms), used only for staleness.
 * @returns `healthy` when a non-error status came back; `uncertain` for an error state,
 *   timeout or unclassified failure; `stale` when the failure is conclusive or the stored
 *   session was already stale-running.
 */
async function probeDiscordAcpBindingHealth(params: {
  cfg: OpenClawConfig;
  sessionKey: string;
  storedState?: "idle" | "running" | "error";
  lastActivityAt?: number;
}): Promise<{ status: "healthy" | "stale" | "uncertain"; reason?: string }> {
  const abortController = new AbortController();
  // Fold resolve/reject into a tagged outcome so the race below never rejects.
  const lookupOutcome = getAcpSessionManager()
    .getSessionStatus({
      cfg: params.cfg,
      sessionKey: params.sessionKey,
      signal: abortController.signal,
    })
    .then((status) => ({ kind: "status" as const, status }))
    .catch((error: unknown) => ({ kind: "error" as const, error }));

  let deadlineTimer: ReturnType<typeof setTimeout> | null = null;
  const deadlineOutcome = new Promise<{ kind: "timeout" }>((resolve) => {
    deadlineTimer = setTimeout(() => {
      resolve({ kind: "timeout" });
    }, DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS);
    // Do not let a pending probe timer keep the process alive.
    deadlineTimer.unref?.();
  });

  const outcome = await Promise.race([lookupOutcome, deadlineOutcome]);
  if (deadlineTimer) {
    clearTimeout(deadlineTimer);
  }
  if (outcome.kind === "timeout") {
    abortController.abort();
  }

  // Only a stored "running" state with a finite timestamp can count as stale-running.
  const storedAsRunning = params.storedState === "running";
  let inactiveForMs = 0;
  if (storedAsRunning && Number.isFinite(params.lastActivityAt)) {
    inactiveForMs = Date.now() - Math.max(0, Math.floor(params.lastActivityAt ?? 0));
  }
  const isStaleRunning = storedAsRunning && inactiveForMs >= DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS;

  switch (outcome.kind) {
    case "timeout":
      if (isStaleRunning) {
        return { status: "stale", reason: "status-timeout-running-stale" };
      }
      return { status: "uncertain", reason: "status-timeout" };
    case "error":
      return classifyAcpStatusProbeError({ error: outcome.error, isStaleRunning });
    default:
      break;
  }

  if (outcome.status.state === "error") {
    // ACP error state is recoverable (next turn can clear it), so keep the
    // binding unless stronger stale signals exist.
    return { status: "uncertain", reason: "status-error-state" };
  }
  return { status: "healthy" };
}
async function deployDiscordCommands(params: {
client: Client;
runtime: RuntimeEnv;
@@ -382,14 +470,32 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
})
: createNoopThreadBindingManager(account.accountId);
if (threadBindingsEnabled) {
const reconciliation = reconcileAcpThreadBindingsOnStartup({
const uncertainProbeKeys = new Set<string>();
const reconciliation = await reconcileAcpThreadBindingsOnStartup({
cfg,
accountId: account.accountId,
sendFarewell: false,
healthProbe: async ({ sessionKey, session }) => {
const probe = await probeDiscordAcpBindingHealth({
cfg,
sessionKey,
storedState: session.acp?.state,
lastActivityAt: session.acp?.lastActivityAt,
});
if (probe.status === "uncertain") {
uncertainProbeKeys.add(`${sessionKey}${probe.reason ? ` (${probe.reason})` : ""}`);
}
return probe;
},
});
if (reconciliation.removed > 0) {
logVerbose(
`discord: removed ${reconciliation.removed}/${reconciliation.checked} stale ACP thread bindings on startup for account ${account.accountId}`,
`discord: removed ${reconciliation.removed}/${reconciliation.checked} stale ACP thread bindings on startup for account ${account.accountId}: ${reconciliation.staleSessionKeys.join(", ")}`,
);
}
if (uncertainProbeKeys.size > 0) {
logVerbose(
`discord: ACP thread-binding health probe uncertain for account ${account.accountId}: ${[...uncertainProbeKeys].join(", ")}`,
);
}
}
@@ -599,6 +705,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
runtime,
setStatus: opts.setStatus,
abortSignal: opts.abortSignal,
listenerTimeoutMs: eventQueueOpts.listenerTimeout,
botUserId,
guildHistories,
historyLimit,
@@ -623,7 +730,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
registerDiscordListener(
client.listeners,
new DiscordMessageListener(messageHandler, logger, trackInboundEvent),
new DiscordMessageListener(messageHandler, logger, trackInboundEvent, {
timeoutMs: eventQueueOpts.listenerTimeout,
}),
);
const reactionListenerOptions = {
cfg,

View File

@@ -811,7 +811,7 @@ describe("thread binding lifecycle", () => {
};
});
const result = reconcileAcpThreadBindingsOnStartup({
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
});
@@ -855,7 +855,7 @@ describe("thread binding lifecycle", () => {
acp: undefined,
});
const result = reconcileAcpThreadBindingsOnStartup({
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
});
@@ -866,6 +866,287 @@ describe("thread binding lifecycle", () => {
expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined();
});
it("removes ACP bindings when health probe marks running session as stale", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-acp-running",
channelId: "parent-1",
targetKind: "acp",
targetSessionKey: "agent:codex:acp:running",
agentId: "codex",
webhookId: "wh-1",
webhookToken: "tok-1",
});
hoisted.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex:acp:running",
storeSessionKey: "agent:codex:acp:running",
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:running",
mode: "persistent",
state: "running",
lastActivityAt: Date.now() - 5 * 60 * 1000,
},
});
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
healthProbe: async () => ({ status: "stale", reason: "status-timeout-running-stale" }),
});
expect(result.checked).toBe(1);
expect(result.removed).toBe(1);
expect(result.staleSessionKeys).toContain("agent:codex:acp:running");
expect(manager.getByThreadId("thread-acp-running")).toBeUndefined();
});
it("keeps running ACP bindings when health probe is uncertain", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-acp-running-uncertain",
channelId: "parent-1",
targetKind: "acp",
targetSessionKey: "agent:codex:acp:running-uncertain",
agentId: "codex",
webhookId: "wh-1",
webhookToken: "tok-1",
});
hoisted.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex:acp:running-uncertain",
storeSessionKey: "agent:codex:acp:running-uncertain",
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:running-uncertain",
mode: "persistent",
state: "running",
lastActivityAt: Date.now(),
},
});
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
healthProbe: async () => ({ status: "uncertain", reason: "status-timeout" }),
});
expect(result.checked).toBe(1);
expect(result.removed).toBe(0);
expect(result.staleSessionKeys).toEqual([]);
expect(manager.getByThreadId("thread-acp-running-uncertain")).toBeDefined();
});
it("keeps ACP bindings in stored error state when no explicit stale probe verdict exists", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
await manager.bindTarget({
threadId: "thread-acp-error",
channelId: "parent-1",
targetKind: "acp",
targetSessionKey: "agent:codex:acp:error",
agentId: "codex",
webhookId: "wh-1",
webhookToken: "tok-1",
});
hoisted.readAcpSessionEntry.mockReturnValue({
sessionKey: "agent:codex:acp:error",
storeSessionKey: "agent:codex:acp:error",
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "runtime:error",
mode: "persistent",
state: "error",
lastActivityAt: Date.now(),
},
});
const result = await reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
});
expect(result.checked).toBe(1);
expect(result.removed).toBe(0);
expect(result.staleSessionKeys).toEqual([]);
expect(manager.getByThreadId("thread-acp-error")).toBeDefined();
});
// Verifies that startup reconciliation does not serialize health probes: the second
// probe must begin while the first one is still pending.
it("starts ACP health probes in parallel during startup reconciliation", async () => {
  const manager = createThreadBindingManager({
    accountId: "default",
    persist: false,
    enableSweeper: false,
    idleTimeoutMs: 24 * 60 * 60 * 1000,
    maxAgeMs: 0,
  });
  await manager.bindTarget({
    threadId: "thread-acp-probe-1",
    channelId: "parent-1",
    targetKind: "acp",
    targetSessionKey: "agent:codex:acp:probe-1",
    agentId: "codex",
    webhookId: "wh-1",
    webhookToken: "tok-1",
  });
  await manager.bindTarget({
    threadId: "thread-acp-probe-2",
    channelId: "parent-1",
    targetKind: "acp",
    targetSessionKey: "agent:codex:acp:probe-2",
    agentId: "codex",
    webhookId: "wh-1",
    webhookToken: "tok-1",
  });
  // Every binding resolves to a fresh "running" session so both reach the probe stage.
  hoisted.readAcpSessionEntry.mockImplementation((paramsUnknown: unknown) => {
    const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? "";
    return {
      sessionKey,
      storeSessionKey: sessionKey,
      acp: {
        backend: "acpx",
        agent: "codex",
        runtimeSessionName: `runtime:${sessionKey}`,
        mode: "persistent",
        state: "running",
        lastActivityAt: Date.now(),
      },
    };
  });

  // The first probe blocks on this gate until the test releases it.
  let resolveFirstProbe: ((value: { status: "healthy" }) => void) | undefined;
  const firstProbe = new Promise<{ status: "healthy" }>((resolve) => {
    resolveFirstProbe = resolve;
  });
  let probeCallCount = 0;
  let secondProbeStartedBeforeFirstResolved = false;

  const reconcilePromise = reconcileAcpThreadBindingsOnStartup({
    cfg: {} as OpenClawConfig,
    accountId: "default",
    healthProbe: async () => {
      probeCallCount += 1;
      if (probeCallCount === 1) {
        return await firstProbe;
      }
      secondProbeStartedBeforeFirstResolved = true;
      return { status: "healthy" as const };
    },
  });

  // Wait on the observable condition instead of a fixed number of microtask ticks, so
  // the test does not depend on how many awaits the implementation performs. With
  // sequential probing the flag never flips while the gate is closed, so this times out.
  await vi.waitFor(() => {
    expect(secondProbeStartedBeforeFirstResolved).toBe(true);
  });

  resolveFirstProbe?.({ status: "healthy" });
  const result = await reconcilePromise;

  expect(probeCallCount).toBe(2);
  expect(result.checked).toBe(2);
  expect(result.removed).toBe(0);
});
it("caps ACP startup health probe concurrency", async () => {
const manager = createThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
});
for (let index = 0; index < 12; index += 1) {
const key = `agent:codex:acp:cap-${index}`;
await manager.bindTarget({
threadId: `thread-acp-cap-${index}`,
channelId: "parent-1",
targetKind: "acp",
targetSessionKey: key,
agentId: "codex",
webhookId: "wh-1",
webhookToken: "tok-1",
});
}
hoisted.readAcpSessionEntry.mockImplementation((paramsUnknown: unknown) => {
const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? "";
return {
sessionKey,
storeSessionKey: sessionKey,
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: `runtime:${sessionKey}`,
mode: "persistent",
state: "running",
lastActivityAt: Date.now(),
},
};
});
const PROBE_LIMIT = 8;
let probeCalls = 0;
let inFlight = 0;
let maxInFlight = 0;
let releaseFirstWave: (() => void) | undefined;
const firstWaveGate = new Promise<void>((resolve) => {
releaseFirstWave = resolve;
});
const reconcilePromise = reconcileAcpThreadBindingsOnStartup({
cfg: {} as OpenClawConfig,
accountId: "default",
healthProbe: async () => {
probeCalls += 1;
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
if (probeCalls <= PROBE_LIMIT) {
await firstWaveGate;
}
inFlight -= 1;
return { status: "healthy" as const };
},
});
await vi.waitFor(() => {
expect(probeCalls).toBe(PROBE_LIMIT);
});
expect(maxInFlight).toBe(PROBE_LIMIT);
releaseFirstWave?.();
const result = await reconcilePromise;
expect(result.checked).toBe(12);
expect(result.removed).toBe(0);
expect(maxInFlight).toBeLessThanOrEqual(PROBE_LIMIT);
});
it("migrates legacy expiresAt bindings to idle/max-age semantics", () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-"));

View File

@@ -1,4 +1,4 @@
import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import { readAcpSessionEntry, type AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import { normalizeAccountId } from "../../routing/session-key.js";
import { parseDiscordTarget } from "../targets.js";
@@ -29,6 +29,50 @@ export type AcpThreadBindingReconciliationResult = {
staleSessionKeys: string[];
};
/**
 * Verdict for an ACP thread binding during startup reconciliation.
 * Only "stale" adds the binding to the stale set; "healthy" and "uncertain" keep it.
 */
export type AcpThreadBindingHealthStatus = "healthy" | "stale" | "uncertain";
/**
 * Optional caller-supplied check run for each ACP binding whose stored session entry
 * still has ACP metadata. It receives the reconciliation config, the binding manager's
 * account id, the trimmed session key, the binding record and the stored session entry.
 * A rejected probe is treated as "uncertain" by the reconciler. `reason` is free-form
 * diagnostic text.
 */
export type AcpThreadBindingHealthProbe = (params: {
cfg: OpenClawConfig;
accountId: string;
sessionKey: string;
binding: ThreadBindingRecord;
session: AcpSessionStoreEntry;
}) => Promise<{
status: AcpThreadBindingHealthStatus;
reason?: string;
}>;
// Cap startup fan-out so large binding sets do not create unbounded ACP probe spikes.
// Passed as the `limit` of mapWithConcurrency when running health probes.
const ACP_STARTUP_HEALTH_PROBE_CONCURRENCY_LIMIT = 8;
/**
 * Maps `items` through an async `worker` with at most `limit` calls in flight.
 *
 * @param params.items Inputs to process; an empty list resolves to `[]` without calling the worker.
 * @param params.limit Maximum concurrent worker calls. Fractions are floored, values below 1
 *   are clamped to 1, and `NaN` falls back to 1 (sequential) instead of silently skipping
 *   all work.
 * @param params.worker Async mapper that receives the item and its index.
 * @returns Results in the same order as `items`, regardless of completion order.
 * @throws Rejects with the first worker rejection. Runners already in flight are not
 *   cancelled and may keep pulling remaining items.
 */
async function mapWithConcurrency<TItem, TResult>(params: {
  items: TItem[];
  limit: number;
  worker: (item: TItem, index: number) => Promise<TResult>;
}): Promise<TResult[]> {
  const { items, worker } = params;
  if (items.length === 0) {
    return [];
  }
  // Math.max(1, NaN) is NaN and Array.from({ length: NaN }) is empty, so an unchecked NaN
  // limit would start zero runners and return an array of undefined entries.
  const flooredLimit = Math.floor(params.limit);
  const limit = Number.isNaN(flooredLimit) ? 1 : Math.max(1, flooredLimit);
  const runnerCount = Math.min(limit, items.length);

  // Index-addressed storage keeps output order stable without a lookup map.
  const results = new Array<TResult>(items.length);
  let nextIndex = 0;
  const runWorker = async (): Promise<void> => {
    // Claiming an index is synchronous, so two runners can never take the same item.
    while (nextIndex < items.length) {
      const index = nextIndex;
      nextIndex += 1;
      results[index] = await worker(items[index], index);
    }
  };

  await Promise.all(Array.from({ length: runnerCount }, () => runWorker()));
  return results;
}
function normalizeNonNegativeMs(raw: number): number {
if (!Number.isFinite(raw)) {
return 0;
@@ -259,11 +303,21 @@ export function setThreadBindingMaxAgeBySessionKey(params: {
return updated;
}
export function reconcileAcpThreadBindingsOnStartup(params: {
/**
 * Derives a binding verdict from the stored session entry alone, without a live probe.
 *
 * @param params.session Stored session entry for the binding's target session key.
 * @returns "stale" when the entry carries no ACP metadata, otherwise "healthy".
 */
function resolveStoredAcpBindingHealth(params: {
  session: AcpSessionStoreEntry;
}): AcpThreadBindingHealthStatus {
  const hasAcpMetadata = Boolean(params.session.acp);
  return hasAcpMetadata ? "healthy" : "stale";
}
export async function reconcileAcpThreadBindingsOnStartup(params: {
cfg: OpenClawConfig;
accountId?: string;
sendFarewell?: boolean;
}): AcpThreadBindingReconciliationResult {
healthProbe?: AcpThreadBindingHealthProbe;
}): Promise<AcpThreadBindingReconciliationResult> {
const manager = getThreadBindingManager(params.accountId);
if (!manager) {
return {
@@ -274,21 +328,77 @@ export function reconcileAcpThreadBindingsOnStartup(params: {
}
const acpBindings = manager.listBindings().filter((binding) => binding.targetKind === "acp");
const staleBindings = acpBindings.filter((binding) => {
const staleBindings: ThreadBindingRecord[] = [];
const probeTargets: Array<{
binding: ThreadBindingRecord;
sessionKey: string;
session: AcpSessionStoreEntry;
}> = [];
for (const binding of acpBindings) {
const sessionKey = binding.targetSessionKey.trim();
if (!sessionKey) {
return true;
staleBindings.push(binding);
continue;
}
const session = readAcpSessionEntry({
cfg: params.cfg,
sessionKey,
});
// Session store read failures are transient; never auto-unbind on uncertain reads.
if (session?.storeReadFailed) {
return false;
if (!session) {
staleBindings.push(binding);
continue;
}
return !session?.acp;
});
// Session store read failures are transient; never auto-unbind on uncertain reads.
if (session.storeReadFailed) {
continue;
}
if (resolveStoredAcpBindingHealth({ session }) === "stale") {
staleBindings.push(binding);
continue;
}
if (!params.healthProbe) {
continue;
}
probeTargets.push({ binding, sessionKey, session });
}
if (params.healthProbe && probeTargets.length > 0) {
const probeResults = await mapWithConcurrency({
items: probeTargets,
limit: ACP_STARTUP_HEALTH_PROBE_CONCURRENCY_LIMIT,
worker: async ({ binding, sessionKey, session }) => {
try {
const result = await params.healthProbe?.({
cfg: params.cfg,
accountId: manager.accountId,
sessionKey,
binding,
session,
});
return {
binding,
status: result?.status ?? ("uncertain" satisfies AcpThreadBindingHealthStatus),
};
} catch {
// Treat probe failures as uncertain and keep the binding.
return {
binding,
status: "uncertain" satisfies AcpThreadBindingHealthStatus,
};
}
},
});
for (const probeResult of probeResults) {
if (probeResult.status === "stale") {
staleBindings.push(probeResult.binding);
}
}
}
if (staleBindings.length === 0) {
return {
checked: acpBindings.length,