diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ac0076d1f..dbe479a5033 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang. - Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk. - Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus. +- Telegram: prevent streaming final replies from being overwritten by later final/error payloads, and suppress fallback tool-error warnings when a recovered assistant answer already exists after tool calls. (#17883) Thanks @Marvae and @obviyus. - Discord: preserve channel session continuity when runtime payloads omit `message.channelId` by falling back to event/raw `channel_id` values for routing/session keys, so same-channel messages keep history across turns/restarts. Also align diagnostics so active Discord runs no longer appear as `sessionKey=unknown`. (#17622) Thanks @shakkernerd. - Discord: dedupe native skill commands by skill name in multi-agent setups to prevent duplicated slash commands with `_2` suffixes. (#17365) Thanks @seewhyme. - Discord: ensure role allowlist matching uses raw role IDs for message routing authorization. Thanks @xinhuagu. diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index bb5266419a5..1c986e78ea7 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -471,6 +471,7 @@ export async function runEmbeddedPiAgent( blockReplyBreak: params.blockReplyBreak, blockReplyChunking: params.blockReplyChunking, onReasoningStream: params.onReasoningStream, + onReasoningEnd: params.onReasoningEnd, onToolResult: params.onToolResult, onAgentEvent: params.onAgentEvent, extraSystemPrompt: params.extraSystemPrompt, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index dc648e44280..7a040c0eb6e 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -737,6 +737,7 @@ export async function runEmbeddedAttempt( shouldEmitToolOutput: params.shouldEmitToolOutput, onToolResult: params.onToolResult, onReasoningStream: params.onReasoningStream, + onReasoningEnd: params.onReasoningEnd, onBlockReply: params.onBlockReply, onBlockReplyFlush: params.onBlockReplyFlush, blockReplyBreak: params.blockReplyBreak, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index c49f7fb656d..428774ce1d0 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -95,6 +95,7 @@ export type RunEmbeddedPiAgentParams = { blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; + onReasoningEnd?: () => void | Promise; onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onAgentEvent?: (evt: { stream: string; data: Record }) => void; lane?: string; diff --git a/src/agents/pi-embedded-runner/run/payloads.e2e.test.ts b/src/agents/pi-embedded-runner/run/payloads.e2e.test.ts index 03a982289d0..a1457a03b3f 100644 --- a/src/agents/pi-embedded-runner/run/payloads.e2e.test.ts +++ b/src/agents/pi-embedded-runner/run/payloads.e2e.test.ts @@ -184,6 +184,29 @@ describe("buildEmbeddedRunPayloads", () => { expect(payloads[0]?.text).toContain("code 1"); }); + it("does not add tool error fallback when assistant text exists after tool calls", () => { + const payloads = buildPayloads({ + assistantTexts: ["Checked the page and recovered with final answer."], + lastAssistant: makeAssistant({ + stopReason: "toolUse", + errorMessage: undefined, + content: [ + { + type: "toolCall", + id: "toolu_01", + name: "browser", + arguments: { action: "search", query: "openclaw docs" }, + }, + ], + }), + lastToolError: { toolName: "browser", error: "connection timeout" }, + }); + + expect(payloads).toHaveLength(1); + expect(payloads[0]?.isError).toBeUndefined(); + expect(payloads[0]?.text).toContain("recovered"); + }); + it("suppresses recoverable tool errors containing 'required' for non-mutating tools", () => { const payloads = buildPayloads({ lastToolError: { toolName: "browser", error: "url required" }, diff --git a/src/agents/pi-embedded-runner/run/payloads.ts b/src/agents/pi-embedded-runner/run/payloads.ts index e7a4f74b89f..9ccbf76f972 100644 --- a/src/agents/pi-embedded-runner/run/payloads.ts +++ b/src/agents/pi-embedded-runner/run/payloads.ts @@ -218,6 +218,7 @@ export function buildEmbeddedRunPayloads(params: { : [] ).filter((text) => !shouldSuppressRawErrorText(text)); + let hasUserFacingAssistantReply = false; for (const text of answerTexts) { const { text: cleanedText, @@ -238,22 +239,13 @@ export function buildEmbeddedRunPayloads(params: { replyToTag, replyToCurrent, }); + hasUserFacingAssistantReply = true; } if (params.lastToolError) { - const lastAssistantHasToolCalls = - Array.isArray(params.lastAssistant?.content) && - params.lastAssistant?.content.some((block) => - block && typeof block === "object" - ? (block as { type?: unknown }).type === "toolCall" - : false, - ); - const lastAssistantWasToolUse = params.lastAssistant?.stopReason === "toolUse"; - const hasUserFacingReply = - replyItems.length > 0 && !lastAssistantHasToolCalls && !lastAssistantWasToolUse; const shouldShowToolError = shouldShowToolErrorWarning({ lastToolError: params.lastToolError, - hasUserFacingReply, + hasUserFacingReply: hasUserFacingAssistantReply, suppressToolErrors: Boolean(params.config?.messages?.suppressToolErrors), }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index a304d1db24c..0812e5956bf 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -140,7 +140,12 @@ export function handleMessageUpdate( }) .trim(); if (next) { + const wasThinking = ctx.state.partialBlockState.thinking; const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; + // Detect when thinking block ends ( tag processed) + if (wasThinking && !ctx.state.partialBlockState.thinking) { + void ctx.params.onReasoningEnd?.(); + } const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null; const parsedFull = parseReplyDirectives(stripTrailingDirective(next)); const cleanedText = parsedFull.text; diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index 8c9fe02de37..135be2627f0 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -17,6 +17,8 @@ export type SubscribeEmbeddedPiSessionParams = { shouldEmitToolOutput?: () => boolean; onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; + /** Called when a thinking/reasoning block ends ( tag processed). */ + onReasoningEnd?: () => void | Promise; onBlockReply?: (payload: { text?: string; mediaUrls?: string[]; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 482a2d3efb9..e713fc71092 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -333,6 +333,7 @@ export async function runAgentTurnWithFallback(params: { : undefined, onAssistantMessageStart: async () => { await params.typingSignals.signalMessageStart(); + await params.opts?.onAssistantMessageStart?.(); }, onReasoningStream: params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream @@ -344,6 +345,7 @@ export async function runAgentTurnWithFallback(params: { }); } : undefined, + onReasoningEnd: params.opts?.onReasoningEnd, onAgentEvent: async (evt) => { // Trigger typing when tools start executing. // Must await to ensure typing indicator starts before tool summaries are emitted. diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 29a51a87582..0b06db40120 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -31,6 +31,10 @@ export type GetReplyOptions = { heartbeatModelOverride?: string; onPartialReply?: (payload: ReplyPayload) => Promise | void; onReasoningStream?: (payload: ReplyPayload) => Promise | void; + /** Called when a thinking/reasoning block ends. */ + onReasoningEnd?: () => Promise | void; + /** Called when a new assistant message starts (e.g., after tool call or thinking block). */ + onAssistantMessageStart?: () => Promise | void; onBlockReply?: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; /** Called when the actual model is selected (including after fallback). diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 2b306c23e04..64bfd96967c 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -48,6 +48,7 @@ describe("dispatchTelegramMessage draft streaming", () => { messageId: vi.fn().mockReturnValue(messageId), clear: vi.fn().mockResolvedValue(undefined), stop: vi.fn(), + forceNewMessage: vi.fn(), }; } @@ -114,14 +115,13 @@ describe("dispatchTelegramMessage draft streaming", () => { context: TelegramMessageContext; telegramCfg?: Parameters[0]["telegramCfg"]; streamMode?: Parameters[0]["streamMode"]; - replyToMode?: Parameters[0]["replyToMode"]; }) { await dispatchTelegramMessage({ context: params.context, bot: createBot(), cfg: {}, runtime: createRuntime(), - replyToMode: params.replyToMode ?? "first", + replyToMode: "first", streamMode: params.streamMode ?? "partial", textLimit: 4096, telegramCfg: params.telegramCfg ?? {}, @@ -152,7 +152,6 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, - replyToMessageId: 456, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); @@ -217,52 +216,38 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.stop).toHaveBeenCalled(); }); - it("uses only the latest final payload when multiple finals are emitted", async () => { + it("does not overwrite finalized preview when additional final payloads are sent", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Okay." }); - await dispatcherOptions.deliver({ text: "Ok" }, { kind: "final" }); - await dispatcherOptions.deliver({ text: "Okay." }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Primary result" }, { kind: "final" }); + await dispatcherOptions.deliver( + { text: "⚠️ Recovered tool error details" }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }); deliverReplies.mockResolvedValue({ delivered: true }); editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); await dispatchWithContext({ context: createContext() }); expect(editMessageTelegram).toHaveBeenCalledTimes(1); - expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Okay.", expect.any(Object)); - expect(deliverReplies).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "Primary result", + expect.any(Object), + ); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "⚠️ Recovered tool error details" })], + }), + ); expect(draftStream.clear).not.toHaveBeenCalled(); expect(draftStream.stop).toHaveBeenCalled(); }); - it("ignores transient shorter partial prefixes to avoid preview punctuation flicker", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Sure." }); - await replyOptions?.onPartialReply?.({ text: "Sure" }); - await replyOptions?.onPartialReply?.({ text: "Sure." }); - await dispatcherOptions.deliver({ text: "Sure." }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); - - await dispatchWithContext({ context: createContext() }); - - expect(draftStream.update).toHaveBeenCalledTimes(1); - expect(draftStream.update).toHaveBeenCalledWith("Sure."); - expect(editMessageTelegram).toHaveBeenCalledTimes(1); - expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Sure.", expect.any(Object)); - }); - it("falls back to normal delivery when preview final is too long to edit", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -308,24 +293,124 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); - it("omits replyToMessageId from draft stream when replyToMode is off", async () => { - const draftStream = createDraftStream(); + it("forces new message when new assistant message starts after previous output", async () => { + const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { - await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); - return { queuedFinal: true }; - }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // First assistant message: partial text + await replyOptions?.onPartialReply?.({ text: "First response" }); + // New assistant message starts (e.g., after tool call) + await replyOptions?.onAssistantMessageStart?.(); + // Second assistant message: new text + await replyOptions?.onPartialReply?.({ text: "After tool call" }); + await dispatcherOptions.deliver({ text: "After tool call" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); deliverReplies.mockResolvedValue({ delivered: true }); - await dispatchWithContext({ - context: createContext(), - replyToMode: "off", - }); + await dispatchWithContext({ context: createContext(), streamMode: "block" }); - expect(createTelegramDraftStream).toHaveBeenCalledWith( + // Should force new message when assistant message starts after previous output + expect(draftStream.forceNewMessage).toHaveBeenCalled(); + }); + + it("does not force new message on first assistant message start", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // First assistant message starts (no previous output) + await replyOptions?.onAssistantMessageStart?.(); + // Partial updates + await replyOptions?.onPartialReply?.({ text: "Hello" }); + await replyOptions?.onPartialReply?.({ text: "Hello world" }); + await dispatcherOptions.deliver({ text: "Hello world" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + // First message start shouldn't trigger forceNewMessage (no previous output) + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + }); + + it("forces new message when reasoning ends after previous output", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // First partial: text before thinking + await replyOptions?.onPartialReply?.({ text: "Let me check" }); + // Reasoning stream (thinking block) + await replyOptions?.onReasoningStream?.({ text: "Analyzing..." }); + // Reasoning ends + await replyOptions?.onReasoningEnd?.(); + // Second partial: text after thinking + await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); + await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + // Should force new message when reasoning ends + expect(draftStream.forceNewMessage).toHaveBeenCalled(); + }); + + it("does not force new message on reasoning end without previous output", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // Reasoning starts immediately (no previous text output) + await replyOptions?.onReasoningStream?.({ text: "Thinking..." }); + // Reasoning ends + await replyOptions?.onReasoningEnd?.(); + // First actual text output + await replyOptions?.onPartialReply?.({ text: "Here's my answer" }); + await dispatcherOptions.deliver({ text: "Here's my answer" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + // No previous text output, so no forceNewMessage needed + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + }); + + it("does not edit preview message when final payload is an error", async () => { + const draftStream = createDraftStream(999); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + // Partial text output + await replyOptions?.onPartialReply?.({ text: "Let me check that file" }); + // Error payload should not edit the preview message + await dispatcherOptions.deliver( + { text: "⚠️ 🛠️ Exec: cat /nonexistent failed: No such file", isError: true }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + // Should NOT edit preview message (which would overwrite the partial text) + expect(editMessageTelegram).not.toHaveBeenCalled(); + // Should deliver via normal path as a new message + expect(deliverReplies).toHaveBeenCalledWith( expect.objectContaining({ - chatId: 123, - replyToMessageId: undefined, + replies: [expect.objectContaining({ text: expect.stringContaining("⚠️") })], }), ); }); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index bfa2d0143ea..2741ec393ca 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -112,6 +112,7 @@ export const dispatchTelegramMessage = async ({ const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); let lastPartialText = ""; let draftText = ""; + let hasStreamedMessage = false; const updateDraftFromPartial = (text?: string) => { if (!draftStream || !text) { return; @@ -119,6 +120,8 @@ export const dispatchTelegramMessage = async ({ if (text === lastPartialText) { return; } + // Mark that we've received streaming content (for forceNewMessage decision). + hasStreamedMessage = true; if (streamMode === "partial") { // Some providers briefly emit a shorter prefix snapshot (for example // "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid @@ -295,15 +298,25 @@ export const dispatchTelegramMessage = async ({ const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; const previewMessageId = draftStream?.messageId(); const finalText = payload.text; + const currentPreviewText = streamMode === "block" ? draftText : lastPartialText; + const previewButtons = ( + payload.channelData?.telegram as + | { buttons?: Array> } + | undefined + )?.buttons; + let draftStoppedForPreviewEdit = false; + // Skip preview edit for error payloads to avoid overwriting previous content const canFinalizeViaPreviewEdit = + !finalizedViaPreviewMessage && !hasMedia && typeof finalText === "string" && finalText.length > 0 && typeof previewMessageId === "number" && - finalText.length <= draftMaxChars; + finalText.length <= draftMaxChars && + !payload.isError; if (canFinalizeViaPreviewEdit) { draftStream?.stop(); - const currentPreviewText = streamMode === "block" ? draftText : lastPartialText; + draftStoppedForPreviewEdit = true; if ( currentPreviewText && currentPreviewText.startsWith(finalText) && @@ -313,11 +326,6 @@ export const dispatchTelegramMessage = async ({ // can appear transiently in some provider streams. return; } - const previewButtons = ( - payload.channelData?.telegram as - | { buttons?: Array> } - | undefined - )?.buttons; try { await editMessageTelegram(chatId, previewMessageId, finalText, { api: bot.api, @@ -335,12 +343,19 @@ export const dispatchTelegramMessage = async ({ ); } } - if (payload.text && payload.text.length > draftMaxChars) { + if ( + !hasMedia && + !payload.isError && + typeof finalText === "string" && + finalText.length > draftMaxChars + ) { logVerbose( - `telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`, + `telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`, ); } - draftStream?.stop(); + if (!draftStoppedForPreviewEdit) { + draftStream?.stop(); + } } const result = await deliverReplies({ ...deliveryBaseOptions, @@ -375,6 +390,34 @@ export const dispatchTelegramMessage = async ({ skillFilter, disableBlockStreaming, onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, + onAssistantMessageStart: draftStream + ? () => { + // When a new assistant message starts (e.g., after tool call), + // force a new Telegram message if we have previous content. + // Only force once per response to avoid excessive splitting. + logVerbose( + `telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`, + ); + if (hasStreamedMessage) { + logVerbose(`telegram: calling forceNewMessage()`); + draftStream.forceNewMessage(); + } + lastPartialText = ""; + draftText = ""; + draftChunker?.reset(); + } + : undefined, + onReasoningEnd: draftStream + ? () => { + // When a thinking block ends, force a new Telegram message for the next text output. + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + lastPartialText = ""; + draftText = ""; + draftChunker?.reset(); + } + } + : undefined, onModelSelected, }, })); diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 3cc63539805..ab59412926c 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -114,51 +114,12 @@ describe("createTelegramDraftStream", () => { expect(api.deleteMessage).toHaveBeenCalledWith(123, 17); }); - it("includes reply_to_message_id in initial sendMessage when replyToMessageId is set", async () => { + it("creates new message after forceNewMessage is called", async () => { const api = { - sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), - editMessageText: vi.fn().mockResolvedValue(true), - deleteMessage: vi.fn().mockResolvedValue(true), - }; - const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, - chatId: 123, - replyToMessageId: 999, - }); - - stream.update("Hello"); - await vi.waitFor(() => - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 999 }), - ); - }); - - it("includes both reply_to_message_id and message_thread_id when both are set", async () => { - const api = { - sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), - editMessageText: vi.fn().mockResolvedValue(true), - deleteMessage: vi.fn().mockResolvedValue(true), - }; - const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, - chatId: 123, - thread: { id: 99, scope: "forum" }, - replyToMessageId: 555, - }); - - stream.update("Hello"); - await vi.waitFor(() => - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { - message_thread_id: 99, - reply_to_message_id: 555, - }), - ); - }); - - it("passes undefined params when neither thread nor replyToMessageId is set", async () => { - const api = { - sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), + sendMessage: vi + .fn() + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }), editMessageText: vi.fn().mockResolvedValue(true), deleteMessage: vi.fn().mockResolvedValue(true), }; @@ -168,27 +129,23 @@ describe("createTelegramDraftStream", () => { chatId: 123, }); + // First message stream.update("Hello"); - await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined)); - }); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledTimes(1); - it("includes reply_to_message_id even when thread resolves to general topic", async () => { - const api = { - sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }), - editMessageText: vi.fn().mockResolvedValue(true), - deleteMessage: vi.fn().mockResolvedValue(true), - }; - const stream = createTelegramDraftStream({ - // oxlint-disable-next-line typescript/no-explicit-any - api: api as any, - chatId: 123, - thread: { id: 1, scope: "forum" }, - replyToMessageId: 888, - }); + // Normal edit (same message) + stream.update("Hello edited"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "Hello edited"); - stream.update("Hello"); - await vi.waitFor(() => - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 888 }), - ); + // Force new message (e.g. after thinking block ends) + stream.forceNewMessage(); + stream.update("After thinking"); + await stream.flush(); + + // Should have sent a second new message, not edited the first + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); }); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 82ac80476c1..17b3cbec654 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -10,6 +10,8 @@ export type TelegramDraftStream = { messageId: () => number | undefined; clear: () => Promise; stop: () => void; + /** Reset internal state so the next update creates a new message instead of editing. */ + forceNewMessage: () => void; }; export function createTelegramDraftStream(params: { @@ -174,6 +176,12 @@ export function createTelegramDraftStream(params: { } }; + const forceNewMessage = () => { + streamMessageId = undefined; + lastSentText = ""; + pendingText = ""; + }; + params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { @@ -182,5 +190,6 @@ export function createTelegramDraftStream(params: { messageId: () => streamMessageId, clear, stop, + forceNewMessage, }; }