diff --git a/src/agents/pi-embedded-subscribe.reply-tags.test.ts b/src/agents/pi-embedded-subscribe.reply-tags.test.ts index a76db602291..64449f0470a 100644 --- a/src/agents/pi-embedded-subscribe.reply-tags.test.ts +++ b/src/agents/pi-embedded-subscribe.reply-tags.test.ts @@ -27,7 +27,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => { return { emit, onBlockReply }; } - it("carries reply_to_current across tag-only block chunks", () => { + it("carries reply_to_current across tag-only block chunks", async () => { const { emit, onBlockReply } = createBlockReplyHarness(); emit({ type: "message_start", message: { role: "assistant" } }); @@ -39,6 +39,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => { content: [{ type: "text", text: "[[reply_to_current]]\nHello" }], } as AssistantMessage; emit({ type: "message_end", message: assistantMessage }); + await Promise.resolve(); expect(onBlockReply).toHaveBeenCalledTimes(1); const payload = onBlockReply.mock.calls[0]?.[0]; @@ -47,7 +48,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => { expect(payload?.replyToTag).toBe(true); }); - it("flushes trailing directive tails on stream end", () => { + it("flushes trailing directive tails on stream end", async () => { const { emit, onBlockReply } = createBlockReplyHarness(); emit({ type: "message_start", message: { role: "assistant" } }); @@ -59,6 +60,7 @@ describe("subscribeEmbeddedPiSession reply tags", () => { content: [{ type: "text", text: "Hello [[" }], } as AssistantMessage; emit({ type: "message_end", message: assistantMessage }); + await Promise.resolve(); expect(onBlockReply).toHaveBeenCalledTimes(2); expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello"); @@ -88,4 +90,43 @@ describe("subscribeEmbeddedPiSession reply tags", () => { expect(call[0]?.text?.includes("[[reply_to")).toBe(false); } }); + + it("rebuilds reply_to_current after a compaction retry replays a directive chunk", async () => { + const { session, emit } = createStubSessionHarness(); + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { + minChars: 1, + maxChars: 50, + breakPreference: "newline", + }, + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]" }); + emit({ type: "auto_compaction_end", willRetry: true }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]\nHello again" }); + emitAssistantTextEnd({ emit }); + emit({ + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: "[[reply_to_current]]\nHello again" }], + } as AssistantMessage, + }); + await Promise.resolve(); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + const payload = onBlockReply.mock.calls[0]?.[0]; + expect(payload?.text).toBe("Hello again"); + expect(payload?.replyToCurrent).toBe(true); + expect(payload?.replyToTag).toBe(true); + }); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.splits-long-single-line-fenced-blocks-reopen.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.splits-long-single-line-fenced-blocks-reopen.test.ts index bff7046cc80..907cc06e4bf 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.splits-long-single-line-fenced-blocks-reopen.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.splits-long-single-line-fenced-blocks-reopen.test.ts @@ -11,7 +11,7 @@ import { makeZeroUsageSnapshot } from "./usage.js"; type SessionEventHandler = (evt: unknown) => void; describe("subscribeEmbeddedPiSession", () => { - it("splits long single-line fenced blocks with reopen/close", () => { + it("splits long single-line fenced blocks with reopen/close", async () => { const onBlockReply = vi.fn(); const { emit } = createParagraphChunkedBlockReplyHarness({ onBlockReply, @@ -23,6 +23,7 @@ describe("subscribeEmbeddedPiSession", () => { const text = `\`\`\`json\n${"x".repeat(120)}\n\`\`\``; emitAssistantTextDeltaAndEnd({ emit, text }); + await Promise.resolve(); expectFencedChunks(onBlockReply.mock.calls, "```json"); }); it("waits for auto-compaction retry and clears buffered text", async () => { @@ -152,4 +153,32 @@ describe("subscribeEmbeddedPiSession", () => { const usage = (session.messages?.[0] as { usage?: unknown } | undefined)?.usage; expect(usage).toEqual(makeZeroUsageSnapshot()); }); + + it("does not cache non-block assistant text across a willRetry compaction reset", () => { + const listeners: SessionEventHandler[] = []; + const session = { + subscribe: (listener: SessionEventHandler) => { + listeners.push(listener); + return () => {}; + }, + } as unknown as Parameters[0]["session"]; + + const subscription = subscribeEmbeddedPiSession({ + session, + runId: "run-4", + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Repeated completion text" }], + } as AssistantMessage; + + for (const listener of listeners) { + listener({ type: "message_end", message: assistantMessage }); + listener({ type: "auto_compaction_end", willRetry: true }); + listener({ type: "message_end", message: assistantMessage }); + } + + expect(subscription.assistantTexts).toEqual(["Repeated completion text"]); + }); });