fix(discord): harden voice DAVE receive reliability (#25861)

Reimplements and consolidates related work:
- #24339 stale disconnect/destroyed session guards
- #25312 voice listener cleanup on stop
- #23036 restore @snazzah/davey runtime dependency

Adds Discord voice DAVE config passthrough, repeated decrypt failure
rejoin recovery, regression tests, docs, and changelog updates.

Co-authored-by: Frank Yang <frank.ekn@gmail.com>
Co-authored-by: Do Cao Hieu <admin@docaohieu.com>
This commit is contained in:
Peter Steinberger
2026-02-25 00:19:36 +00:00
parent 1839ba8ccb
commit 9cd50c51b0
11 changed files with 555 additions and 12 deletions

View File

@@ -0,0 +1,268 @@
import { ChannelType } from "@buape/carbon";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const {
createConnectionMock,
joinVoiceChannelMock,
entersStateMock,
createAudioPlayerMock,
resolveAgentRouteMock,
} = vi.hoisted(() => {
type EventHandler = (...args: unknown[]) => unknown;
type MockConnection = {
destroy: ReturnType<typeof vi.fn>;
subscribe: ReturnType<typeof vi.fn>;
on: ReturnType<typeof vi.fn>;
off: ReturnType<typeof vi.fn>;
receiver: {
speaking: {
on: ReturnType<typeof vi.fn>;
off: ReturnType<typeof vi.fn>;
};
subscribe: ReturnType<typeof vi.fn>;
};
handlers: Map<string, EventHandler>;
};
const createConnectionMock = (): MockConnection => {
const handlers = new Map<string, EventHandler>();
const connection: MockConnection = {
destroy: vi.fn(),
subscribe: vi.fn(),
on: vi.fn((event: string, handler: EventHandler) => {
handlers.set(event, handler);
}),
off: vi.fn(),
receiver: {
speaking: {
on: vi.fn(),
off: vi.fn(),
},
subscribe: vi.fn(() => ({
on: vi.fn(),
[Symbol.asyncIterator]: async function* () {},
})),
},
handlers,
};
return connection;
};
return {
createConnectionMock,
joinVoiceChannelMock: vi.fn(() => createConnectionMock()),
entersStateMock: vi.fn(async (_target?: unknown, _state?: string, _timeoutMs?: number) => {
return undefined;
}),
createAudioPlayerMock: vi.fn(() => ({
on: vi.fn(),
off: vi.fn(),
stop: vi.fn(),
play: vi.fn(),
state: { status: "idle" },
})),
resolveAgentRouteMock: vi.fn(() => ({ agentId: "agent-1", sessionKey: "discord:g1:c1" })),
};
});
vi.mock("@discordjs/voice", () => ({
AudioPlayerStatus: { Playing: "playing", Idle: "idle" },
EndBehaviorType: { AfterSilence: "AfterSilence" },
VoiceConnectionStatus: {
Ready: "ready",
Disconnected: "disconnected",
Destroyed: "destroyed",
Signalling: "signalling",
Connecting: "connecting",
},
createAudioPlayer: createAudioPlayerMock,
createAudioResource: vi.fn(),
entersState: entersStateMock,
joinVoiceChannel: joinVoiceChannelMock,
}));
vi.mock("../../routing/resolve-route.js", () => ({
resolveAgentRoute: resolveAgentRouteMock,
}));
let managerModule: typeof import("./manager.js");
function createClient() {
return {
fetchChannel: vi.fn(async (channelId: string) => ({
id: channelId,
guildId: "g1",
type: ChannelType.GuildVoice,
})),
getPlugin: vi.fn(() => ({
getGatewayAdapterCreator: vi.fn(() => vi.fn()),
})),
fetchMember: vi.fn(),
fetchUser: vi.fn(),
};
}
function createRuntime() {
return {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
}
describe("DiscordVoiceManager", () => {
beforeAll(async () => {
managerModule = await import("./manager.js");
});
beforeEach(() => {
joinVoiceChannelMock.mockReset();
joinVoiceChannelMock.mockImplementation(() => createConnectionMock());
entersStateMock.mockReset();
entersStateMock.mockResolvedValue(undefined);
createAudioPlayerMock.mockClear();
resolveAgentRouteMock.mockClear();
});
it("keeps the new session when an old disconnected handler fires", async () => {
const oldConnection = createConnectionMock();
const newConnection = createConnectionMock();
joinVoiceChannelMock.mockReturnValueOnce(oldConnection).mockReturnValueOnce(newConnection);
entersStateMock.mockImplementation(async (target: unknown, status?: string) => {
if (target === oldConnection && (status === "signalling" || status === "connecting")) {
throw new Error("old disconnected");
}
return undefined;
});
const manager = new managerModule.DiscordVoiceManager({
client: createClient() as never,
cfg: {},
discordConfig: {},
accountId: "default",
runtime: createRuntime(),
});
await manager.join({ guildId: "g1", channelId: "c1" });
await manager.join({ guildId: "g1", channelId: "c2" });
const oldDisconnected = oldConnection.handlers.get("disconnected");
expect(oldDisconnected).toBeTypeOf("function");
await oldDisconnected?.();
expect(manager.status()).toEqual([
{
ok: true,
message: "connected: guild g1 channel c2",
guildId: "g1",
channelId: "c2",
},
]);
});
it("keeps the new session when an old destroyed handler fires", async () => {
const oldConnection = createConnectionMock();
const newConnection = createConnectionMock();
joinVoiceChannelMock.mockReturnValueOnce(oldConnection).mockReturnValueOnce(newConnection);
const manager = new managerModule.DiscordVoiceManager({
client: createClient() as never,
cfg: {},
discordConfig: {},
accountId: "default",
runtime: createRuntime(),
});
await manager.join({ guildId: "g1", channelId: "c1" });
await manager.join({ guildId: "g1", channelId: "c2" });
const oldDestroyed = oldConnection.handlers.get("destroyed");
expect(oldDestroyed).toBeTypeOf("function");
oldDestroyed?.();
expect(manager.status()).toEqual([
{
ok: true,
message: "connected: guild g1 channel c2",
guildId: "g1",
channelId: "c2",
},
]);
});
it("removes voice listeners on leave", async () => {
const connection = createConnectionMock();
joinVoiceChannelMock.mockReturnValueOnce(connection);
const manager = new managerModule.DiscordVoiceManager({
client: createClient() as never,
cfg: {},
discordConfig: {},
accountId: "default",
runtime: createRuntime(),
});
await manager.join({ guildId: "g1", channelId: "c1" });
await manager.leave({ guildId: "g1" });
const player = createAudioPlayerMock.mock.results[0]?.value;
expect(connection.receiver.speaking.off).toHaveBeenCalledWith("start", expect.any(Function));
expect(connection.off).toHaveBeenCalledWith("disconnected", expect.any(Function));
expect(connection.off).toHaveBeenCalledWith("destroyed", expect.any(Function));
expect(player.off).toHaveBeenCalledWith("error", expect.any(Function));
});
it("passes DAVE options to joinVoiceChannel", async () => {
const manager = new managerModule.DiscordVoiceManager({
client: createClient() as never,
cfg: {},
discordConfig: {
voice: {
daveEncryption: false,
decryptionFailureTolerance: 8,
},
},
accountId: "default",
runtime: createRuntime(),
});
await manager.join({ guildId: "g1", channelId: "c1" });
expect(joinVoiceChannelMock).toHaveBeenCalledWith(
expect.objectContaining({
daveEncryption: false,
decryptionFailureTolerance: 8,
}),
);
});
it("attempts rejoin after repeated decrypt failures", async () => {
const manager = new managerModule.DiscordVoiceManager({
client: createClient() as never,
cfg: {},
discordConfig: {},
accountId: "default",
runtime: createRuntime(),
});
await manager.join({ guildId: "g1", channelId: "c1" });
const entry = (manager as { sessions: Map<string, unknown> }).sessions.get("g1");
expect(entry).toBeDefined();
(manager as { handleReceiveError: (e: unknown, err: unknown) => void }).handleReceiveError(
entry,
new Error("Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)"),
);
(manager as { handleReceiveError: (e: unknown, err: unknown) => void }).handleReceiveError(
entry,
new Error("Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)"),
);
(manager as { handleReceiveError: (e: unknown, err: unknown) => void }).handleReceiveError(
entry,
new Error("Failed to decrypt: DecryptionFailed(UnencryptedWhenPassthroughDisabled)"),
);
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
expect(joinVoiceChannelMock).toHaveBeenCalledTimes(2);
});
});

View File

@@ -45,6 +45,9 @@ const MIN_SEGMENT_SECONDS = 0.35;
const SILENCE_DURATION_MS = 1_000;
const PLAYBACK_READY_TIMEOUT_MS = 15_000;
const SPEAKING_READY_TIMEOUT_MS = 60_000;
const DECRYPT_FAILURE_WINDOW_MS = 30_000;
const DECRYPT_FAILURE_RECONNECT_THRESHOLD = 3;
const DECRYPT_FAILURE_PATTERN = /DecryptionFailed\(/;
const logger = createSubsystemLogger("discord/voice");
@@ -69,6 +72,9 @@ type VoiceSessionEntry = {
playbackQueue: Promise<void>;
processingQueue: Promise<void>;
activeSpeakers: Set<string>;
decryptFailureCount: number;
lastDecryptFailureAt: number;
decryptRecoveryInFlight: boolean;
stop: () => void;
};
@@ -377,12 +383,21 @@ export class DiscordVoiceManager {
}
const adapterCreator = voicePlugin.getGatewayAdapterCreator(guildId);
const daveEncryption = this.params.discordConfig.voice?.daveEncryption;
const decryptionFailureTolerance = this.params.discordConfig.voice?.decryptionFailureTolerance;
logVoiceVerbose(
`join: DAVE settings encryption=${daveEncryption === false ? "off" : "on"} tolerance=${
decryptionFailureTolerance ?? "default"
}`,
);
const connection = joinVoiceChannel({
channelId,
guildId,
adapterCreator,
selfDeaf: false,
selfMute: false,
daveEncryption,
decryptionFailureTolerance,
});
try {
@@ -412,6 +427,17 @@ export class DiscordVoiceManager {
const player = createAudioPlayer();
connection.subscribe(player);
let speakingHandler: ((userId: string) => void) | undefined;
let disconnectedHandler: (() => Promise<void>) | undefined;
let destroyedHandler: (() => void) | undefined;
let playerErrorHandler: ((err: Error) => void) | undefined;
const clearSessionIfCurrent = () => {
const active = this.sessions.get(guildId);
if (active?.connection === connection) {
this.sessions.delete(guildId);
}
};
const entry: VoiceSessionEntry = {
guildId,
channelId,
@@ -422,37 +448,55 @@ export class DiscordVoiceManager {
playbackQueue: Promise.resolve(),
processingQueue: Promise.resolve(),
activeSpeakers: new Set(),
decryptFailureCount: 0,
lastDecryptFailureAt: 0,
decryptRecoveryInFlight: false,
stop: () => {
if (speakingHandler) {
connection.receiver.speaking.off("start", speakingHandler);
}
if (disconnectedHandler) {
connection.off(VoiceConnectionStatus.Disconnected, disconnectedHandler);
}
if (destroyedHandler) {
connection.off(VoiceConnectionStatus.Destroyed, destroyedHandler);
}
if (playerErrorHandler) {
player.off("error", playerErrorHandler);
}
player.stop();
connection.destroy();
},
};
const speakingHandler = (userId: string) => {
speakingHandler = (userId: string) => {
void this.handleSpeakingStart(entry, userId).catch((err) => {
logger.warn(`discord voice: capture failed: ${formatErrorMessage(err)}`);
});
};
connection.receiver.speaking.on("start", speakingHandler);
connection.on(VoiceConnectionStatus.Disconnected, async () => {
disconnectedHandler = async () => {
try {
await Promise.race([
entersState(connection, VoiceConnectionStatus.Signalling, 5_000),
entersState(connection, VoiceConnectionStatus.Connecting, 5_000),
]);
} catch {
this.sessions.delete(guildId);
clearSessionIfCurrent();
connection.destroy();
}
});
connection.on(VoiceConnectionStatus.Destroyed, () => {
this.sessions.delete(guildId);
});
player.on("error", (err) => {
};
destroyedHandler = () => {
clearSessionIfCurrent();
};
playerErrorHandler = (err: Error) => {
logger.warn(`discord voice: playback error: ${formatErrorMessage(err)}`);
});
};
connection.receiver.speaking.on("start", speakingHandler);
connection.on(VoiceConnectionStatus.Disconnected, disconnectedHandler);
connection.on(VoiceConnectionStatus.Destroyed, destroyedHandler);
player.on("error", playerErrorHandler);
this.sessions.set(guildId, entry);
return {
@@ -526,7 +570,7 @@ export class DiscordVoiceManager {
},
});
stream.on("error", (err) => {
logger.warn(`discord voice: receive error: ${formatErrorMessage(err)}`);
this.handleReceiveError(entry, err);
});
try {
@@ -537,6 +581,7 @@ export class DiscordVoiceManager {
);
return;
}
this.resetDecryptFailureState(entry);
const { path: wavPath, durationSeconds } = await writeWavFile(pcm);
if (durationSeconds < MIN_SEGMENT_SECONDS) {
logVoiceVerbose(
@@ -654,6 +699,64 @@ export class DiscordVoiceManager {
});
}
private handleReceiveError(entry: VoiceSessionEntry, err: unknown) {
const message = formatErrorMessage(err);
logger.warn(`discord voice: receive error: ${message}`);
if (!DECRYPT_FAILURE_PATTERN.test(message)) {
return;
}
const now = Date.now();
if (now - entry.lastDecryptFailureAt > DECRYPT_FAILURE_WINDOW_MS) {
entry.decryptFailureCount = 0;
}
entry.lastDecryptFailureAt = now;
entry.decryptFailureCount += 1;
if (entry.decryptFailureCount === 1) {
logger.warn(
"discord voice: DAVE decrypt failures detected; voice receive may be unstable (upstream: discordjs/discord.js#11419)",
);
}
if (
entry.decryptFailureCount < DECRYPT_FAILURE_RECONNECT_THRESHOLD ||
entry.decryptRecoveryInFlight
) {
return;
}
entry.decryptRecoveryInFlight = true;
this.resetDecryptFailureState(entry);
void this.recoverFromDecryptFailures(entry)
.catch((recoverErr) =>
logger.warn(`discord voice: decrypt recovery failed: ${formatErrorMessage(recoverErr)}`),
)
.finally(() => {
entry.decryptRecoveryInFlight = false;
});
}
private resetDecryptFailureState(entry: VoiceSessionEntry) {
entry.decryptFailureCount = 0;
entry.lastDecryptFailureAt = 0;
}
private async recoverFromDecryptFailures(entry: VoiceSessionEntry) {
const active = this.sessions.get(entry.guildId);
if (!active || active.connection !== entry.connection) {
return;
}
logger.warn(
`discord voice: repeated decrypt failures; attempting rejoin for guild ${entry.guildId} channel ${entry.channelId}`,
);
const leaveResult = await this.leave({ guildId: entry.guildId });
if (!leaveResult.ok) {
logger.warn(`discord voice: decrypt recovery leave failed: ${leaveResult.message}`);
return;
}
const result = await this.join({ guildId: entry.guildId, channelId: entry.channelId });
if (!result.ok) {
logger.warn(`discord voice: rejoin after decrypt failures failed: ${result.message}`);
}
}
private async resolveSpeakerLabel(guildId: string, userId: string): Promise<string | undefined> {
try {
const member = await this.params.client.fetchMember(guildId, userId);