fix(discord): make message listener non-blocking (#39154, thanks @yaseenkadlemakki)

Co-authored-by: Yaseen Kadlemakki <yaseen82@gmail.com>
This commit is contained in:
Peter Steinberger
2026-03-07 21:13:47 +00:00
parent 7649712356
commit f51cac277c
4 changed files with 81 additions and 188 deletions

View File

@@ -280,6 +280,7 @@ Docs: https://docs.openclaw.ai
- Podman/.env gateway bind precedence: evaluate `OPENCLAW_GATEWAY_BIND` after sourcing `.env` in `run-openclaw-podman.sh` so env-file overrides are honored. (#38785) Thanks @majinyu666.
- Models/default alias refresh: bump `gpt` to `openai/gpt-5.4` and Gemini defaults to `gemini-3.1` preview aliases (including normalization/default wiring) to track current model IDs. (#38638) Thanks @ademczuk.
- Config/env substitution degraded mode: convert missing `${VAR}` resolution in config reads from hard-fail to warning-backed degraded behavior, while preventing unresolved placeholders from being accepted as gateway credentials. (#39050) Thanks @akz142857.
- Discord inbound listener non-blocking dispatch: make `MESSAGE_CREATE` listener handoff asynchronous (no per-listener queue blocking), so long runs no longer stall unrelated incoming events. (#39154) Thanks @yaseenkadlemakki.
## 2026.3.2

View File

@@ -115,7 +115,7 @@ describe("DiscordMessageListener", () => {
expect(handlerResolved).toBe(true);
});
it("queues subsequent events until prior message handling completes", async () => {
it("dispatches subsequent events concurrently without blocking on prior handler", async () => {
const first = createDeferred();
const second = createDeferred();
let runCount = 0;
@@ -142,12 +142,12 @@ describe("DiscordMessageListener", () => {
),
).resolves.toBeUndefined();
expect(handler).toHaveBeenCalledTimes(1);
first.resolve();
// Both handlers are dispatched concurrently (fire-and-forget).
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
first.resolve();
second.resolve();
await Promise.resolve();
});
@@ -171,42 +171,28 @@ describe("DiscordMessageListener", () => {
});
});
it("logs slow handlers after the threshold", async () => {
vi.useFakeTimers();
vi.setSystemTime(0);
it("does not apply its own slow-listener logging (owned by inbound worker)", async () => {
const deferred = createDeferred();
const handler = vi.fn(() => deferred.promise);
const logger = {
warn: vi.fn(),
error: vi.fn(),
} as unknown as ReturnType<typeof import("../logging/subsystem.js").createSubsystemLogger>;
const listener = new DiscordMessageListener(handler, logger);
try {
const deferred = createDeferred();
const handler = vi.fn(() => deferred.promise);
const logger = {
warn: vi.fn(),
error: vi.fn(),
} as unknown as ReturnType<typeof import("../logging/subsystem.js").createSubsystemLogger>;
const listener = new DiscordMessageListener(handler, logger);
const handlePromise = listener.handle(
{} as unknown as import("./monitor/listeners.js").DiscordMessageEvent,
{} as unknown as import("@buape/carbon").Client,
);
await expect(handlePromise).resolves.toBeUndefined();
// handle() should release immediately.
const handlePromise = listener.handle(
{} as unknown as import("./monitor/listeners.js").DiscordMessageEvent,
{} as unknown as import("@buape/carbon").Client,
);
await expect(handlePromise).resolves.toBeUndefined();
expect(logger.warn).not.toHaveBeenCalled();
// Advance wall clock past the slow listener threshold.
vi.setSystemTime(31_000);
// Release the background handler and allow slow-log finalizer to run.
deferred.resolve();
await vi.waitFor(() => {
expect(logger.warn).toHaveBeenCalled();
});
const warnMock = logger.warn as unknown as { mock: { calls: unknown[][] } };
const [, meta] = warnMock.mock.calls[0] ?? [];
const durationMs = (meta as { durationMs?: number } | undefined)?.durationMs;
expect(durationMs).toBeGreaterThanOrEqual(30_000);
} finally {
vi.useRealTimers();
}
deferred.resolve();
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledOnce();
});
// The listener no longer wraps handlers with slow-listener logging;
// that responsibility moved to the inbound worker.
expect(logger.warn).not.toHaveBeenCalled();
});
});

View File

@@ -25,44 +25,63 @@ describe("DiscordMessageListener", () => {
const listener = new DiscordMessageListener(handler as never, logger as never);
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
expect(handler).toHaveBeenCalledTimes(1);
// Handler was dispatched but may not have been called yet (fire-and-forget).
// Wait for the microtask to flush so the handler starts.
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(1);
});
expect(logger.error).not.toHaveBeenCalled();
resolveHandler?.();
await handlerDone;
});
it("serializes queued handler runs for the same channel", async () => {
let firstResolve: (() => void) | undefined;
let secondResolve: (() => void) | undefined;
const firstDone = new Promise<void>((resolve) => {
firstResolve = resolve;
it("runs handlers for the same channel concurrently (no per-channel serialization)", async () => {
const order: string[] = [];
let resolveA: (() => void) | undefined;
let resolveB: (() => void) | undefined;
const doneA = new Promise<void>((r) => {
resolveA = r;
});
const secondDone = new Promise<void>((resolve) => {
secondResolve = resolve;
const doneB = new Promise<void>((r) => {
resolveB = r;
});
let runCount = 0;
let callCount = 0;
const handler = vi.fn(async () => {
runCount += 1;
if (runCount === 1) {
await firstDone;
return;
callCount += 1;
const id = callCount;
order.push(`start:${id}`);
if (id === 1) {
await doneA;
} else {
await doneB;
}
await secondDone;
order.push(`end:${id}`);
});
const listener = new DiscordMessageListener(handler as never, createLogger() as never);
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
await expect(listener.handle(fakeEvent("ch-1"), {} as never)).resolves.toBeUndefined();
// Both messages target the same channel — previously serialized, now concurrent.
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-1"), {} as never);
expect(handler).toHaveBeenCalledTimes(1);
firstResolve?.();
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
// Both handlers started without waiting for the first to finish.
expect(order).toContain("start:1");
expect(order).toContain("start:2");
secondResolve?.();
await secondDone;
resolveB?.();
await vi.waitFor(() => {
expect(order).toContain("end:2");
});
// First handler is still running — no serialization.
expect(order).not.toContain("end:1");
resolveA?.();
await vi.waitFor(() => {
expect(order).toContain("end:1");
});
});
it("runs handlers for different channels in parallel", async () => {
@@ -122,109 +141,14 @@ describe("DiscordMessageListener", () => {
});
});
it("continues same-channel processing after handler timeout", async () => {
vi.useFakeTimers();
try {
const never = new Promise<void>(() => {});
const handler = vi.fn(async () => {
if (handler.mock.calls.length === 1) {
await never;
return;
}
});
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
timeoutMs: 50,
});
it("calls onEvent callback for each message", async () => {
const handler = vi.fn(async () => {});
const onEvent = vi.fn();
const listener = new DiscordMessageListener(handler as never, undefined, onEvent);
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-1"), {} as never);
expect(handler).toHaveBeenCalledTimes(1);
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-2"), {} as never);
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
} finally {
vi.useRealTimers();
}
});
it("aborts timed-out handlers and prevents late side effects", async () => {
vi.useFakeTimers();
try {
let abortReceived = false;
let lateSideEffect = false;
const handler = vi.fn(
async (
_data: unknown,
_client: unknown,
options?: {
abortSignal?: AbortSignal;
},
) => {
await new Promise<void>((resolve) => {
if (options?.abortSignal?.aborted) {
abortReceived = true;
resolve();
return;
}
options?.abortSignal?.addEventListener(
"abort",
() => {
abortReceived = true;
resolve();
},
{ once: true },
);
});
if (options?.abortSignal?.aborted) {
return;
}
lateSideEffect = true;
},
);
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
timeoutMs: 50,
});
await listener.handle(fakeEvent("ch-1"), {} as never);
await listener.handle(fakeEvent("ch-1"), {} as never);
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(handler).toHaveBeenCalledTimes(2);
});
expect(abortReceived).toBe(true);
expect(lateSideEffect).toBe(false);
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
} finally {
vi.useRealTimers();
}
});
it("does not emit slow-listener warnings when timeout already fired", async () => {
vi.useFakeTimers();
try {
const never = new Promise<void>(() => {});
const handler = vi.fn(async () => {
await never;
});
const logger = createLogger();
const listener = new DiscordMessageListener(handler as never, logger as never, undefined, {
timeoutMs: 31_000,
});
await listener.handle(fakeEvent("ch-1"), {} as never);
await vi.advanceTimersByTimeAsync(31_100);
await vi.waitFor(() => {
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after"));
});
expect(logger.warn).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
expect(onEvent).toHaveBeenCalledTimes(2);
});
});

View File

@@ -13,7 +13,6 @@ import { danger, logVerbose } from "../../globals.js";
import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import {
readStoreAllowFromForDmPolicy,
@@ -199,44 +198,27 @@ export function registerDiscordListener(listeners: Array<object>, listener: obje
}
export class DiscordMessageListener extends MessageCreateListener {
private readonly channelQueue = new KeyedAsyncQueue();
private readonly listenerTimeoutMs: number;
constructor(
private handler: DiscordMessageHandler,
private logger?: Logger,
private onEvent?: () => void,
options?: { timeoutMs?: number },
_options?: { timeoutMs?: number },
) {
super();
this.listenerTimeoutMs = normalizeDiscordListenerTimeoutMs(options?.timeoutMs);
}
async handle(data: DiscordMessageEvent, client: Client) {
this.onEvent?.();
const channelId = data.channel_id;
const context = {
channelId,
messageId: (data as { message?: { id?: string } }).message?.id,
guildId: (data as { guild_id?: string }).guild_id,
} satisfies Record<string, unknown>;
// Serialize messages within the same channel to preserve ordering,
// but allow different channels to proceed in parallel so that
// channel-bound agents are not blocked by each other.
void this.channelQueue.enqueue(channelId, () =>
runDiscordListenerWithSlowLog({
logger: this.logger,
listener: this.constructor.name,
event: this.type,
timeoutMs: this.listenerTimeoutMs,
context,
run: (abortSignal) => this.handler(data, client, { abortSignal }),
onError: (err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
},
}),
);
// Fire-and-forget: hand off to the handler without blocking the
// Carbon listener. Per-session ordering and run timeouts are owned
// by the inbound worker queue, so the listener no longer serializes
// or applies its own timeout.
void Promise.resolve()
.then(() => this.handler(data, client))
.catch((err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
});
}
}