From 155853c0576edcd62fb4944de2d9e18a1a3b7560 Mon Sep 17 00:00:00 2001 From: SidQin-cyber Date: Thu, 5 Mar 2026 13:07:35 +0800 Subject: [PATCH] feat(agents): flush reply pipeline before compaction wait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Auto-compaction blocks the run pipeline after prompt() returns. Any buffered block replies (the assistant's response) sit in the delivery pipeline until compaction finishes — which can take 7+ minutes on large contexts. The user sees no reply for the entire duration even though the response is already fully generated. Two changes ensure the response reaches the channel immediately: 1. attempt.ts: call onBlockReplyFlush() before waitForCompactionRetry() so the pipeline drains while compaction is still running. 2. handleAgentEnd: call onBlockReplyFlush() after flushBlockReplyBuffer() (mirroring the pattern already used by handleToolExecutionStart) so coalesced blocks are dispatched as soon as the turn ends. Closes #35074 --- src/agents/pi-embedded-runner/run/attempt.ts | 8 ++++++++ src/agents/pi-embedded-subscribe.handlers.lifecycle.ts | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 54ac8b13489..e19dd3cedb2 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1688,6 +1688,14 @@ export async function runEmbeddedAttempt( const preCompactionSessionId = activeSession.sessionId; try { + // Flush buffered block replies before waiting for compaction so the + // user receives the assistant response immediately. Without this, + // coalesced/buffered blocks stay in the pipeline until compaction + // finishes — which can take minutes on large contexts (#35074). + if (params.onBlockReplyFlush) { + await params.onBlockReplyFlush(); + } + await abortable(waitForCompactionRetry()); } catch (err) { if (isRunnerAbortError(err)) { diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index 326b51c7266..4c6803e814c 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -73,6 +73,11 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { } ctx.flushBlockReplyBuffer(); + // Flush the reply pipeline so the response reaches the channel before + // compaction wait blocks the run. This mirrors the pattern used by + // handleToolExecutionStart and ensures delivery is not held hostage to + // long-running compaction (#35074). + void ctx.params.onBlockReplyFlush?.(); ctx.state.blockState.thinking = false; ctx.state.blockState.final = false;