mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 11:46:26 +00:00
fix(telegram): fix streaming with extended thinking models overwriting previous messages/ also happens to Execution error (#17973)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 34b52eead8
Co-authored-by: Marvae <11957602+Marvae@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang.
|
||||
- Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk.
|
||||
- Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus.
|
||||
- Telegram: prevent streaming final replies from being overwritten by later final/error payloads, and suppress fallback tool-error warnings when a recovered assistant answer already exists after tool calls. (#17883) Thanks @Marvae and @obviyus.
|
||||
- Discord: preserve channel session continuity when runtime payloads omit `message.channelId` by falling back to event/raw `channel_id` values for routing/session keys, so same-channel messages keep history across turns/restarts. Also align diagnostics so active Discord runs no longer appear as `sessionKey=unknown`. (#17622) Thanks @shakkernerd.
|
||||
- Discord: dedupe native skill commands by skill name in multi-agent setups to prevent duplicated slash commands with `_2` suffixes. (#17365) Thanks @seewhyme.
|
||||
- Discord: ensure role allowlist matching uses raw role IDs for message routing authorization. Thanks @xinhuagu.
|
||||
|
||||
@@ -471,6 +471,7 @@ export async function runEmbeddedPiAgent(
|
||||
blockReplyBreak: params.blockReplyBreak,
|
||||
blockReplyChunking: params.blockReplyChunking,
|
||||
onReasoningStream: params.onReasoningStream,
|
||||
onReasoningEnd: params.onReasoningEnd,
|
||||
onToolResult: params.onToolResult,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
|
||||
@@ -737,6 +737,7 @@ export async function runEmbeddedAttempt(
|
||||
shouldEmitToolOutput: params.shouldEmitToolOutput,
|
||||
onToolResult: params.onToolResult,
|
||||
onReasoningStream: params.onReasoningStream,
|
||||
onReasoningEnd: params.onReasoningEnd,
|
||||
onBlockReply: params.onBlockReply,
|
||||
onBlockReplyFlush: params.onBlockReplyFlush,
|
||||
blockReplyBreak: params.blockReplyBreak,
|
||||
|
||||
@@ -95,6 +95,7 @@ export type RunEmbeddedPiAgentParams = {
|
||||
blockReplyBreak?: "text_end" | "message_end";
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
|
||||
onReasoningEnd?: () => void | Promise<void>;
|
||||
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
|
||||
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
|
||||
lane?: string;
|
||||
|
||||
@@ -184,6 +184,29 @@ describe("buildEmbeddedRunPayloads", () => {
|
||||
expect(payloads[0]?.text).toContain("code 1");
|
||||
});
|
||||
|
||||
it("does not add tool error fallback when assistant text exists after tool calls", () => {
|
||||
const payloads = buildPayloads({
|
||||
assistantTexts: ["Checked the page and recovered with final answer."],
|
||||
lastAssistant: makeAssistant({
|
||||
stopReason: "toolUse",
|
||||
errorMessage: undefined,
|
||||
content: [
|
||||
{
|
||||
type: "toolCall",
|
||||
id: "toolu_01",
|
||||
name: "browser",
|
||||
arguments: { action: "search", query: "openclaw docs" },
|
||||
},
|
||||
],
|
||||
}),
|
||||
lastToolError: { toolName: "browser", error: "connection timeout" },
|
||||
});
|
||||
|
||||
expect(payloads).toHaveLength(1);
|
||||
expect(payloads[0]?.isError).toBeUndefined();
|
||||
expect(payloads[0]?.text).toContain("recovered");
|
||||
});
|
||||
|
||||
it("suppresses recoverable tool errors containing 'required' for non-mutating tools", () => {
|
||||
const payloads = buildPayloads({
|
||||
lastToolError: { toolName: "browser", error: "url required" },
|
||||
|
||||
@@ -218,6 +218,7 @@ export function buildEmbeddedRunPayloads(params: {
|
||||
: []
|
||||
).filter((text) => !shouldSuppressRawErrorText(text));
|
||||
|
||||
let hasUserFacingAssistantReply = false;
|
||||
for (const text of answerTexts) {
|
||||
const {
|
||||
text: cleanedText,
|
||||
@@ -238,22 +239,13 @@ export function buildEmbeddedRunPayloads(params: {
|
||||
replyToTag,
|
||||
replyToCurrent,
|
||||
});
|
||||
hasUserFacingAssistantReply = true;
|
||||
}
|
||||
|
||||
if (params.lastToolError) {
|
||||
const lastAssistantHasToolCalls =
|
||||
Array.isArray(params.lastAssistant?.content) &&
|
||||
params.lastAssistant?.content.some((block) =>
|
||||
block && typeof block === "object"
|
||||
? (block as { type?: unknown }).type === "toolCall"
|
||||
: false,
|
||||
);
|
||||
const lastAssistantWasToolUse = params.lastAssistant?.stopReason === "toolUse";
|
||||
const hasUserFacingReply =
|
||||
replyItems.length > 0 && !lastAssistantHasToolCalls && !lastAssistantWasToolUse;
|
||||
const shouldShowToolError = shouldShowToolErrorWarning({
|
||||
lastToolError: params.lastToolError,
|
||||
hasUserFacingReply,
|
||||
hasUserFacingReply: hasUserFacingAssistantReply,
|
||||
suppressToolErrors: Boolean(params.config?.messages?.suppressToolErrors),
|
||||
});
|
||||
|
||||
|
||||
@@ -140,7 +140,12 @@ export function handleMessageUpdate(
|
||||
})
|
||||
.trim();
|
||||
if (next) {
|
||||
const wasThinking = ctx.state.partialBlockState.thinking;
|
||||
const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : "";
|
||||
// Detect when thinking block ends (</think> tag processed)
|
||||
if (wasThinking && !ctx.state.partialBlockState.thinking) {
|
||||
void ctx.params.onReasoningEnd?.();
|
||||
}
|
||||
const parsedDelta = visibleDelta ? ctx.consumePartialReplyDirectives(visibleDelta) : null;
|
||||
const parsedFull = parseReplyDirectives(stripTrailingDirective(next));
|
||||
const cleanedText = parsedFull.text;
|
||||
|
||||
@@ -17,6 +17,8 @@ export type SubscribeEmbeddedPiSessionParams = {
|
||||
shouldEmitToolOutput?: () => boolean;
|
||||
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
|
||||
onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
|
||||
/** Called when a thinking/reasoning block ends (</think> tag processed). */
|
||||
onReasoningEnd?: () => void | Promise<void>;
|
||||
onBlockReply?: (payload: {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
|
||||
@@ -333,6 +333,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
: undefined,
|
||||
onAssistantMessageStart: async () => {
|
||||
await params.typingSignals.signalMessageStart();
|
||||
await params.opts?.onAssistantMessageStart?.();
|
||||
},
|
||||
onReasoningStream:
|
||||
params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream
|
||||
@@ -344,6 +345,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
onReasoningEnd: params.opts?.onReasoningEnd,
|
||||
onAgentEvent: async (evt) => {
|
||||
// Trigger typing when tools start executing.
|
||||
// Must await to ensure typing indicator starts before tool summaries are emitted.
|
||||
|
||||
@@ -31,6 +31,10 @@ export type GetReplyOptions = {
|
||||
heartbeatModelOverride?: string;
|
||||
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
|
||||
onReasoningStream?: (payload: ReplyPayload) => Promise<void> | void;
|
||||
/** Called when a thinking/reasoning block ends. */
|
||||
onReasoningEnd?: () => Promise<void> | void;
|
||||
/** Called when a new assistant message starts (e.g., after tool call or thinking block). */
|
||||
onAssistantMessageStart?: () => Promise<void> | void;
|
||||
onBlockReply?: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
|
||||
onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
|
||||
/** Called when the actual model is selected (including after fallback).
|
||||
|
||||
@@ -48,6 +48,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
messageId: vi.fn().mockReturnValue(messageId),
|
||||
clear: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn(),
|
||||
forceNewMessage: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -114,14 +115,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
context: TelegramMessageContext;
|
||||
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
|
||||
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
|
||||
replyToMode?: Parameters<typeof dispatchTelegramMessage>[0]["replyToMode"];
|
||||
}) {
|
||||
await dispatchTelegramMessage({
|
||||
context: params.context,
|
||||
bot: createBot(),
|
||||
cfg: {},
|
||||
runtime: createRuntime(),
|
||||
replyToMode: params.replyToMode ?? "first",
|
||||
replyToMode: "first",
|
||||
streamMode: params.streamMode ?? "partial",
|
||||
textLimit: 4096,
|
||||
telegramCfg: params.telegramCfg ?? {},
|
||||
@@ -152,7 +152,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect.objectContaining({
|
||||
chatId: 123,
|
||||
thread: { id: 777, scope: "dm" },
|
||||
replyToMessageId: 456,
|
||||
}),
|
||||
);
|
||||
expect(draftStream.update).toHaveBeenCalledWith("Hello");
|
||||
@@ -217,52 +216,38 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
expect(draftStream.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses only the latest final payload when multiple finals are emitted", async () => {
|
||||
it("does not overwrite finalized preview when additional final payloads are sent", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Okay." });
|
||||
await dispatcherOptions.deliver({ text: "Ok" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "Okay." }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "Primary result" }, { kind: "final" });
|
||||
await dispatcherOptions.deliver(
|
||||
{ text: "⚠️ Recovered tool error details" },
|
||||
{ kind: "final" },
|
||||
);
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Okay.", expect.any(Object));
|
||||
expect(deliverReplies).not.toHaveBeenCalled();
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
999,
|
||||
"Primary result",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [expect.objectContaining({ text: "⚠️ Recovered tool error details" })],
|
||||
}),
|
||||
);
|
||||
expect(draftStream.clear).not.toHaveBeenCalled();
|
||||
expect(draftStream.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("ignores transient shorter partial prefixes to avoid preview punctuation flicker", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "Sure." });
|
||||
await replyOptions?.onPartialReply?.({ text: "Sure" });
|
||||
await replyOptions?.onPartialReply?.({ text: "Sure." });
|
||||
await dispatcherOptions.deliver({ text: "Sure." }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(draftStream.update).toHaveBeenCalledTimes(1);
|
||||
expect(draftStream.update).toHaveBeenCalledWith("Sure.");
|
||||
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Sure.", expect.any(Object));
|
||||
});
|
||||
|
||||
it("falls back to normal delivery when preview final is too long to edit", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
@@ -308,24 +293,124 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("omits replyToMessageId from draft stream when replyToMode is off", async () => {
|
||||
const draftStream = createDraftStream();
|
||||
it("forces new message when new assistant message starts after previous output", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// First assistant message: partial text
|
||||
await replyOptions?.onPartialReply?.({ text: "First response" });
|
||||
// New assistant message starts (e.g., after tool call)
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
// Second assistant message: new text
|
||||
await replyOptions?.onPartialReply?.({ text: "After tool call" });
|
||||
await dispatcherOptions.deliver({ text: "After tool call" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({
|
||||
context: createContext(),
|
||||
replyToMode: "off",
|
||||
});
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
expect(createTelegramDraftStream).toHaveBeenCalledWith(
|
||||
// Should force new message when assistant message starts after previous output
|
||||
expect(draftStream.forceNewMessage).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not force new message on first assistant message start", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// First assistant message starts (no previous output)
|
||||
await replyOptions?.onAssistantMessageStart?.();
|
||||
// Partial updates
|
||||
await replyOptions?.onPartialReply?.({ text: "Hello" });
|
||||
await replyOptions?.onPartialReply?.({ text: "Hello world" });
|
||||
await dispatcherOptions.deliver({ text: "Hello world" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
// First message start shouldn't trigger forceNewMessage (no previous output)
|
||||
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("forces new message when reasoning ends after previous output", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// First partial: text before thinking
|
||||
await replyOptions?.onPartialReply?.({ text: "Let me check" });
|
||||
// Reasoning stream (thinking block)
|
||||
await replyOptions?.onReasoningStream?.({ text: "Analyzing..." });
|
||||
// Reasoning ends
|
||||
await replyOptions?.onReasoningEnd?.();
|
||||
// Second partial: text after thinking
|
||||
await replyOptions?.onPartialReply?.({ text: "Here's the answer" });
|
||||
await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
// Should force new message when reasoning ends
|
||||
expect(draftStream.forceNewMessage).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not force new message on reasoning end without previous output", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// Reasoning starts immediately (no previous text output)
|
||||
await replyOptions?.onReasoningStream?.({ text: "Thinking..." });
|
||||
// Reasoning ends
|
||||
await replyOptions?.onReasoningEnd?.();
|
||||
// First actual text output
|
||||
await replyOptions?.onPartialReply?.({ text: "Here's my answer" });
|
||||
await dispatcherOptions.deliver({ text: "Here's my answer" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
// No previous text output, so no forceNewMessage needed
|
||||
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not edit preview message when final payload is an error", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
// Partial text output
|
||||
await replyOptions?.onPartialReply?.({ text: "Let me check that file" });
|
||||
// Error payload should not edit the preview message
|
||||
await dispatcherOptions.deliver(
|
||||
{ text: "⚠️ 🛠️ Exec: cat /nonexistent failed: No such file", isError: true },
|
||||
{ kind: "final" },
|
||||
);
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "block" });
|
||||
|
||||
// Should NOT edit preview message (which would overwrite the partial text)
|
||||
expect(editMessageTelegram).not.toHaveBeenCalled();
|
||||
// Should deliver via normal path as a new message
|
||||
expect(deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
chatId: 123,
|
||||
replyToMessageId: undefined,
|
||||
replies: [expect.objectContaining({ text: expect.stringContaining("⚠️") })],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -112,6 +112,7 @@ export const dispatchTelegramMessage = async ({
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
let lastPartialText = "";
|
||||
let draftText = "";
|
||||
let hasStreamedMessage = false;
|
||||
const updateDraftFromPartial = (text?: string) => {
|
||||
if (!draftStream || !text) {
|
||||
return;
|
||||
@@ -119,6 +120,8 @@ export const dispatchTelegramMessage = async ({
|
||||
if (text === lastPartialText) {
|
||||
return;
|
||||
}
|
||||
// Mark that we've received streaming content (for forceNewMessage decision).
|
||||
hasStreamedMessage = true;
|
||||
if (streamMode === "partial") {
|
||||
// Some providers briefly emit a shorter prefix snapshot (for example
|
||||
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
|
||||
@@ -295,15 +298,25 @@ export const dispatchTelegramMessage = async ({
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
const previewMessageId = draftStream?.messageId();
|
||||
const finalText = payload.text;
|
||||
const currentPreviewText = streamMode === "block" ? draftText : lastPartialText;
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as
|
||||
| { buttons?: Array<Array<{ text: string; callback_data: string }>> }
|
||||
| undefined
|
||||
)?.buttons;
|
||||
let draftStoppedForPreviewEdit = false;
|
||||
// Skip preview edit for error payloads to avoid overwriting previous content
|
||||
const canFinalizeViaPreviewEdit =
|
||||
!finalizedViaPreviewMessage &&
|
||||
!hasMedia &&
|
||||
typeof finalText === "string" &&
|
||||
finalText.length > 0 &&
|
||||
typeof previewMessageId === "number" &&
|
||||
finalText.length <= draftMaxChars;
|
||||
finalText.length <= draftMaxChars &&
|
||||
!payload.isError;
|
||||
if (canFinalizeViaPreviewEdit) {
|
||||
draftStream?.stop();
|
||||
const currentPreviewText = streamMode === "block" ? draftText : lastPartialText;
|
||||
draftStoppedForPreviewEdit = true;
|
||||
if (
|
||||
currentPreviewText &&
|
||||
currentPreviewText.startsWith(finalText) &&
|
||||
@@ -313,11 +326,6 @@ export const dispatchTelegramMessage = async ({
|
||||
// can appear transiently in some provider streams.
|
||||
return;
|
||||
}
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as
|
||||
| { buttons?: Array<Array<{ text: string; callback_data: string }>> }
|
||||
| undefined
|
||||
)?.buttons;
|
||||
try {
|
||||
await editMessageTelegram(chatId, previewMessageId, finalText, {
|
||||
api: bot.api,
|
||||
@@ -335,12 +343,19 @@ export const dispatchTelegramMessage = async ({
|
||||
);
|
||||
}
|
||||
}
|
||||
if (payload.text && payload.text.length > draftMaxChars) {
|
||||
if (
|
||||
!hasMedia &&
|
||||
!payload.isError &&
|
||||
typeof finalText === "string" &&
|
||||
finalText.length > draftMaxChars
|
||||
) {
|
||||
logVerbose(
|
||||
`telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`,
|
||||
`telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`,
|
||||
);
|
||||
}
|
||||
draftStream?.stop();
|
||||
if (!draftStoppedForPreviewEdit) {
|
||||
draftStream?.stop();
|
||||
}
|
||||
}
|
||||
const result = await deliverReplies({
|
||||
...deliveryBaseOptions,
|
||||
@@ -375,6 +390,34 @@ export const dispatchTelegramMessage = async ({
|
||||
skillFilter,
|
||||
disableBlockStreaming,
|
||||
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
|
||||
onAssistantMessageStart: draftStream
|
||||
? () => {
|
||||
// When a new assistant message starts (e.g., after tool call),
|
||||
// force a new Telegram message if we have previous content.
|
||||
// Only force once per response to avoid excessive splitting.
|
||||
logVerbose(
|
||||
`telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`,
|
||||
);
|
||||
if (hasStreamedMessage) {
|
||||
logVerbose(`telegram: calling forceNewMessage()`);
|
||||
draftStream.forceNewMessage();
|
||||
}
|
||||
lastPartialText = "";
|
||||
draftText = "";
|
||||
draftChunker?.reset();
|
||||
}
|
||||
: undefined,
|
||||
onReasoningEnd: draftStream
|
||||
? () => {
|
||||
// When a thinking block ends, force a new Telegram message for the next text output.
|
||||
if (hasStreamedMessage) {
|
||||
draftStream.forceNewMessage();
|
||||
lastPartialText = "";
|
||||
draftText = "";
|
||||
draftChunker?.reset();
|
||||
}
|
||||
}
|
||||
: undefined,
|
||||
onModelSelected,
|
||||
},
|
||||
}));
|
||||
|
||||
@@ -114,51 +114,12 @@ describe("createTelegramDraftStream", () => {
|
||||
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
|
||||
});
|
||||
|
||||
it("includes reply_to_message_id in initial sendMessage when replyToMessageId is set", async () => {
|
||||
it("creates new message after forceNewMessage is called", async () => {
|
||||
const api = {
|
||||
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||
editMessageText: vi.fn().mockResolvedValue(true),
|
||||
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
const stream = createTelegramDraftStream({
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
api: api as any,
|
||||
chatId: 123,
|
||||
replyToMessageId: 999,
|
||||
});
|
||||
|
||||
stream.update("Hello");
|
||||
await vi.waitFor(() =>
|
||||
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 999 }),
|
||||
);
|
||||
});
|
||||
|
||||
it("includes both reply_to_message_id and message_thread_id when both are set", async () => {
|
||||
const api = {
|
||||
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||
editMessageText: vi.fn().mockResolvedValue(true),
|
||||
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
const stream = createTelegramDraftStream({
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
api: api as any,
|
||||
chatId: 123,
|
||||
thread: { id: 99, scope: "forum" },
|
||||
replyToMessageId: 555,
|
||||
});
|
||||
|
||||
stream.update("Hello");
|
||||
await vi.waitFor(() =>
|
||||
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", {
|
||||
message_thread_id: 99,
|
||||
reply_to_message_id: 555,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("passes undefined params when neither thread nor replyToMessageId is set", async () => {
|
||||
const api = {
|
||||
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||
sendMessage: vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ message_id: 17 })
|
||||
.mockResolvedValueOnce({ message_id: 42 }),
|
||||
editMessageText: vi.fn().mockResolvedValue(true),
|
||||
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
@@ -168,27 +129,23 @@ describe("createTelegramDraftStream", () => {
|
||||
chatId: 123,
|
||||
});
|
||||
|
||||
// First message
|
||||
stream.update("Hello");
|
||||
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined));
|
||||
});
|
||||
await stream.flush();
|
||||
expect(api.sendMessage).toHaveBeenCalledTimes(1);
|
||||
|
||||
it("includes reply_to_message_id even when thread resolves to general topic", async () => {
|
||||
const api = {
|
||||
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||
editMessageText: vi.fn().mockResolvedValue(true),
|
||||
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||
};
|
||||
const stream = createTelegramDraftStream({
|
||||
// oxlint-disable-next-line typescript/no-explicit-any
|
||||
api: api as any,
|
||||
chatId: 123,
|
||||
thread: { id: 1, scope: "forum" },
|
||||
replyToMessageId: 888,
|
||||
});
|
||||
// Normal edit (same message)
|
||||
stream.update("Hello edited");
|
||||
await stream.flush();
|
||||
expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "Hello edited");
|
||||
|
||||
stream.update("Hello");
|
||||
await vi.waitFor(() =>
|
||||
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 888 }),
|
||||
);
|
||||
// Force new message (e.g. after thinking block ends)
|
||||
stream.forceNewMessage();
|
||||
stream.update("After thinking");
|
||||
await stream.flush();
|
||||
|
||||
// Should have sent a second new message, not edited the first
|
||||
expect(api.sendMessage).toHaveBeenCalledTimes(2);
|
||||
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -10,6 +10,8 @@ export type TelegramDraftStream = {
|
||||
messageId: () => number | undefined;
|
||||
clear: () => Promise<void>;
|
||||
stop: () => void;
|
||||
/** Reset internal state so the next update creates a new message instead of editing. */
|
||||
forceNewMessage: () => void;
|
||||
};
|
||||
|
||||
export function createTelegramDraftStream(params: {
|
||||
@@ -174,6 +176,12 @@ export function createTelegramDraftStream(params: {
|
||||
}
|
||||
};
|
||||
|
||||
const forceNewMessage = () => {
|
||||
streamMessageId = undefined;
|
||||
lastSentText = "";
|
||||
pendingText = "";
|
||||
};
|
||||
|
||||
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
|
||||
|
||||
return {
|
||||
@@ -182,5 +190,6 @@ export function createTelegramDraftStream(params: {
|
||||
messageId: () => streamMessageId,
|
||||
clear,
|
||||
stop,
|
||||
forceNewMessage,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user