mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-07 00:23:31 +00:00
fix(telegram): prevent silent message loss across all streamMode settings
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import path from "node:path";
|
||||
import type { Bot } from "grammy";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { STATE_DIR } from "../config/paths.js";
|
||||
|
||||
@@ -479,4 +479,194 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("clears preview for error-only finals", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ text: "tool failed", isError: true }, { kind: "final" });
|
||||
await dispatcherOptions.deliver({ text: "another error", isError: true }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
// Error payloads skip preview finalization — preview must be cleaned up
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("clears preview after media final delivery", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
});
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("clears stale preview when response is NO_REPLY", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
|
||||
queuedFinal: false,
|
||||
});
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
// Preview contains stale partial text — must be cleaned up
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("falls back when all finals are skipped and clears preview", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
dispatcherOptions.onSkip?.({ text: "" }, { reason: "no_reply", kind: "final" });
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
deliverReplies.mockResolvedValueOnce({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
expect(deliverReplies).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
text: expect.stringContaining("No response"),
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("sends fallback and clears preview when deliver throws (dispatcher swallows error)", async () => {
|
||||
const draftStream = createDraftStream();
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
try {
|
||||
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
|
||||
} catch (err) {
|
||||
dispatcherOptions.onError(err, { kind: "final" });
|
||||
}
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
deliverReplies
|
||||
.mockRejectedValueOnce(new Error("network down"))
|
||||
.mockResolvedValueOnce({ delivered: true });
|
||||
|
||||
await expect(dispatchWithContext({ context: createContext() })).resolves.toBeUndefined();
|
||||
// Fallback should be sent because failedDeliveries > 0
|
||||
expect(deliverReplies).toHaveBeenCalledTimes(2);
|
||||
expect(deliverReplies).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
text: expect.stringContaining("No response"),
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
expect(draftStream.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("sends fallback in off mode when deliver throws", async () => {
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||
try {
|
||||
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
|
||||
} catch (err) {
|
||||
dispatcherOptions.onError(err, { kind: "final" });
|
||||
}
|
||||
return { queuedFinal: false };
|
||||
});
|
||||
deliverReplies
|
||||
.mockRejectedValueOnce(new Error("403 bot blocked"))
|
||||
.mockResolvedValueOnce({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext(), streamMode: "off" });
|
||||
|
||||
expect(createTelegramDraftStream).not.toHaveBeenCalled();
|
||||
expect(deliverReplies).toHaveBeenCalledTimes(2);
|
||||
expect(deliverReplies).toHaveBeenLastCalledWith(
|
||||
expect.objectContaining({
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
text: expect.stringContaining("No response"),
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("handles error block + response final — error delivered, response finalizes preview", async () => {
|
||||
const draftStream = createDraftStream(999);
|
||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||
editMessageTelegram.mockResolvedValue({ ok: true });
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
replyOptions?.onPartialReply?.({ text: "Processing..." });
|
||||
await dispatcherOptions.deliver(
|
||||
{ text: "⚠️ exec failed", isError: true },
|
||||
{ kind: "block" },
|
||||
);
|
||||
await dispatcherOptions.deliver(
|
||||
{ text: "The command timed out. Here's what I found..." },
|
||||
{ kind: "final" },
|
||||
);
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await dispatchWithContext({ context: createContext() });
|
||||
|
||||
// Block error went through deliverReplies
|
||||
expect(deliverReplies).toHaveBeenCalledTimes(1);
|
||||
// Final was finalized via preview edit
|
||||
expect(editMessageTelegram).toHaveBeenCalledWith(
|
||||
123,
|
||||
999,
|
||||
"The command timed out. Here's what I found...",
|
||||
expect.any(Object),
|
||||
);
|
||||
expect(draftStream.clear).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("supports concurrent dispatches with independent previews", async () => {
|
||||
const draftA = createDraftStream(11);
|
||||
const draftB = createDraftStream(22);
|
||||
createTelegramDraftStream.mockReturnValueOnce(draftA).mockReturnValueOnce(draftB);
|
||||
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||
async ({ dispatcherOptions, replyOptions }) => {
|
||||
await replyOptions?.onPartialReply?.({ text: "partial" });
|
||||
await dispatcherOptions.deliver({ mediaUrl: "file:///tmp/a.png" }, { kind: "final" });
|
||||
return { queuedFinal: true };
|
||||
},
|
||||
);
|
||||
deliverReplies.mockResolvedValue({ delivered: true });
|
||||
|
||||
await Promise.all([
|
||||
dispatchWithContext({
|
||||
context: createContext({
|
||||
chatId: 1,
|
||||
msg: { chat: { id: 1, type: "private" }, message_id: 1 } as never,
|
||||
}),
|
||||
}),
|
||||
dispatchWithContext({
|
||||
context: createContext({
|
||||
chatId: 2,
|
||||
msg: { chat: { id: 2, type: "private" }, message_id: 2 } as never,
|
||||
}),
|
||||
}),
|
||||
]);
|
||||
|
||||
expect(draftA.clear).toHaveBeenCalledTimes(1);
|
||||
expect(draftB.clear).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
import type { Bot } from "grammy";
|
||||
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { TelegramMessageContext } from "./bot-message-context.js";
|
||||
import type { TelegramBotOptions } from "./bot.js";
|
||||
import type { TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { resolveAgentDir } from "../agents/agent-scope.js";
|
||||
import {
|
||||
findModelInCatalog,
|
||||
@@ -15,15 +21,9 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
|
||||
import { createTypingCallbacks } from "../channels/typing.js";
|
||||
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
|
||||
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
|
||||
import { danger, logVerbose } from "../globals.js";
|
||||
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { TelegramMessageContext } from "./bot-message-context.js";
|
||||
import type { TelegramBotOptions } from "./bot.js";
|
||||
import { deliverReplies } from "./bot/delivery.js";
|
||||
import type { TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
|
||||
import { createTelegramDraftStream } from "./draft-stream.js";
|
||||
import { editMessageTelegram } from "./send.js";
|
||||
@@ -189,11 +189,13 @@ export const dispatchTelegramMessage = async ({
|
||||
};
|
||||
|
||||
const disableBlockStreaming =
|
||||
typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: draftStream || streamMode === "off"
|
||||
? true
|
||||
: undefined;
|
||||
streamMode === "off"
|
||||
? true // off mode must always disable block streaming
|
||||
: typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: draftStream
|
||||
? true
|
||||
: undefined;
|
||||
|
||||
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
||||
cfg,
|
||||
@@ -269,8 +271,26 @@ export const dispatchTelegramMessage = async ({
|
||||
const deliveryState = {
|
||||
delivered: false,
|
||||
skippedNonSilent: 0,
|
||||
failedDeliveries: 0,
|
||||
};
|
||||
let finalizedViaPreviewMessage = false;
|
||||
|
||||
/**
|
||||
* Clean up the draft preview message. The preview must be removed in every
|
||||
* case EXCEPT when it was successfully finalized as the actual response via
|
||||
* an in-place edit (`finalizedViaPreviewMessage === true`).
|
||||
*/
|
||||
const clearDraftPreviewIfNeeded = async () => {
|
||||
if (finalizedViaPreviewMessage) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await draftStream?.clear();
|
||||
} catch (err) {
|
||||
logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
|
||||
const clearGroupHistory = () => {
|
||||
if (isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
|
||||
@@ -340,6 +360,9 @@ export const dispatchTelegramMessage = async ({
|
||||
});
|
||||
finalizedViaPreviewMessage = true;
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: finalized response via preview edit (messageId=${previewMessageId})`,
|
||||
);
|
||||
return;
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
@@ -382,6 +405,9 @@ export const dispatchTelegramMessage = async ({
|
||||
});
|
||||
finalizedViaPreviewMessage = true;
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`,
|
||||
);
|
||||
return;
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
@@ -397,6 +423,13 @@ export const dispatchTelegramMessage = async ({
|
||||
});
|
||||
if (result.delivered) {
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? " (error payload)" : ""}`,
|
||||
);
|
||||
} else {
|
||||
logVerbose(
|
||||
`telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
onSkip: (_payload, info) => {
|
||||
@@ -405,6 +438,7 @@ export const dispatchTelegramMessage = async ({
|
||||
}
|
||||
},
|
||||
onError: (err, info) => {
|
||||
deliveryState.failedDeliveries += 1;
|
||||
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
onReplyStart: createTypingCallbacks({
|
||||
@@ -454,14 +488,13 @@ export const dispatchTelegramMessage = async ({
|
||||
},
|
||||
}));
|
||||
} finally {
|
||||
// Must stop() first to flush debounced content before clear() wipes state
|
||||
await draftStream?.stop();
|
||||
if (!finalizedViaPreviewMessage) {
|
||||
await draftStream?.clear();
|
||||
}
|
||||
}
|
||||
let sentFallback = false;
|
||||
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {
|
||||
if (
|
||||
!deliveryState.delivered &&
|
||||
(deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0)
|
||||
) {
|
||||
const result = await deliverReplies({
|
||||
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
|
||||
...deliveryBaseOptions,
|
||||
@@ -469,6 +502,8 @@ export const dispatchTelegramMessage = async ({
|
||||
sentFallback = result.delivered;
|
||||
}
|
||||
|
||||
await clearDraftPreviewIfNeeded();
|
||||
|
||||
const hasFinalResponse = queuedFinal || sentFallback;
|
||||
if (!hasFinalResponse) {
|
||||
clearGroupHistory();
|
||||
|
||||
@@ -558,6 +558,7 @@ async function sendTelegramText(
|
||||
...baseParams,
|
||||
}),
|
||||
});
|
||||
runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id}`);
|
||||
return res.message_id;
|
||||
} catch (err) {
|
||||
const errText = formatErrorMessage(err);
|
||||
@@ -574,6 +575,7 @@ async function sendTelegramText(
|
||||
...baseParams,
|
||||
}),
|
||||
});
|
||||
runtime.log?.(`telegram sendMessage ok chat=${chatId} message=${res.message_id} (plain)`);
|
||||
return res.message_id;
|
||||
}
|
||||
throw err;
|
||||
|
||||
@@ -127,6 +127,7 @@ export function createTelegramDraftStream(params: {
|
||||
}
|
||||
try {
|
||||
await params.api.deleteMessage(chatId, messageId);
|
||||
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
|
||||
} catch (err) {
|
||||
params.warn?.(
|
||||
`telegram stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
|
||||
Reference in New Issue
Block a user