fix: prevent leaking partial think tag fragments in streams

This commit is contained in:
Ayaan Zaidi
2026-02-19 15:09:08 +05:30
parent 69420c5e60
commit f28fb61757
3 changed files with 68 additions and 7 deletions

View File

@@ -30,6 +30,9 @@ const stripTrailingDirective = (text: string): string => {
return text.slice(0, openIndex);
};
const stripTrailingPartialThinkingTag = (text: string): string =>
text.replace(/<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\s*$/i, "").trimEnd();
function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) {
if (!ctx.state.reasoningStreamOpen) {
return;
@@ -189,7 +192,7 @@ export function handleMessageUpdate(
}
const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
const cleanedText = parsedFull.text;
const cleanedText = stripTrailingPartialThinkingTag(parsedFull.text);
const mediaUrls = parsedDelta?.mediaUrls;
const hasMedia = Boolean(mediaUrls && mediaUrls.length > 0);
const hasAudio = Boolean(parsedDelta?.audioAsVoice);
@@ -279,6 +282,10 @@ export function handleMessageEnd(
? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText)
: "";
const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : "";
if (ctx.state.streamReasoning && rawThinking) {
// Emit final reasoning snapshot before answer finalization paths.
ctx.emitReasoningStream(rawThinking);
}
const trimmedText = text.trim();
const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null;
let cleanedText = parsedText?.text ?? "";
@@ -395,9 +402,6 @@ export function handleMessageEnd(
if (!shouldEmitReasoningBeforeAnswer) {
maybeEmitReasoning();
}
if (ctx.state.streamReasoning && rawThinking) {
ctx.emitReasoningStream(rawThinking);
}
if (ctx.state.blockReplyBreak === "text_end" && onBlockReply) {
const tailResult = ctx.consumeReplyDirectives("", { final: true });

View File

@@ -298,6 +298,61 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onReasoningEnd).toHaveBeenCalledTimes(1);
});
it("does not leak partial think tag fragments into reasoning or assistant streams", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onReasoningStream = vi.fn();
const onPartialReply = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
reasoningMode: "stream",
onReasoningStream,
onPartialReply,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "<think>step one" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: " and two</think" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: ">\nfinal answer" },
});
const reasoningTexts = onReasoningStream.mock.calls
.map((call) => call[0]?.text)
.filter((value): value is string => typeof value === "string");
const partialTexts = onPartialReply.mock.calls
.map((call) => call[0]?.text)
.filter((value): value is string => typeof value === "string");
expect(reasoningTexts.at(-1)).toContain("Reasoning:\n_step one and two_");
for (const text of reasoningTexts) {
expect(text).not.toContain("</think");
expect(text).not.toContain("<think");
}
for (const text of partialTexts) {
expect(text).not.toContain("</think");
expect(text).not.toContain("<think");
}
});
it("emits delta chunks in agent events for streaming assistant text", () => {
const { emit, onAgentEvent } = createAgentEventHarness();

View File

@@ -394,9 +394,11 @@ export function extractThinkingFromTaggedStream(text: string): string {
if (!text) {
return "";
}
const stripTrailingPartialTag = (value: string) =>
value.replace(/<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\s*$/i, "").trimEnd();
const closed = extractThinkingFromTaggedText(text);
if (closed) {
return closed;
return stripTrailingPartialTag(closed);
}
const openRe = /<\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi;
@@ -409,10 +411,10 @@ export function extractThinkingFromTaggedStream(text: string): string {
const lastOpen = openMatches[openMatches.length - 1];
const lastClose = closeMatches[closeMatches.length - 1];
if (lastClose && (lastClose.index ?? -1) > (lastOpen.index ?? -1)) {
return closed;
return stripTrailingPartialTag(closed);
}
const start = (lastOpen.index ?? 0) + lastOpen[0].length;
return text.slice(start).trim();
return stripTrailingPartialTag(text.slice(start).trim());
}
export function inferToolMetaFromArgs(toolName: string, args: unknown): string | undefined {