Subagents: retain announce queue items on send failure

This commit is contained in:
Vignesh Natarajan
2026-02-14 18:14:11 -08:00
parent 28ff755623
commit 2a83609287

View File

@@ -44,6 +44,22 @@ type AnnounceQueueState = {
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>(); const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
function previewQueueSummaryPrompt(queue: AnnounceQueueState): string | undefined {
return buildQueueSummaryPrompt({
state: {
dropPolicy: queue.dropPolicy,
droppedCount: queue.droppedCount,
summaryLines: [...queue.summaryLines],
},
noun: "announce",
});
}
function clearQueueSummaryState(queue: AnnounceQueueState) {
queue.droppedCount = 0;
queue.summaryLines = [];
}
export function resetAnnounceQueuesForTests() { export function resetAnnounceQueuesForTests() {
// Test isolation: other suites may leave a draining queue behind in the worker. // Test isolation: other suites may leave a draining queue behind in the worker.
// Clearing the map alone isn't enough because drain loops capture `queue` by reference. // Clearing the map alone isn't enough because drain loops capture `queue` by reference.
@@ -105,11 +121,12 @@ function scheduleAnnounceDrain(key: string) {
await waitForQueueDebounce(queue); await waitForQueueDebounce(queue);
if (queue.mode === "collect") { if (queue.mode === "collect") {
if (forceIndividualCollect) { if (forceIndividualCollect) {
const next = queue.items.shift(); const next = queue.items[0];
if (!next) { if (!next) {
break; break;
} }
await queue.send(next); await queue.send(next);
queue.items.shift();
continue; continue;
} }
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
@@ -123,15 +140,16 @@ function scheduleAnnounceDrain(key: string) {
}); });
if (isCrossChannel) { if (isCrossChannel) {
forceIndividualCollect = true; forceIndividualCollect = true;
const next = queue.items.shift(); const next = queue.items[0];
if (!next) { if (!next) {
break; break;
} }
await queue.send(next); await queue.send(next);
queue.items.shift();
continue; continue;
} }
const items = queue.items.splice(0, queue.items.length); const items = queue.items.slice();
const summary = buildQueueSummaryPrompt({ state: queue, noun: "announce" }); const summary = previewQueueSummaryPrompt(queue);
const prompt = buildCollectPrompt({ const prompt = buildCollectPrompt({
title: "[Queued announce messages while agent was busy]", title: "[Queued announce messages while agent was busy]",
items, items,
@@ -143,26 +161,35 @@ function scheduleAnnounceDrain(key: string) {
break; break;
} }
await queue.send({ ...last, prompt }); await queue.send({ ...last, prompt });
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
}
continue; continue;
} }
const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "announce" }); const summaryPrompt = previewQueueSummaryPrompt(queue);
if (summaryPrompt) { if (summaryPrompt) {
const next = queue.items.shift(); const next = queue.items[0];
if (!next) { if (!next) {
break; break;
} }
await queue.send({ ...next, prompt: summaryPrompt }); await queue.send({ ...next, prompt: summaryPrompt });
queue.items.shift();
clearQueueSummaryState(queue);
continue; continue;
} }
const next = queue.items.shift(); const next = queue.items[0];
if (!next) { if (!next) {
break; break;
} }
await queue.send(next); await queue.send(next);
queue.items.shift();
} }
} catch (err) { } catch (err) {
// Keep items in queue and retry after debounce; avoid hot-loop retries.
queue.lastEnqueuedAt = Date.now();
defaultRuntime.error?.(`announce queue drain failed for ${key}: ${String(err)}`); defaultRuntime.error?.(`announce queue drain failed for ${key}: ${String(err)}`);
} finally { } finally {
queue.draining = false; queue.draining = false;