From d117e59ed953344e8c0a0e60de65147e29be114a Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Thu, 19 Feb 2026 15:47:59 +0530 Subject: [PATCH] fix(telegram): scope stream fix to telegram lane handling --- ...pi-embedded-subscribe.handlers.messages.ts | 10 ++-- ...ion.subscribeembeddedpisession.e2e.test.ts | 55 ------------------- src/agents/pi-embedded-utils.e2e.test.ts | 15 ----- src/agents/pi-embedded-utils.ts | 24 +------- .../reply/agent-runner-execution.ts | 2 +- 5 files changed, 8 insertions(+), 98 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index d9cd44e7a7f..9aa445a1ab6 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -16,7 +16,6 @@ import { extractThinkingFromTaggedText, formatReasoningMessage, promoteThinkingTagsToBlocks, - stripTrailingPartialThinkingTagFragment, } from "./pi-embedded-utils.js"; const stripTrailingDirective = (text: string): string => { @@ -190,7 +189,7 @@ export function handleMessageUpdate( } const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null; const parsedFull = parseReplyDirectives(stripTrailingDirective(next)); - const cleanedText = stripTrailingPartialThinkingTagFragment(parsedFull.text); + const cleanedText = parsedFull.text; const mediaUrls = parsedDelta?.mediaUrls; const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0); const hasAudio = Boolean(parsedDelta?.audioAsVoice); @@ -280,10 +279,6 @@ export function handleMessageEnd( ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText) : ""; const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : ""; - if (ctx.state.streamReasoning && rawThinking) { - // Emit final reasoning snapshot before answer finalization paths. - ctx.emitReasoningStream(rawThinking); - } const trimmedText = text.trim(); const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null; let cleanedText = parsedText?.text ?? ""; @@ -400,6 +395,9 @@ export function handleMessageEnd( if (!shouldEmitReasoningBeforeAnswer) { maybeEmitReasoning(); } + if (ctx.state.streamReasoning && rawThinking) { + ctx.emitReasoningStream(rawThinking); + } if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) { const tailResult = ctx.consumeReplyDirectives("", { final: true }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts index 4e148a0d7d1..a048ab2d6e0 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.e2e.test.ts @@ -298,61 +298,6 @@ describe("subscribeEmbeddedPiSession", () => { expect(onReasoningEnd).toHaveBeenCalledTimes(1); }); - it("does not leak partial think tag fragments into reasoning or assistant streams", () => { - let handler: ((evt: unknown) => void) | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; - - const onReasoningStream = vi.fn(); - const onPartialReply = vi.fn(); - - subscribeEmbeddedPiSession({ - session: session as unknown as Parameters[0]["session"], - runId: "run", - reasoningMode: "stream", - onReasoningStream, - onPartialReply, - }); - - handler?.({ type: "message_start", message: { role: "assistant" } }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: "step one" }, - }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: " and two\nfinal answer" }, - }); - - const reasoningTexts = onReasoningStream.mock.calls - .map((call) => call[0]?.text) - .filter((value): value is string => typeof value === "string"); - const partialTexts = onPartialReply.mock.calls - .map((call) => call[0]?.text) - .filter((value): value is string => typeof value === "string"); - - expect(reasoningTexts.at(-1)).toContain("Reasoning:\n_step one and two_"); - for (const text of reasoningTexts) { - expect(text).not.toContain(" { const { emit, onAgentEvent } = createAgentEventHarness(); diff --git a/src/agents/pi-embedded-utils.e2e.test.ts b/src/agents/pi-embedded-utils.e2e.test.ts index ee13e527c2a..ecb8dace5a1 100644 --- a/src/agents/pi-embedded-utils.e2e.test.ts +++ b/src/agents/pi-embedded-utils.e2e.test.ts @@ -2,9 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import { extractAssistantText, - extractThinkingFromTaggedStream, formatReasoningMessage, - stripTrailingPartialThinkingTagFragment, stripDowngradedToolCallText, } from "./pi-embedded-utils.js"; @@ -605,19 +603,6 @@ describe("formatReasoningMessage", () => { }); }); -describe("thinking tag fragment handling", () => { - it("strips dangling closing tag fragments from tagged stream extraction", () => { - expect(extractThinkingFromTaggedStream("step one and twostep one and two { - expect(stripTrailingPartialThinkingTagFragment("Reasoning line { it("strips [Historical context: ...] blocks", () => { const text = `[Historical context: a different model called tool "exec" with arguments {"command":"git status"}]`; diff --git a/src/agents/pi-embedded-utils.ts b/src/agents/pi-embedded-utils.ts index 2bae7aa5ac7..82ad3efc03d 100644 --- a/src/agents/pi-embedded-utils.ts +++ b/src/agents/pi-embedded-utils.ts @@ -390,31 +390,13 @@ export function extractThinkingFromTaggedText(text: string): string { return result.trim(); } -export function stripTrailingPartialThinkingTagFragment(text: string): string { - if (!text) { - return text; - } - const match = text.match(/<\s*\/?\s*([a-z]*)\s*$/i); - if (!match || typeof match.index !== "number") { - return text; - } - const prefix = (match[1] ?? "").toLowerCase(); - const targets = ["think", "thinking", "thought", "antthinking"]; - const isThinkingPrefix = - prefix.length === 0 || targets.some((target) => target.startsWith(prefix)); - if (!isThinkingPrefix) { - return text; - } - return text.slice(0, match.index).trimEnd(); -} - export function extractThinkingFromTaggedStream(text: string): string { if (!text) { return ""; } const closed = extractThinkingFromTaggedText(text); if (closed) { - return stripTrailingPartialThinkingTagFragment(closed); + return closed; } const openRe = /<\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi; @@ -427,10 +409,10 @@ export function extractThinkingFromTaggedStream(text: string): string { const lastOpen = openMatches[openMatches.length - 1]; const lastClose = closeMatches[closeMatches.length - 1]; if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) { - return stripTrailingPartialThinkingTagFragment(closed); + return closed; } const start = (lastOpen.index ?? 0) + lastOpen[0].length; - return stripTrailingPartialThinkingTagFragment(text.slice(start).trim()); + return text.slice(start).trim(); } export function inferToolMetaFromArgs(toolName: string, args: unknown): string | undefined { diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 3fdafd821bd..4becf72c780 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -321,11 +321,11 @@ export async function runAgentTurnWithFallback(params: { onReasoningStream: params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream ? async (payload) => { + await params.typingSignals.signalReasoningDelta(); await params.opts?.onReasoningStream?.({ text: payload.text, mediaUrls: payload.mediaUrls, }); - await params.typingSignals.signalReasoningDelta(); } : undefined, onReasoningEnd: params.opts?.onReasoningEnd,