fix (auto-reply/queue): preserve queued items on drain retries

This commit is contained in:
Vignesh Natarajan
2026-02-14 20:23:23 -08:00
parent f3a474af30
commit d6f1e7ae95
2 changed files with 94 additions and 6 deletions

View File

@@ -365,4 +365,58 @@ describe("followup queue collect routing", () => {
expect(calls[0]?.originatingThreadId).toBe("1706000000.000001");
expect(calls[1]?.originatingThreadId).toBe("1706000000.000002");
});
it("retries collect-mode batches without losing queued items", async () => {
const key = `test-collect-retry-${Date.now()}`;
const calls: FollowupRun[] = [];
let attempt = 0;
const runFollowup = async (run: FollowupRun) => {
attempt += 1;
if (attempt === 1) {
throw new Error("transient failure");
}
calls.push(run);
};
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
enqueueFollowupRun(key, createRun({ prompt: "one" }), settings);
enqueueFollowupRun(key, createRun({ prompt: "two" }), settings);
scheduleFollowupDrain(key, runFollowup);
await expect.poll(() => calls.length).toBe(1);
expect(calls[0]?.prompt).toContain("Queued #1\none");
expect(calls[0]?.prompt).toContain("Queued #2\ntwo");
});
it("retries overflow summary delivery without losing dropped previews", async () => {
const key = `test-overflow-summary-retry-${Date.now()}`;
const calls: FollowupRun[] = [];
let attempt = 0;
const runFollowup = async (run: FollowupRun) => {
attempt += 1;
if (attempt === 1) {
throw new Error("transient failure");
}
calls.push(run);
};
const settings: QueueSettings = {
mode: "followup",
debounceMs: 0,
cap: 1,
dropPolicy: "summarize",
};
enqueueFollowupRun(key, createRun({ prompt: "first" }), settings);
enqueueFollowupRun(key, createRun({ prompt: "second" }), settings);
scheduleFollowupDrain(key, runFollowup);
await expect.poll(() => calls.length).toBe(1);
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
expect(calls[0]?.prompt).toContain("- first");
});
});

View File

@@ -9,6 +9,26 @@ import {
import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
function previewQueueSummaryPrompt(queue: {
dropPolicy: "summarize" | "old" | "new";
droppedCount: number;
summaryLines: string[];
}): string | undefined {
return buildQueueSummaryPrompt({
state: {
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: [...queue.summaryLines],
},
noun: "message",
});
}
function clearQueueSummaryState(queue: { droppedCount: number; summaryLines: string[] }): void {
queue.droppedCount = 0;
queue.summaryLines = [];
}
export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
@@ -29,11 +49,12 @@ export function scheduleFollowupDrain(
//
// Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
if (forceIndividualCollect) {
const next = queue.items.shift();
const next = queue.items[0];
if (!next) {
break;
}
await runFollowup(next);
queue.items.shift();
continue;
}
@@ -58,16 +79,17 @@ export function scheduleFollowupDrain(
if (isCrossChannel) {
forceIndividualCollect = true;
const next = queue.items.shift();
const next = queue.items[0];
if (!next) {
break;
}
await runFollowup(next);
queue.items.shift();
continue;
}
const items = queue.items.splice(0, queue.items.length);
const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt(queue);
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) {
break;
@@ -98,30 +120,42 @@ export function scheduleFollowupDrain(
originatingAccountId,
originatingThreadId,
});
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
}
continue;
}
const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" });
const summaryPrompt = previewQueueSummaryPrompt(queue);
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) {
break;
}
const next = queue.items[0];
if (!next) {
break;
}
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
queue.items.shift();
clearQueueSummaryState(queue);
continue;
}
const next = queue.items.shift();
const next = queue.items[0];
if (!next) {
break;
}
await runFollowup(next);
queue.items.shift();
}
} catch (err) {
queue.lastEnqueuedAt = Date.now();
defaultRuntime.error?.(`followup queue drain failed for ${key}: ${String(err)}`);
} finally {
queue.draining = false;