diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 3c5d5a67f6f..bfda498f5e3 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1295,7 +1295,11 @@ export async function runEmbeddedPiAgent( aborted, systemPromptReport: attempt.systemPromptReport, // Handle client tool calls (OpenResponses hosted tools) - stopReason: attempt.clientToolCall ? "tool_calls" : undefined, + // Propagate the LLM stop reason so callers (lifecycle events, + // ACP bridge) can distinguish end_turn from max_tokens. + stopReason: attempt.clientToolCall + ? "tool_calls" + : (lastAssistant?.stopReason as string | undefined), pendingToolCalls: attempt.clientToolCall ? [ { diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 1f58c5e39f4..32fbd3b2adc 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -873,6 +873,10 @@ async function agentCommandInternal( fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; if (!lifecycleEnded) { + const stopReason = result.meta.stopReason; + if (stopReason && stopReason !== "end_turn") { + console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`); + } emitAgentEvent({ runId, stream: "lifecycle", @@ -881,6 +885,7 @@ async function agentCommandInternal( startedAt, endedAt: Date.now(), aborted: result.meta.aborted ?? false, + stopReason, }, }); } diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 64675c8b5b7..36ce78903b8 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -158,6 +158,8 @@ export type ChatRunState = { registry: ChatRunRegistry; buffers: Map; deltaSentAt: Map; + /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ + deltaLastBroadcastLen: Map; abortedRuns: Map; clear: () => void; }; @@ -166,12 +168,14 @@ export function createChatRunState(): ChatRunState { const registry = createChatRunRegistry(); const buffers = new Map(); const deltaSentAt = new Map(); + const deltaLastBroadcastLen = new Map(); const abortedRuns = new Map(); const clear = () => { registry.clear(); buffers.clear(); deltaSentAt.clear(); + deltaLastBroadcastLen.clear(); abortedRuns.clear(); }; @@ -179,6 +183,7 @@ export function createChatRunState(): ChatRunState { registry, buffers, deltaSentAt, + deltaLastBroadcastLen, abortedRuns, clear, }; @@ -318,6 +323,7 @@ export function createAgentEventHandler({ return; } chatRunState.deltaSentAt.set(clientRunId, now); + chatRunState.deltaLastBroadcastLen.set(clientRunId, cleaned.length); const payload = { runId: clientRunId, sessionKey, @@ -355,9 +361,10 @@ export function createAgentEventHandler({ // Flush any throttled delta so streaming clients receive the complete text // before the final event. The 150 ms throttle in emitChatDelta may have // suppressed the most recent chunk, leaving the client with stale text. + // Only flush if the buffer has grown since the last broadcast to avoid duplicates. if (text && !shouldSuppressSilent) { - const lastSent = chatRunState.deltaSentAt.get(clientRunId) ?? 0; - if (lastSent > 0) { + const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0; + if (text.length > lastBroadcastLen) { const flushPayload = { runId: clientRunId, sessionKey, @@ -373,6 +380,7 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "chat", flushPayload); } } + chatRunState.deltaLastBroadcastLen.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); if (jobState === "done") {