diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 9aa445a1ab6..b6b780c9b52 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -165,9 +165,15 @@ export function handleMessageUpdate( } } + const streamedReasoning = extractThinkingFromTaggedStream(ctx.state.deltaBuffer); + if (streamedReasoning) { + // Keep the latest observed reasoning so reasoning=on can still emit a separate + // reasoning block when providers omit thinking blocks in final message content. + ctx.state.lastObservedReasoning = streamedReasoning; + } if (ctx.state.streamReasoning) { // Handle partial tags: stream whatever reasoning is visible so far. - ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer)); + ctx.emitReasoningStream(streamedReasoning); } const next = ctx @@ -274,9 +280,13 @@ export function handleMessageEnd( text: ctx.stripBlockTags(rawText, { thinking: false, final: false }), messagingToolSentTexts: ctx.state.messagingToolSentTexts, }); + const streamedReasoningFallback = + ctx.state.lastObservedReasoning || extractThinkingFromTaggedStream(ctx.state.deltaBuffer); const rawThinking = ctx.state.includeReasoning || ctx.state.streamReasoning - ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText) + ? extractAssistantThinking(assistantMessage) || + streamedReasoningFallback || + extractThinkingFromTaggedText(rawText) : ""; const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : ""; const trimmedText = text.trim(); diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index d5c725528c8..948c4348a09 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -51,6 +51,7 @@ export type EmbeddedPiSubscribeState = { lastStreamedAssistantCleaned?: string; emittedAssistantUpdate: boolean; lastStreamedReasoning?: string; + lastObservedReasoning?: string; lastBlockReplyText?: string; reasoningStreamOpen: boolean; assistantMessageIndex: number; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.e2e.test.ts index 98b4ce09237..767c344aab8 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.e2e.test.ts @@ -68,4 +68,36 @@ describe("subscribeEmbeddedPiSession", () => { ]); }, ); + + it.each(THINKING_TAG_CASES)( + "falls back to streamed <%s> reasoning when message_end has no thinking block", + ({ open, close }) => { + const { emit, onBlockReply } = createReasoningBlockReplyHarness(); + + emit({ type: "message_start", message: { role: "assistant" } }); + emit({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: `${open}Because` }, + }); + emit({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: ` it helps${close}\n\nFinal answer`, + }, + }); + + emit({ + type: "message_end", + message: { + role: "assistant", + content: [{ type: "text", text: "Final answer" }], + } as AssistantMessage, + }); + + expectReasoningAndAnswerCalls(onBlockReply); + }, + ); }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 8e7a7fec295..1d9916b2be6 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -54,6 +54,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar lastStreamedAssistantCleaned: undefined, emittedAssistantUpdate: false, lastStreamedReasoning: undefined, + lastObservedReasoning: undefined, lastBlockReplyText: undefined, reasoningStreamOpen: false, assistantMessageIndex: 0, @@ -117,6 +118,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.emittedAssistantUpdate = false; state.lastBlockReplyText = undefined; state.lastStreamedReasoning = undefined; + state.lastObservedReasoning = undefined; state.lastReasoningSent = undefined; state.reasoningStreamOpen = false; state.suppressBlockChunks = false; diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index d0369e3635f..c9f04cbd153 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -7,6 +7,8 @@ const createTelegramDraftStream = vi.hoisted(() => vi.fn()); const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn()); const deliverReplies = vi.hoisted(() => vi.fn()); const editMessageTelegram = vi.hoisted(() => vi.fn()); +const loadSessionStore = vi.hoisted(() => vi.fn()); +const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json")); vi.mock("./draft-stream.js", () => ({ createTelegramDraftStream, @@ -24,6 +26,11 @@ vi.mock("./send.js", () => ({ editMessageTelegram, })); +vi.mock("../config/sessions.js", async () => ({ + loadSessionStore, + resolveStorePath, +})); + vi.mock("./sticker-cache.js", () => ({ cacheSticker: vi.fn(), describeStickerImage: vi.fn(), @@ -39,6 +46,10 @@ describe("dispatchTelegramMessage draft streaming", () => { dispatchReplyWithBufferedBlockDispatcher.mockReset(); deliverReplies.mockReset(); editMessageTelegram.mockReset(); + loadSessionStore.mockReset(); + resolveStorePath.mockReset(); + resolveStorePath.mockReturnValue("/tmp/sessions.json"); + loadSessionStore.mockReturnValue({}); }); function createDraftStream(messageId?: number) { @@ -226,6 +237,66 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("keeps block streaming enabled when session reasoning level is on", async () => { + loadSessionStore.mockReturnValue({ + s1: { reasoningLevel: "on" }, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Reasoning:\n_step_" }, { kind: "block" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext({ + ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"], + }), + }); + + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( + expect.objectContaining({ + replyOptions: expect.objectContaining({ + disableBlockStreaming: false, + }), + }), + ); + expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true }); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Reasoning:\n_step_" })], + }), + ); + }); + + it("streams reasoning draft updates even when answer stream mode is off", async () => { + loadSessionStore.mockReturnValue({ + s1: { reasoningLevel: "stream" }, + }); + const reasoningDraftStream = createDraftStream(111); + createTelegramDraftStream.mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step_" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext({ + ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"], + }), + streamMode: "off", + }); + + expect(createTelegramDraftStream).toHaveBeenCalledTimes(1); + expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_step_"); + expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true }); + }); + it("finalizes text-only replies by editing the preview message in place", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -730,6 +801,141 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("routes think-tag partials to reasoning lane and keeps answer lane clean", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Counting letters in strawberry3", + }); + await dispatcherOptions.deliver({ text: "3" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Counting letters in strawberry_", + ); + expect(answerDraftStream.update).toHaveBeenCalledWith("3"); + expect( + answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("")), + ).toBe(false); + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object)); + }); + + it("routes unmatched think partials to reasoning lane without leaking answer lane", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Counting letters in strawberry", + }); + await dispatcherOptions.deliver( + { text: "There are 3 r's in strawberry." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Counting letters in strawberry_", + ); + expect( + answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("<")), + ).toBe(false); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + }); + + it("keeps reasoning preview message when reasoning is streamed but final is answer-only", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Word: strawberry. r appears at 3, 8, 9.", + }); + await dispatcherOptions.deliver( + { text: "There are 3 r's in strawberry." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Word: strawberry. r appears at 3, 8, 9._", + ); + expect(reasoningDraftStream.clear).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + }); + + it("splits think-tag final payload into reasoning and answer lanes", async () => { + setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver( + { + text: "Word: strawberry. r appears at 3, 8, 9.There are 3 r's in strawberry.", + }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 111, + "Reasoning:\n_Word: strawberry. r appears at 3, 8, 9._", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it("splits reasoning preview only when next reasoning block starts in partial mode", async () => { const { reasoningDraftStream } = setupDraftStreams({ answerMessageId: 999, diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index cecfd9795e4..79e9109e050 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -10,11 +10,13 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { resolveChunkMode } from "../auto-reply/chunk.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; import { removeAckReactionAfterReply } from "../channels/ack-reactions.js"; import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; import { danger, logVerbose } from "../globals.js"; import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js"; @@ -27,6 +29,10 @@ import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { renderTelegramHtmlText } from "./format.js"; +import { + createTelegramReasoningStepState, + splitTelegramReasoningText, +} from "./reasoning-lane-coordinator.js"; import { editMessageTelegram } from "./send.js"; import { cacheSticker, describeStickerImage } from "./sticker-cache.js"; @@ -34,17 +40,6 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; /** Minimum chars before sending first streaming message (improves push notification UX) */ const DRAFT_MIN_INITIAL_CHARS = 30; -const REASONING_MESSAGE_PREFIX = "Reasoning:\n"; - -function isReasoningMessage(text?: string): boolean { - if (typeof text !== "string") { - return false; - } - const trimmed = text.trim(); - return ( - trimmed.startsWith(REASONING_MESSAGE_PREFIX) && trimmed.length > REASONING_MESSAGE_PREFIX.length - ); -} async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) { try { @@ -72,6 +67,31 @@ type DispatchTelegramMessageParams = { opts: Pick; }; +type TelegramReasoningLevel = "off" | "on" | "stream"; + +function resolveTelegramReasoningLevel(params: { + cfg: OpenClawConfig; + sessionKey?: string; + agentId: string; +}): TelegramReasoningLevel { + const { cfg, sessionKey, agentId } = params; + if (!sessionKey) { + return "off"; + } + try { + const storePath = resolveStorePath(cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath, { skipCache: true }); + const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey]; + const level = entry?.reasoningLevel; + if (level === "on" || level === "stream") { + return level; + } + } catch { + // Fall through to default. + } + return "off"; +} + export const dispatchTelegramMessage = async ({ context, bot, @@ -115,11 +135,21 @@ export const dispatchTelegramMessage = async ({ typeof telegramCfg.blockStreaming === "boolean" ? telegramCfg.blockStreaming : cfg.agents?.defaults?.blockStreamingDefault === "on"; - const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled; + const resolvedReasoningLevel = resolveTelegramReasoningLevel({ + cfg, + sessionKey: ctxPayload.SessionKey, + agentId: route.agentId, + }); + const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on"; + const streamReasoningDraft = resolvedReasoningLevel === "stream"; + const canStreamAnswerDraft = + streamMode !== "off" && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning; + const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft; const draftReplyToMessageId = replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; - const draftMinInitialChars = streamMode === "partial" ? 1 : DRAFT_MIN_INITIAL_CHARS; - const answerDraftStream = canStreamDraft + const draftMinInitialChars = + streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS; + const answerDraftStream = canStreamAnswerDraft ? createTelegramDraftStream({ api: bot.api, chatId, @@ -132,7 +162,7 @@ export const dispatchTelegramMessage = async ({ warn: logVerbose, }) : undefined; - const reasoningDraftStream = canStreamDraft + const reasoningDraftStream = canStreamReasoningDraft ? createTelegramDraftStream({ api: bot.api, chatId, @@ -182,6 +212,7 @@ export const dispatchTelegramMessage = async ({ chunker: reasoningDraftChunker, }; let splitReasoningOnNextStream = false; + const reasoningStepState = createTelegramReasoningStepState(); const resetDraftLaneState = (lane: DraftLaneState) => { lane.lastPartialText = ""; lane.draftText = ""; @@ -239,6 +270,20 @@ export const dispatchTelegramMessage = async ({ }, }); }; + const updateDraftLanesFromPartial = (text: string | undefined) => { + if (!text) { + return; + } + const split = splitTelegramReasoningText(text); + if (split.reasoningText) { + reasoningStepState.noteReasoningHint(); + reasoningStepState.noteReasoningDelivered(); + updateDraftFromPartial(reasoningLane, split.reasoningText); + } + if (split.answerText) { + updateDraftFromPartial(answerLane, split.answerText); + } + }; const flushDraftLane = async (lane: DraftLaneState) => { if (!lane.stream) { return; @@ -258,10 +303,11 @@ export const dispatchTelegramMessage = async ({ await lane.stream.flush(); }; - const disableBlockStreaming = - typeof telegramCfg.blockStreaming === "boolean" + const disableBlockStreaming = forceBlockStreamingForReasoning + ? false + : typeof telegramCfg.blockStreaming === "boolean" ? !telegramCfg.blockStreaming - : answerDraftStream || streamMode === "off" + : canStreamAnswerDraft || streamMode === "off" ? true : undefined; @@ -336,6 +382,7 @@ export const dispatchTelegramMessage = async ({ skippedNonSilent: 0, }; let finalizedViaPreviewMessage = false; + let finalizedReasoningViaPreviewMessage = false; const clearGroupHistory = () => { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); @@ -442,6 +489,51 @@ export const dispatchTelegramMessage = async ({ return false; } }; + const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => { + if (payload.text === text) { + return payload; + } + return { ...payload, text }; + }; + const sendPayload = async (payload: ReplyPayload) => { + const result = await deliverReplies({ + ...deliveryBaseOptions, + replies: [payload], + onVoiceRecording: sendRecordVoice, + }); + if (result.delivered) { + deliveryState.delivered = true; + } + return result.delivered; + }; + const tryFinalizeLaneText = async (params: { + lane: DraftLaneState; + laneName: "answer" | "reasoning"; + text: string; + previewButtons?: TelegramInlineButtons; + alreadyFinalized?: boolean; + payload: ReplyPayload; + }): Promise => { + const { lane, laneName, text, previewButtons, alreadyFinalized, payload } = params; + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const canFinalizeViaPreviewEdit = + !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; + if (!canFinalizeViaPreviewEdit || alreadyFinalized) { + if (!hasMedia && !payload.isError && text.length > draftMaxChars) { + logVerbose( + `telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`, + ); + } + return false; + } + await flushDraftLane(lane); + return tryFinalizePreviewForLane({ + lane, + laneName, + finalText: text, + previewButtons, + }); + }; let queuedFinal = false; try { @@ -451,75 +543,147 @@ export const dispatchTelegramMessage = async ({ dispatcherOptions: { ...prefixOptions, deliver: async (payload, info) => { - const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const finalText = payload.text; - const reasoningMessage = isReasoningMessage(finalText); const previewButtons = ( payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined )?.buttons; - const canFinalizeViaPreviewEdit = - !hasMedia && - typeof finalText === "string" && - finalText.length > 0 && - finalText.length <= draftMaxChars && - !payload.isError; - if (info.kind === "final") { - await flushDraftLane(answerLane); - await flushDraftLane(reasoningLane); - if (canFinalizeViaPreviewEdit && reasoningMessage) { - const finalizedReasoning = await tryFinalizePreviewForLane({ - lane: reasoningLane, - laneName: "reasoning", - finalText, - previewButtons, - }); - if (finalizedReasoning) { - return; - } - } - if (canFinalizeViaPreviewEdit && !reasoningMessage && !finalizedViaPreviewMessage) { - const finalizedAnswer = await tryFinalizePreviewForLane({ - lane: answerLane, - laneName: "answer", - finalText, - previewButtons, - }); - if (finalizedAnswer) { - finalizedViaPreviewMessage = true; - return; - } - } - if ( - !hasMedia && - !payload.isError && - typeof finalText === "string" && - finalText.length > draftMaxChars - ) { - logVerbose( - `telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`, - ); - } - await answerLane.stream?.stop(); - await reasoningLane.stream?.stop(); - } - if (info.kind !== "final" && canFinalizeViaPreviewEdit && reasoningMessage) { - const updatedReasoning = await tryEditExistingPreviewForLane({ - lane: reasoningLane, - laneName: "reasoning", - finalText, - previewButtons, - }); - if (updatedReasoning) { + const split = splitTelegramReasoningText(payload.text); + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + + const flushBufferedFinalAnswer = async () => { + const buffered = reasoningStepState.takeBufferedFinalAnswer(); + if (!buffered) { return; } + const bufferedButtons = ( + buffered.payload.channelData?.telegram as + | { buttons?: TelegramInlineButtons } + | undefined + )?.buttons; + const finalizedBufferedAnswer = await tryFinalizeLaneText({ + lane: answerLane, + laneName: "answer", + text: buffered.text, + previewButtons: bufferedButtons, + alreadyFinalized: finalizedViaPreviewMessage, + payload: buffered.payload, + }); + if (finalizedBufferedAnswer) { + finalizedViaPreviewMessage = true; + reasoningStepState.resetForNextStep(); + return; + } + await answerLane.stream?.stop(); + await sendPayload(applyTextToPayload(buffered.payload, buffered.text)); + reasoningStepState.resetForNextStep(); + }; + + const deliverReasoningText = async (text: string) => { + reasoningStepState.noteReasoningHint(); + if (info.kind === "final") { + const finalizedReasoning = await tryFinalizeLaneText({ + lane: reasoningLane, + laneName: "reasoning", + text, + previewButtons, + payload, + }); + if (finalizedReasoning) { + finalizedReasoningViaPreviewMessage = true; + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + return; + } + await reasoningLane.stream?.stop(); + const delivered = await sendPayload(applyTextToPayload(payload, text)); + if (delivered) { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + } + return; + } + + const canEditReasoningPreview = + !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; + if (canEditReasoningPreview) { + const updatedReasoning = await tryEditExistingPreviewForLane({ + lane: reasoningLane, + laneName: "reasoning", + finalText: text, + previewButtons, + }); + if (updatedReasoning) { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + return; + } + } + const delivered = await sendPayload(applyTextToPayload(payload, text)); + if (delivered) { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + } + }; + + const deliverAnswerText = async (text: string) => { + if (info.kind === "final" && reasoningStepState.shouldBufferFinalAnswer()) { + reasoningStepState.bufferFinalAnswer({ payload, text }); + return; + } + if (info.kind === "final") { + const finalizedAnswer = await tryFinalizeLaneText({ + lane: answerLane, + laneName: "answer", + text, + previewButtons, + alreadyFinalized: finalizedViaPreviewMessage, + payload, + }); + if (finalizedAnswer && !finalizedViaPreviewMessage) { + finalizedViaPreviewMessage = true; + if (reasoningLane.hasStreamedMessage) { + finalizedReasoningViaPreviewMessage = true; + } + reasoningStepState.resetForNextStep(); + return; + } + await answerLane.stream?.stop(); + } + await sendPayload(applyTextToPayload(payload, text)); + if (info.kind === "final") { + if (reasoningLane.hasStreamedMessage) { + finalizedReasoningViaPreviewMessage = true; + } + reasoningStepState.resetForNextStep(); + } + }; + + if (split.reasoningText) { + await deliverReasoningText(split.reasoningText); } - const result = await deliverReplies({ - ...deliveryBaseOptions, - replies: [payload], - onVoiceRecording: sendRecordVoice, - }); - if (result.delivered) { - deliveryState.delivered = true; + if (split.answerText) { + await deliverAnswerText(split.answerText); + return; + } + if (split.reasoningText) { + return; + } + + if (info.kind === "final") { + await answerLane.stream?.stop(); + await reasoningLane.stream?.stop(); + reasoningStepState.resetForNextStep(); + } + const canSendAsIs = + hasMedia || typeof payload.text !== "string" || payload.text.length > 0; + if (!canSendAsIs) { + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + } + return; + } + await sendPayload(payload); + if (info.kind === "final") { + await flushBufferedFinalAnswer(); } }, onSkip: (_payload, info) => { @@ -546,7 +710,7 @@ export const dispatchTelegramMessage = async ({ skillFilter, disableBlockStreaming, onPartialReply: answerLane.stream - ? (payload) => updateDraftFromPartial(answerLane, payload.text) + ? (payload) => updateDraftLanesFromPartial(payload.text) : undefined, onReasoningStream: reasoningLane.stream ? (payload) => { @@ -558,11 +722,20 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; } - updateDraftFromPartial(reasoningLane, payload.text); + const split = splitTelegramReasoningText(payload.text); + if (split.reasoningText) { + reasoningStepState.noteReasoningHint(); + reasoningStepState.noteReasoningDelivered(); + updateDraftFromPartial(reasoningLane, split.reasoningText); + } + if (split.answerText) { + updateDraftFromPartial(answerLane, split.answerText); + } } : undefined, onAssistantMessageStart: answerLane.stream ? () => { + reasoningStepState.resetForNextStep(); // Keep answer blocks separated in block mode; partial mode keeps one answer lane. if (streamMode === "block" && answerLane.hasStreamedMessage) { answerLane.stream?.forceNewMessage(); @@ -581,11 +754,20 @@ export const dispatchTelegramMessage = async ({ })); } finally { // Must stop() first to flush debounced content before clear() wipes state + const streamsShareHandle = + Boolean(answerLane.stream) && + Boolean(reasoningLane.stream) && + answerLane.stream === reasoningLane.stream; await answerLane.stream?.stop(); if (!finalizedViaPreviewMessage) { await answerLane.stream?.clear(); } - await reasoningLane.stream?.stop(); + if (!streamsShareHandle) { + await reasoningLane.stream?.stop(); + if (!finalizedReasoningViaPreviewMessage) { + await reasoningLane.stream?.clear(); + } + } } let sentFallback = false; if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) { diff --git a/src/telegram/reasoning-lane-coordinator.ts b/src/telegram/reasoning-lane-coordinator.ts new file mode 100644 index 00000000000..c510e4ae908 --- /dev/null +++ b/src/telegram/reasoning-lane-coordinator.ts @@ -0,0 +1,111 @@ +import { + extractThinkingFromTaggedStream, + formatReasoningMessage, +} from "../agents/pi-embedded-utils.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; +import { stripReasoningTagsFromText } from "../shared/text/reasoning-tags.js"; + +const REASONING_MESSAGE_PREFIX = "Reasoning:\n"; +const REASONING_TAG_PREFIXES = [ + "")) { + return false; + } + return REASONING_TAG_PREFIXES.some((prefix) => prefix.startsWith(trimmed)); +} + +export type TelegramReasoningSplit = { + reasoningText?: string; + answerText?: string; +}; + +export function splitTelegramReasoningText(text?: string): TelegramReasoningSplit { + if (typeof text !== "string") { + return {}; + } + + const trimmed = text.trim(); + if (isPartialReasoningTagPrefix(trimmed)) { + return {}; + } + if ( + trimmed.startsWith(REASONING_MESSAGE_PREFIX) && + trimmed.length > REASONING_MESSAGE_PREFIX.length + ) { + return { reasoningText: trimmed }; + } + + const taggedReasoning = extractThinkingFromTaggedStream(text); + const strippedAnswer = stripReasoningTagsFromText(text, { mode: "strict", trim: "both" }); + + if (!taggedReasoning && strippedAnswer === text) { + return { answerText: text }; + } + + const reasoningText = taggedReasoning ? formatReasoningMessage(taggedReasoning) : undefined; + const answerText = strippedAnswer || undefined; + return { reasoningText, answerText }; +} + +export type BufferedFinalAnswer = { + payload: ReplyPayload; + text: string; +}; + +export function createTelegramReasoningStepState() { + let reasoningHint = false; + let reasoningDelivered = false; + let bufferedFinalAnswer: BufferedFinalAnswer | undefined; + + const noteReasoningHint = () => { + reasoningHint = true; + }; + + const noteReasoningDelivered = () => { + reasoningHint = true; + reasoningDelivered = true; + }; + + const shouldBufferFinalAnswer = () => { + return reasoningHint && !reasoningDelivered && !bufferedFinalAnswer; + }; + + const bufferFinalAnswer = (value: BufferedFinalAnswer) => { + bufferedFinalAnswer = value; + }; + + const takeBufferedFinalAnswer = (): BufferedFinalAnswer | undefined => { + const value = bufferedFinalAnswer; + bufferedFinalAnswer = undefined; + return value; + }; + + const resetForNextStep = () => { + reasoningHint = false; + reasoningDelivered = false; + bufferedFinalAnswer = undefined; + }; + + return { + noteReasoningHint, + noteReasoningDelivered, + shouldBufferFinalAnswer, + bufferFinalAnswer, + takeBufferedFinalAnswer, + resetForNextStep, + }; +}