mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 10:17:39 +00:00
fix(discord): harden reconnect recovery and preserve message delivery
Landed from contributor PR #29508 by @cgdusek. Co-authored-by: Charles Dusek <cgdusek@gmail.com>
This commit is contained in:
79
src/discord/monitor/listeners.test.ts
Normal file
79
src/discord/monitor/listeners.test.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { DiscordMessageListener } from "./listeners.js";
|
||||
|
||||
/**
 * Builds a minimal logger test double exposing only `error` and `warn` spies.
 */
function createLogger() {
  const error = vi.fn();
  const warn = vi.fn();
  return { error, warn };
}
|
||||
|
||||
// Exercises DiscordMessageListener.handle(): the call must resolve right away,
// queued handler runs must execute one after another, and handler rejections
// must be reported through the logger.
describe("DiscordMessageListener", () => {
  /** Builds a pending promise together with the callback that resolves it. */
  function createGate() {
    let release: (() => void) | undefined;
    const settled = new Promise<void>((resolve) => {
      release = resolve;
    });
    return { settled, open: () => release?.() };
  }

  it("returns immediately without awaiting handler completion", async () => {
    const gate = createGate();
    const handler = vi.fn(async () => {
      await gate.settled;
    });
    const logger = createLogger();
    const listener = new DiscordMessageListener(handler as never, logger as never);

    // handle() has to settle even though the handler is still blocked on the gate.
    const pending = listener.handle({} as never, {} as never);
    await expect(pending).resolves.toBeUndefined();
    expect(handler).toHaveBeenCalledTimes(1);
    expect(logger.error).not.toHaveBeenCalled();

    gate.open();
    await gate.settled;
  });

  it("serializes queued handler runs while handle returns immediately", async () => {
    const first = createGate();
    const second = createGate();
    let completedStarts = 0;
    const handler = vi.fn(async () => {
      // The first invocation blocks on `first`; every later one blocks on `second`.
      const gate = completedStarts === 0 ? first : second;
      completedStarts += 1;
      await gate.settled;
    });
    const listener = new DiscordMessageListener(handler as never, createLogger() as never);

    const firstCall = listener.handle({} as never, {} as never);
    await expect(firstCall).resolves.toBeUndefined();
    const secondCall = listener.handle({} as never, {} as never);
    await expect(secondCall).resolves.toBeUndefined();

    // Second event is queued until the first handler run settles.
    expect(handler).toHaveBeenCalledTimes(1);
    first.open();
    await vi.waitFor(() => {
      expect(handler).toHaveBeenCalledTimes(2);
    });

    second.open();
    await second.settled;
  });

  it("logs async handler failures", async () => {
    const handler = vi.fn(() => Promise.reject(new Error("boom")));
    const logger = createLogger();
    const listener = new DiscordMessageListener(handler as never, logger as never);

    const pending = listener.handle({} as never, {} as never);
    await expect(pending).resolves.toBeUndefined();

    // The failure surfaces asynchronously, so poll until the logger has seen it.
    await vi.waitFor(() => {
      expect(logger.error).toHaveBeenCalledWith(
        expect.stringContaining("discord handler failed: Error: boom"),
      );
    });
  });
});
|
||||
@@ -118,6 +118,8 @@ export function registerDiscordListener(listeners: Array<object>, listener: obje
|
||||
}
|
||||
|
||||
export class DiscordMessageListener extends MessageCreateListener {
|
||||
private messageQueue: Promise<void> = Promise.resolve();
|
||||
|
||||
constructor(
|
||||
private handler: DiscordMessageHandler,
|
||||
private logger?: Logger,
|
||||
@@ -126,15 +128,25 @@ export class DiscordMessageListener extends MessageCreateListener {
|
||||
}
|
||||
|
||||
/**
 * Queues a message event for processing and returns without awaiting the handler.
 *
 * Each call chains one `runDiscordListenerWithSlowLog` run onto `messageQueue`,
 * so a run starts only after the previously queued run has settled. Failures are
 * reported through `this.logger`, falling back to `discordEventQueueLog`.
 *
 * @param data - The message event passed through to the handler.
 * @param client - The client passed through to the handler.
 */
async handle(data: DiscordMessageEvent, client: Client) {
  // One reporter shared by the per-run error hook and the queue-level safety net.
  const reportFailure = (err: unknown) => {
    const logger = this.logger ?? discordEventQueueLog;
    logger.error(danger(`discord handler failed: ${String(err)}`));
  };

  // Release Carbon's dispatch lane immediately, but keep our message handler
  // serialized to avoid unbounded parallel model/IO work on traffic bursts.
  this.messageQueue = this.messageQueue
    // A rejected predecessor must not prevent later events from running.
    .catch(() => {})
    .then(() =>
      runDiscordListenerWithSlowLog({
        logger: this.logger,
        listener: this.constructor.name,
        event: this.type,
        run: () => this.handler(data, client),
        onError: reportFailure,
      }),
    );

  // Observe the tail of the queue so a rejection is logged instead of going unhandled.
  void this.messageQueue.catch(reportFailure);
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { Client } from "@buape/carbon";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
@@ -11,9 +12,10 @@ const {
|
||||
waitForDiscordGatewayStopMock,
|
||||
} = vi.hoisted(() => {
|
||||
const stopGatewayLoggingMock = vi.fn();
|
||||
const getDiscordGatewayEmitterMock = vi.fn<() => EventEmitter | undefined>(() => undefined);
|
||||
return {
|
||||
attachDiscordGatewayLoggingMock: vi.fn(() => stopGatewayLoggingMock),
|
||||
getDiscordGatewayEmitterMock: vi.fn(() => undefined),
|
||||
getDiscordGatewayEmitterMock,
|
||||
waitForDiscordGatewayStopMock: vi.fn(() => Promise.resolve()),
|
||||
registerGatewayMock: vi.fn(),
|
||||
unregisterGatewayMock: vi.fn(),
|
||||
@@ -51,6 +53,19 @@ describe("runDiscordGatewayLifecycle", () => {
|
||||
stop?: () => Promise<void>;
|
||||
isDisallowedIntentsError?: (err: unknown) => boolean;
|
||||
pendingGatewayErrors?: unknown[];
|
||||
gateway?: {
|
||||
isConnected?: boolean;
|
||||
options?: Record<string, unknown>;
|
||||
disconnect?: () => void;
|
||||
connect?: (resume?: boolean) => void;
|
||||
state?: {
|
||||
sessionId?: string | null;
|
||||
resumeGatewayUrl?: string | null;
|
||||
sequence?: number | null;
|
||||
};
|
||||
sequence?: number | null;
|
||||
emitter?: EventEmitter;
|
||||
};
|
||||
}) => {
|
||||
const start = vi.fn(params?.start ?? (async () => undefined));
|
||||
const stop = vi.fn(params?.stop ?? (async () => undefined));
|
||||
@@ -72,7 +87,9 @@ describe("runDiscordGatewayLifecycle", () => {
|
||||
releaseEarlyGatewayErrorGuard,
|
||||
lifecycleParams: {
|
||||
accountId: params?.accountId ?? "default",
|
||||
client: { getPlugin: vi.fn(() => undefined) } as unknown as Client,
|
||||
client: {
|
||||
getPlugin: vi.fn((name: string) => (name === "gateway" ? params?.gateway : undefined)),
|
||||
} as unknown as Client,
|
||||
runtime,
|
||||
isDisallowedIntentsError: params?.isDisallowedIntentsError ?? (() => false),
|
||||
voiceManager: null,
|
||||
@@ -203,4 +220,99 @@ describe("runDiscordGatewayLifecycle", () => {
|
||||
releaseEarlyGatewayErrorGuard,
|
||||
});
|
||||
});
|
||||
|
||||
// Three consecutive sockets open without the gateway ever reporting connected:
// the first two stalls must reconnect with resume, the third must wipe the
// resume state and reconnect without resume.
it("retries stalled HELLO with resume before forcing fresh identify", async () => {
  vi.useFakeTimers();
  try {
    const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
    const emitter = new EventEmitter();
    const state = {
      sessionId: "session-1",
      resumeGatewayUrl: "wss://gateway.discord.gg",
      sequence: 123,
    };
    const gateway = {
      isConnected: false,
      options: {},
      disconnect: vi.fn(),
      connect: vi.fn(),
      state,
      sequence: 123,
      emitter,
    };
    getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
    waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
      // Each round opens a socket and lets the full 30s window elapse.
      for (let stall = 0; stall < 3; stall += 1) {
        emitter.emit("debug", "WebSocket connection opened");
        await vi.advanceTimersByTimeAsync(30000);
      }
    });

    const { lifecycleParams } = createLifecycleHarness({ gateway });
    const run = runDiscordGatewayLifecycle(lifecycleParams);
    await expect(run).resolves.toBeUndefined();

    expect(gateway.disconnect).toHaveBeenCalledTimes(3);
    expect(gateway.connect).toHaveBeenNthCalledWith(1, true);
    expect(gateway.connect).toHaveBeenNthCalledWith(2, true);
    expect(gateway.connect).toHaveBeenNthCalledWith(3, false);
    expect(gateway.state.sessionId).toBeNull();
    expect(gateway.state.resumeGatewayUrl).toBeNull();
    expect(gateway.state.sequence).toBeNull();
    expect(gateway.sequence).toBeNull();
  } finally {
    vi.useRealTimers();
  }
});
|
||||
|
||||
// One stall, then a connection that succeeds and drops quickly, then two more
// stalls: every reconnect must still use resume, never a fresh identify.
it("resets HELLO stall counter after a successful reconnect that drops quickly", async () => {
  vi.useFakeTimers();
  try {
    const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
    const emitter = new EventEmitter();
    const state = {
      sessionId: "session-2",
      resumeGatewayUrl: "wss://gateway.discord.gg",
      sequence: 456,
    };
    const gateway = {
      isConnected: false,
      options: {},
      disconnect: vi.fn(),
      connect: vi.fn(),
      state,
      sequence: 456,
      emitter,
    };
    const announceOpen = () => {
      emitter.emit("debug", "WebSocket connection opened");
    };
    getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
    waitForDiscordGatewayStopMock.mockImplementationOnce(async () => {
      announceOpen();
      await vi.advanceTimersByTimeAsync(30000);

      // Successful reconnect (READY/RESUMED sets isConnected=true), then
      // quick drop before the HELLO timeout window finishes.
      gateway.isConnected = true;
      announceOpen();
      await vi.advanceTimersByTimeAsync(10);
      emitter.emit("debug", "WebSocket connection closed with code 1006");
      gateway.isConnected = false;

      announceOpen();
      await vi.advanceTimersByTimeAsync(30000);

      announceOpen();
      await vi.advanceTimersByTimeAsync(30000);
    });

    const { lifecycleParams } = createLifecycleHarness({ gateway });
    const run = runDiscordGatewayLifecycle(lifecycleParams);
    await expect(run).resolves.toBeUndefined();

    expect(gateway.connect).toHaveBeenCalledTimes(3);
    expect(gateway.connect).toHaveBeenNthCalledWith(1, true);
    expect(gateway.connect).toHaveBeenNthCalledWith(2, true);
    expect(gateway.connect).toHaveBeenNthCalledWith(3, true);
    expect(gateway.connect).not.toHaveBeenCalledWith(false);
  } finally {
    vi.useRealTimers();
  }
});
|
||||
});
|
||||
|
||||
@@ -51,24 +51,95 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
}
|
||||
|
||||
// Delay, in milliseconds, of the timer armed after a "WebSocket connection opened" message.
const HELLO_TIMEOUT_MS = 30 * 1000;
// Interval, in milliseconds, at which `gateway.isConnected` is polled while that timer runs.
const HELLO_CONNECTED_POLL_MS = 250;
// Number of consecutive stalls after which the reconnect is made without resume.
const MAX_CONSECUTIVE_HELLO_STALLS = 3;

// Handle of the pending HELLO timeout, if one is armed.
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
// Handle of the running connected-state poll, if one is active.
let helloConnectedPollId: ReturnType<typeof setInterval> | undefined;
// Stalls counted since the last reset.
let consecutiveHelloStalls = 0;
|
||||
/** Cancels the pending HELLO timeout and the connected-state poll, if either is active. */
const clearHelloWatch = () => {
  const pendingTimeout = helloTimeoutId;
  if (pendingTimeout) {
    helloTimeoutId = undefined;
    clearTimeout(pendingTimeout);
  }

  const pendingPoll = helloConnectedPollId;
  if (pendingPoll) {
    helloConnectedPollId = undefined;
    clearInterval(pendingPoll);
  }
};
|
||||
/** Sets the consecutive HELLO stall count back to zero. */
const resetHelloStallCounter = (): void => {
  consecutiveHelloStalls = 0;
};
|
||||
/**
 * Nulls the gateway's session id, resume URL and sequence numbers.
 * Does nothing when there is no gateway or the gateway has no `state` object.
 */
const clearResumeState = () => {
  type ResumeFields = {
    sessionId?: string | null;
    resumeGatewayUrl?: string | null;
    sequence?: number | null;
  };
  // View the gateway through a writable shape so the resume fields can be assigned.
  const writableGateway = gateway as
    | (GatewayPlugin & { state?: ResumeFields; sequence?: number | null })
    | undefined;

  const resumeFields = writableGateway?.state;
  if (!writableGateway || !resumeFields) {
    return;
  }

  resumeFields.sessionId = null;
  resumeFields.resumeGatewayUrl = null;
  resumeFields.sequence = null;
  // The sequence is also tracked on the gateway object itself.
  writableGateway.sequence = null;
};
|
||||
const onGatewayDebug = (msg: unknown) => {
|
||||
const message = String(msg);
|
||||
if (message.includes("WebSocket connection closed")) {
|
||||
// Carbon marks `isConnected` true only after READY/RESUMED and flips it
|
||||
// false during reconnect handling after this debug line is emitted.
|
||||
if (gateway?.isConnected) {
|
||||
resetHelloStallCounter();
|
||||
}
|
||||
clearHelloWatch();
|
||||
return;
|
||||
}
|
||||
if (!message.includes("WebSocket connection opened")) {
|
||||
return;
|
||||
}
|
||||
if (helloTimeoutId) {
|
||||
clearTimeout(helloTimeoutId);
|
||||
}
|
||||
helloTimeoutId = setTimeout(() => {
|
||||
clearHelloWatch();
|
||||
|
||||
let sawConnected = gateway?.isConnected === true;
|
||||
helloConnectedPollId = setInterval(() => {
|
||||
if (!gateway?.isConnected) {
|
||||
return;
|
||||
}
|
||||
sawConnected = true;
|
||||
resetHelloStallCounter();
|
||||
if (helloConnectedPollId) {
|
||||
clearInterval(helloConnectedPollId);
|
||||
helloConnectedPollId = undefined;
|
||||
}
|
||||
}, HELLO_CONNECTED_POLL_MS);
|
||||
|
||||
helloTimeoutId = setTimeout(() => {
|
||||
if (helloConnectedPollId) {
|
||||
clearInterval(helloConnectedPollId);
|
||||
helloConnectedPollId = undefined;
|
||||
}
|
||||
if (sawConnected || gateway?.isConnected) {
|
||||
resetHelloStallCounter();
|
||||
} else {
|
||||
consecutiveHelloStalls += 1;
|
||||
const forceFreshIdentify = consecutiveHelloStalls >= MAX_CONSECUTIVE_HELLO_STALLS;
|
||||
params.runtime.log?.(
|
||||
danger(
|
||||
`connection stalled: no HELLO received within ${HELLO_TIMEOUT_MS}ms, forcing reconnect`,
|
||||
forceFreshIdentify
|
||||
? `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); forcing fresh identify`
|
||||
: `connection stalled: no HELLO within ${HELLO_TIMEOUT_MS}ms (${consecutiveHelloStalls}/${MAX_CONSECUTIVE_HELLO_STALLS}); retrying resume`,
|
||||
),
|
||||
);
|
||||
if (forceFreshIdentify) {
|
||||
clearResumeState();
|
||||
resetHelloStallCounter();
|
||||
}
|
||||
gateway?.disconnect();
|
||||
gateway?.connect(false);
|
||||
gateway?.connect(!forceFreshIdentify);
|
||||
}
|
||||
helloTimeoutId = undefined;
|
||||
}, HELLO_TIMEOUT_MS);
|
||||
@@ -137,9 +208,7 @@ export async function runDiscordGatewayLifecycle(params: {
|
||||
params.releaseEarlyGatewayErrorGuard?.();
|
||||
unregisterGateway(params.accountId);
|
||||
stopGatewayLogging();
|
||||
if (helloTimeoutId) {
|
||||
clearTimeout(helloTimeoutId);
|
||||
}
|
||||
clearHelloWatch();
|
||||
gatewayEmitter?.removeListener("debug", onGatewayDebug);
|
||||
params.abortSignal?.removeEventListener("abort", onAbort);
|
||||
if (params.voiceManager) {
|
||||
|
||||
Reference in New Issue
Block a user