From d6f1e7ae9571c78e6a29022334f89547b5cf1af3 Mon Sep 17 00:00:00 2001 From: Vignesh Natarajan Date: Sat, 14 Feb 2026 20:23:23 -0800 Subject: [PATCH] fix (auto-reply/queue): preserve queued items on drain retries --- .../reply/queue.collect-routing.test.ts | 54 +++++++++++++++++++ src/auto-reply/reply/queue/drain.ts | 46 +++++++++++++--- 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index cc2b214bf0d..e9545352aa4 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -365,4 +365,58 @@ describe("followup queue collect routing", () => { expect(calls[0]?.originatingThreadId).toBe("1706000000.000001"); expect(calls[1]?.originatingThreadId).toBe("1706000000.000002"); }); + + it("retries collect-mode batches without losing queued items", async () => { + const key = `test-collect-retry-${Date.now()}`; + const calls: FollowupRun[] = []; + let attempt = 0; + const runFollowup = async (run: FollowupRun) => { + attempt += 1; + if (attempt === 1) { + throw new Error("transient failure"); + } + calls.push(run); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + enqueueFollowupRun(key, createRun({ prompt: "one" }), settings); + enqueueFollowupRun(key, createRun({ prompt: "two" }), settings); + + scheduleFollowupDrain(key, runFollowup); + await expect.poll(() => calls.length).toBe(1); + expect(calls[0]?.prompt).toContain("Queued #1\none"); + expect(calls[0]?.prompt).toContain("Queued #2\ntwo"); + }); + + it("retries overflow summary delivery without losing dropped previews", async () => { + const key = `test-overflow-summary-retry-${Date.now()}`; + const calls: FollowupRun[] = []; + let attempt = 0; + const runFollowup = async (run: FollowupRun) => { + attempt += 1; + if (attempt === 1) { + throw new Error("transient failure"); + } + calls.push(run); + }; + const settings: QueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 1, + dropPolicy: "summarize", + }; + + enqueueFollowupRun(key, createRun({ prompt: "first" }), settings); + enqueueFollowupRun(key, createRun({ prompt: "second" }), settings); + + scheduleFollowupDrain(key, runFollowup); + await expect.poll(() => calls.length).toBe(1); + expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(calls[0]?.prompt).toContain("- first"); + }); }); diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 626e40af327..2d8c8737758 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -9,6 +9,26 @@ import { import { isRoutableChannel } from "../route-reply.js"; import { FOLLOWUP_QUEUES } from "./state.js"; +function previewQueueSummaryPrompt(queue: { + dropPolicy: "summarize" | "old" | "new"; + droppedCount: number; + summaryLines: string[]; +}): string | undefined { + return buildQueueSummaryPrompt({ + state: { + dropPolicy: queue.dropPolicy, + droppedCount: queue.droppedCount, + summaryLines: [...queue.summaryLines], + }, + noun: "message", + }); +} + +function clearQueueSummaryState(queue: { droppedCount: number; summaryLines: string[] }): void { + queue.droppedCount = 0; + queue.summaryLines = []; +} + export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -29,11 +49,12 @@ export function scheduleFollowupDrain( // // Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts` if (forceIndividualCollect) { - const next = queue.items.shift(); + const next = queue.items[0]; if (!next) { break; } await runFollowup(next); + queue.items.shift(); continue; } @@ -58,16 +79,17 @@ export function scheduleFollowupDrain( if (isCrossChannel) { forceIndividualCollect = true; - const next = queue.items.shift(); + const next = queue.items[0]; if (!next) { break; } await runFollowup(next); + queue.items.shift(); continue; } - const items = queue.items.splice(0, queue.items.length); - const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" }); + const items = queue.items.slice(); + const summary = previewQueueSummaryPrompt(queue); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) { break; @@ -98,30 +120,42 @@ export function scheduleFollowupDrain( originatingAccountId, originatingThreadId, }); + queue.items.splice(0, items.length); + if (summary) { + clearQueueSummaryState(queue); + } continue; } - const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" }); + const summaryPrompt = previewQueueSummaryPrompt(queue); if (summaryPrompt) { const run = queue.lastRun; if (!run) { break; } + const next = queue.items[0]; + if (!next) { + break; + } await runFollowup({ prompt: summaryPrompt, run, enqueuedAt: Date.now(), }); + queue.items.shift(); + clearQueueSummaryState(queue); continue; } - const next = queue.items.shift(); + const next = queue.items[0]; if (!next) { break; } await runFollowup(next); + queue.items.shift(); } } catch (err) { + queue.lastEnqueuedAt = Date.now(); defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`); } finally { queue.draining = false;