From f28fb61757bdb0c6a514c7d92c98fa9de9259e44 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Thu, 19 Feb 2026 15:09:08 +0530 Subject: [PATCH] fix: prevent leaking partial think tag fragments in streams --- ...pi-embedded-subscribe.handlers.messages.ts | 12 ++-- ...ion.subscribeembeddedpisession.e2e.test.ts | 55 +++++++++++++++++++ src/agents/pi-embedded-utils.ts | 8 ++- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 9aa445a1ab6..c08ac557bde 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -30,6 +30,9 @@ const stripTrailingDirective = (text: string): string => { return text.slice(0, openIndex); }; +const stripTrailingPartialThinkingTag = (text: string): string => + text.replace(/<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\s*$/i, "").trimEnd(); + function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) { if (!ctx.state.reasoningStreamOpen) { return; @@ -189,7 +192,7 @@ export function handleMessageUpdate( } const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null; const parsedFull = parseReplyDirectives(stripTrailingDirective(next)); - const cleanedText = parsedFull.text; + const cleanedText = stripTrailingPartialThinkingTag(parsedFull.text); const mediaUrls = parsedDelta?.mediaUrls; const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0); const hasAudio = Boolean(parsedDelta?.audioAsVoice); @@ -279,6 +282,10 @@ export function handleMessageEnd( ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText) : ""; const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : ""; + if (ctx.state.streamReasoning && rawThinking) { + // Emit final reasoning snapshot before answer finalization paths. + ctx.emitReasoningStream(rawThinking); + } const trimmedText = text.trim(); const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null; let cleanedText = parsedText?.text ?? ""; @@ -395,9 +402,6 @@ export function handleMessageEnd( if (!shouldEmitReasoningBeforeAnswer) { maybeEmitReasoning(); } - if (ctx.state.streamReasoning && rawThinking) { - ctx.emitReasoningStream(rawThinking); - } if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) { const tailResult = ctx.consumeReplyDirectives("", { final: true }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts index a048ab2d6e0..4e148a0d7d1 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts @@ -298,6 +298,61 @@ describe("subscribeEmbeddedPiSession", () => { expect(onReasoningEnd).toHaveBeenCalledTimes(1); }); + it("does not leak partial think tag fragments into reasoning or assistant streams", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onReasoningStream = vi.fn(); + const onPartialReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + reasoningMode: "stream", + onReasoningStream, + onPartialReply, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "step one" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: " and two\nfinal answer" }, + }); + + const reasoningTexts = onReasoningStream.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + const partialTexts = onPartialReply.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + + expect(reasoningTexts.at(-1)).toContain("Reasoning:\n_step one and two_"); + for (const text of reasoningTexts) { + expect(text).not.toContain(" { const { emit, onAgentEvent } = createAgentEventHarness(); diff --git a/src/agents/pi-embedded-utils.ts b/src/agents/pi-embedded-utils.ts index 82ad3efc03d..2f38cecb697 100644 --- a/src/agents/pi-embedded-utils.ts +++ b/src/agents/pi-embedded-utils.ts @@ -394,9 +394,11 @@ export function extractThinkingFromTaggedStream(text: string): string { if (!text) { return ""; } + const stripTrailingPartialTag = (value: string) => + value.replace(/<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\s*$/i, "").trimEnd(); const closed = extractThinkingFromTaggedText(text); if (closed) { - return closed; + return stripTrailingPartialTag(closed); } const openRe = /<\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi; @@ -409,10 +411,10 @@ export function extractThinkingFromTaggedStream(text: string): string { const lastOpen = openMatches[openMatches.length - 1]; const lastClose = closeMatches[closeMatches.length - 1]; if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) { - return closed; + return stripTrailingPartialTag(closed); } const start = (lastOpen.index ?? 0) + lastOpen[0].length; - return text.slice(start).trim(); + return stripTrailingPartialTag(text.slice(start).trim()); } export function inferToolMetaFromArgs(toolName: string, args: unknown): string | undefined {