diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index cbd41c5e025..436fe61f210 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -175,61 +175,6 @@ describe("block streaming", () => { expect(res).toBeUndefined(); expect(seen).toEqual(["first\n\nsecond"]); - let sawAbort = false; - const onBlockReplyTimeout = vi.fn((_, context) => { - return new Promise((resolve) => { - context?.abortSignal?.addEventListener( - "abort", - () => { - sawAbort = true; - resolve(); - }, - { once: true }, - ); - }); - }); - - const timeoutImpl = async (params: RunEmbeddedPiAgentParams) => { - void params.onBlockReply?.({ text: "streamed" }); - return { - payloads: [{ text: "final" }], - meta: { - durationMs: 5, - agentMeta: { sessionId: "s", provider: "p", model: "m" }, - }, - }; - }; - piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(timeoutImpl); - - const timeoutReplyPromise = getReplyFromConfig( - { - Body: "ping", - From: "+1004", - To: "+2000", - MessageSid: "msg-126", - Provider: "telegram", - }, - { - onBlockReply: onBlockReplyTimeout, - blockReplyTimeoutMs: 1, - disableBlockStreaming: false, - }, - { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw"), - }, - }, - channels: { telegram: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions.json") }, - }, - ); - - const timeoutRes = await timeoutReplyPromise; - expect(timeoutRes).toMatchObject({ text: "final" }); - expect(sawAbort).toBe(true); - const onBlockReplyStreamMode = vi.fn().mockResolvedValue(undefined); piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async () => ({ payloads: [{ text: "final" }], diff --git a/src/auto-reply/reply/agent-runner.block-streaming.test.ts b/src/auto-reply/reply/agent-runner.block-streaming.test.ts index 8e6f036a13b..fed7dc14a91 100644 --- a/src/auto-reply/reply/agent-runner.block-streaming.test.ts +++ b/src/auto-reply/reply/agent-runner.block-streaming.test.ts @@ -125,4 +125,109 @@ describe("runReplyAgent block streaming", () => { expect(onBlockReply.mock.calls[0][0].text).toBe("Hello"); expect(result).toBeUndefined(); }); + + it("returns the final payload when onBlockReply times out", async () => { + vi.useFakeTimers(); + let sawAbort = false; + + const onBlockReply = vi.fn((_payload, context) => { + return new Promise((resolve) => { + context?.abortSignal?.addEventListener( + "abort", + () => { + sawAbort = true; + resolve(); + }, + { once: true }, + ); + }); + }); + + runEmbeddedPiAgentMock.mockImplementationOnce(async (params) => { + const block = params.onBlockReply as ((payload: { text?: string }) => void) | undefined; + block?.({ text: "Chunk" }); + return { + payloads: [{ text: "Final message" }], + meta: {}, + }; + }); + + const typing = createMockTypingController(); + const sessionCtx = { + Provider: "discord", + OriginatingTo: "channel:C1", + AccountId: "primary", + MessageSid: "msg", + } as unknown as TemplateContext; + const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; + const followupRun = { + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + sessionId: "session", + sessionKey: "main", + messageProvider: "discord", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: { + agents: { + defaults: { + blockStreamingCoalesce: { + minChars: 1, + maxChars: 200, + idleMs: 0, + }, + }, + }, + }, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + timeoutMs: 1_000, + blockReplyBreak: "text_end", + }, + } as unknown as FollowupRun; + + const resultPromise = runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + opts: { onBlockReply, blockReplyTimeoutMs: 1 }, + typing, + sessionCtx, + defaultModel: "anthropic/claude-opus-4-5", + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: true, + blockReplyChunking: { + minChars: 1, + maxChars: 200, + breakPreference: "paragraph", + }, + resolvedBlockStreamingBreak: "text_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + await vi.advanceTimersByTimeAsync(5); + const result = await resultPromise; + vi.useRealTimers(); + + expect(sawAbort).toBe(true); + expect(result).toMatchObject({ text: "Final message" }); + }); });