fix(gateway): avoid duplicate delta flush when buffer unchanged

Track the length of the text sent in the most recent delta broadcast. The
flush in emitChatFinal now sends a delta only if the buffer has grown since
that broadcast, which prevents a duplicate send when the final delta had
already passed the 150 ms throttle and been broadcast.
This commit is contained in:
Jonathan Taylor
2026-02-23 18:17:04 -05:00
parent bd4a5bd9d4
commit 328568a2dc
3 changed files with 20 additions and 3 deletions

View File

@@ -1295,7 +1295,11 @@ export async function runEmbeddedPiAgent(
aborted, aborted,
systemPromptReport: attempt.systemPromptReport, systemPromptReport: attempt.systemPromptReport,
// Handle client tool calls (OpenResponses hosted tools) // Handle client tool calls (OpenResponses hosted tools)
stopReason: attempt.clientToolCall ? "tool_calls" : undefined, // Propagate the LLM stop reason so callers (lifecycle events,
// ACP bridge) can distinguish end_turn from max_tokens.
stopReason: attempt.clientToolCall
? "tool_calls"
: (lastAssistant?.stopReason as string | undefined),
pendingToolCalls: attempt.clientToolCall pendingToolCalls: attempt.clientToolCall
? [ ? [
{ {

View File

@@ -873,6 +873,10 @@ async function agentCommandInternal(
fallbackProvider = fallbackResult.provider; fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model; fallbackModel = fallbackResult.model;
if (!lifecycleEnded) { if (!lifecycleEnded) {
const stopReason = result.meta.stopReason;
if (stopReason && stopReason !== "end_turn") {
console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`);
}
emitAgentEvent({ emitAgentEvent({
runId, runId,
stream: "lifecycle", stream: "lifecycle",
@@ -881,6 +885,7 @@ async function agentCommandInternal(
startedAt, startedAt,
endedAt: Date.now(), endedAt: Date.now(),
aborted: result.meta.aborted ?? false, aborted: result.meta.aborted ?? false,
stopReason,
}, },
}); });
} }

View File

@@ -158,6 +158,8 @@ export type ChatRunState = {
registry: ChatRunRegistry; registry: ChatRunRegistry;
buffers: Map<string, string>; buffers: Map<string, string>;
deltaSentAt: Map<string, number>; deltaSentAt: Map<string, number>;
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
abortedRuns: Map<string, number>; abortedRuns: Map<string, number>;
clear: () => void; clear: () => void;
}; };
@@ -166,12 +168,14 @@ export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry(); const registry = createChatRunRegistry();
const buffers = new Map<string, string>(); const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>(); const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const abortedRuns = new Map<string, number>(); const abortedRuns = new Map<string, number>();
const clear = () => { const clear = () => {
registry.clear(); registry.clear();
buffers.clear(); buffers.clear();
deltaSentAt.clear(); deltaSentAt.clear();
deltaLastBroadcastLen.clear();
abortedRuns.clear(); abortedRuns.clear();
}; };
@@ -179,6 +183,7 @@ export function createChatRunState(): ChatRunState {
registry, registry,
buffers, buffers,
deltaSentAt, deltaSentAt,
deltaLastBroadcastLen,
abortedRuns, abortedRuns,
clear, clear,
}; };
@@ -318,6 +323,7 @@ export function createAgentEventHandler({
return; return;
} }
chatRunState.deltaSentAt.set(clientRunId, now); chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, cleaned.length);
const payload = { const payload = {
runId: clientRunId, runId: clientRunId,
sessionKey, sessionKey,
@@ -355,9 +361,10 @@ export function createAgentEventHandler({
// Flush any throttled delta so streaming clients receive the complete text // Flush any throttled delta so streaming clients receive the complete text
// before the final event. The 150 ms throttle in emitChatDelta may have // before the final event. The 150 ms throttle in emitChatDelta may have
// suppressed the most recent chunk, leaving the client with stale text. // suppressed the most recent chunk, leaving the client with stale text.
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
if (text && !shouldSuppressSilent) { if (text && !shouldSuppressSilent) {
const lastSent = chatRunState.deltaSentAt.get(clientRunId) ?? 0; const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
if (lastSent > 0) { if (text.length > lastBroadcastLen) {
const flushPayload = { const flushPayload = {
runId: clientRunId, runId: clientRunId,
sessionKey, sessionKey,
@@ -373,6 +380,7 @@ export function createAgentEventHandler({
nodeSendToSession(sessionKey, "chat", flushPayload); nodeSendToSession(sessionKey, "chat", flushPayload);
} }
} }
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
chatRunState.buffers.delete(clientRunId); chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId);
if (jobState === "done") { if (jobState === "done") {