test(infra): dedupe outbound recovery test scaffolding

This commit is contained in:
Peter Steinberger
2026-02-19 08:12:40 +00:00
parent 4e5cffe4c9
commit ab924eb522

View File

@@ -10,6 +10,7 @@ import { createTestRegistry } from "../../test-utils/channel-plugins.js";
import { import {
ackDelivery, ackDelivery,
computeBackoffMs, computeBackoffMs,
type DeliverFn,
enqueueDelivery, enqueueDelivery,
failDelivery, failDelivery,
loadPendingDeliveries, loadPendingDeliveries,
@@ -177,22 +178,38 @@ describe("delivery-queue", () => {
describe("recoverPendingDeliveries", () => { describe("recoverPendingDeliveries", () => {
const noopDelay = async () => {}; const noopDelay = async () => {};
const baseCfg = {}; const baseCfg = {};
const createLog = () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() });
it("recovers entries from a simulated crash", async () => { const enqueueCrashRecoveryEntries = async () => {
// Manually create two queue entries as if gateway crashed before delivery.
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir); await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
};
const deliver = vi.fn().mockResolvedValue([]); const runRecovery = async ({
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; deliver,
log = createLog(),
delay = noopDelay,
maxRecoveryMs,
}: {
deliver: ReturnType<typeof vi.fn>;
log?: ReturnType<typeof createLog>;
delay?: (ms: number) => Promise<void>;
maxRecoveryMs?: number;
}) => {
const result = await recoverPendingDeliveries({ const result = await recoverPendingDeliveries({
deliver, deliver: deliver as DeliverFn,
log, log,
cfg: baseCfg, cfg: baseCfg,
stateDir: tmpDir, stateDir: tmpDir,
delay: noopDelay, delay,
...(maxRecoveryMs === undefined ? {} : { maxRecoveryMs }),
}); });
return { result, log };
};
it("recovers entries from a simulated crash", async () => {
// Manually create queue entries as if gateway crashed before delivery.
await enqueueCrashRecoveryEntries();
const deliver = vi.fn().mockResolvedValue([]);
const { result } = await runRecovery({ deliver });
expect(deliver).toHaveBeenCalledTimes(2); expect(deliver).toHaveBeenCalledTimes(2);
expect(result.recovered).toBe(2); expect(result.recovered).toBe(2);
@@ -216,15 +233,7 @@ describe("delivery-queue", () => {
fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8"); fs.writeFileSync(filePath, JSON.stringify(entry), "utf-8");
const deliver = vi.fn(); const deliver = vi.fn();
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const { result } = await runRecovery({ deliver });
const result = await recoverPendingDeliveries({
deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
});
expect(deliver).not.toHaveBeenCalled(); expect(deliver).not.toHaveBeenCalled();
expect(result.skipped).toBe(1); expect(result.skipped).toBe(1);
@@ -238,15 +247,7 @@ describe("delivery-queue", () => {
await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir); await enqueueDelivery({ channel: "slack", to: "#ch", payloads: [{ text: "x" }] }, tmpDir);
const deliver = vi.fn().mockRejectedValue(new Error("network down")); const deliver = vi.fn().mockRejectedValue(new Error("network down"));
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const { result } = await runRecovery({ deliver });
const result = await recoverPendingDeliveries({
deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
});
expect(result.failed).toBe(1); expect(result.failed).toBe(1);
expect(result.recovered).toBe(0); expect(result.recovered).toBe(0);
@@ -262,15 +263,7 @@ describe("delivery-queue", () => {
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir);
const deliver = vi.fn().mockResolvedValue([]); const deliver = vi.fn().mockResolvedValue([]);
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; await runRecovery({ deliver });
await recoverPendingDeliveries({
deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
});
expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true })); expect(deliver).toHaveBeenCalledWith(expect.objectContaining({ skipQueue: true }));
}); });
@@ -294,15 +287,7 @@ describe("delivery-queue", () => {
); );
const deliver = vi.fn().mockResolvedValue([]); const deliver = vi.fn().mockResolvedValue([]);
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; await runRecovery({ deliver });
await recoverPendingDeliveries({
deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
});
expect(deliver).toHaveBeenCalledWith( expect(deliver).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
@@ -319,19 +304,12 @@ describe("delivery-queue", () => {
}); });
it("respects maxRecoveryMs time budget", async () => { it("respects maxRecoveryMs time budget", async () => {
await enqueueDelivery({ channel: "whatsapp", to: "+1", payloads: [{ text: "a" }] }, tmpDir); await enqueueCrashRecoveryEntries();
await enqueueDelivery({ channel: "telegram", to: "2", payloads: [{ text: "b" }] }, tmpDir);
await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir); await enqueueDelivery({ channel: "slack", to: "#c", payloads: [{ text: "c" }] }, tmpDir);
const deliver = vi.fn().mockResolvedValue([]); const deliver = vi.fn().mockResolvedValue([]);
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const { result, log } = await runRecovery({
const result = await recoverPendingDeliveries({
deliver, deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed. maxRecoveryMs: 0, // Immediate timeout -- no entries should be processed.
}); });
@@ -360,13 +338,8 @@ describe("delivery-queue", () => {
const deliver = vi.fn().mockResolvedValue([]); const deliver = vi.fn().mockResolvedValue([]);
const delay = vi.fn(async () => {}); const delay = vi.fn(async () => {});
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const { result, log } = await runRecovery({
const result = await recoverPendingDeliveries({
deliver, deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay, delay,
maxRecoveryMs: 1000, maxRecoveryMs: 1000,
}); });
@@ -383,15 +356,7 @@ describe("delivery-queue", () => {
it("returns zeros when queue is empty", async () => { it("returns zeros when queue is empty", async () => {
const deliver = vi.fn(); const deliver = vi.fn();
const log = { info: vi.fn(), warn: vi.fn(), error: vi.fn() }; const { result } = await runRecovery({ deliver });
const result = await recoverPendingDeliveries({
deliver,
log,
cfg: baseCfg,
stateDir: tmpDir,
delay: noopDelay,
});
expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 }); expect(result).toEqual({ recovered: 0, failed: 0, skipped: 0 });
expect(deliver).not.toHaveBeenCalled(); expect(deliver).not.toHaveBeenCalled();