mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 17:38:27 +00:00
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: cfd4181a23
Co-authored-by: yinghaosang <261132136+yinghaosang@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Telegram: replace inbound `<media:audio>` placeholder with successful preflight voice transcript in message body context, preventing placeholder-only prompt bodies for mention-gated voice messages. (#16789) Thanks @Limitless2023.
|
- Telegram: replace inbound `<media:audio>` placeholder with successful preflight voice transcript in message body context, preventing placeholder-only prompt bodies for mention-gated voice messages. (#16789) Thanks @Limitless2023.
|
||||||
- Telegram: retry inbound media `getFile` calls (3 attempts with backoff) and gracefully fall back to placeholder-only processing when retries fail, preventing dropped voice/media messages on transient Telegram network errors. (#16154) Thanks @yinghaosang.
|
- Telegram: retry inbound media `getFile` calls (3 attempts with backoff) and gracefully fall back to placeholder-only processing when retries fail, preventing dropped voice/media messages on transient Telegram network errors. (#16154) Thanks @yinghaosang.
|
||||||
- Telegram: finalize streaming preview replies in place instead of sending a second final message, preventing duplicate Telegram assistant outputs at stream completion. (#17218) Thanks @obviyus.
|
- Telegram: finalize streaming preview replies in place instead of sending a second final message, preventing duplicate Telegram assistant outputs at stream completion. (#17218) Thanks @obviyus.
|
||||||
|
- Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang.
|
||||||
- Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk.
|
- Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk.
|
||||||
- Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus.
|
- Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus.
|
||||||
- Discord: preserve channel session continuity when runtime payloads omit `message.channelId` by falling back to event/raw `channel_id` values for routing/session keys, so same-channel messages keep history across turns/restarts. Also align diagnostics so active Discord runs no longer appear as `sessionKey=unknown`. (#17622) Thanks @shakkernerd.
|
- Discord: preserve channel session continuity when runtime payloads omit `message.channelId` by falling back to event/raw `channel_id` values for routing/session keys, so same-channel messages keep history across turns/restarts. Also align diagnostics so active Discord runs no longer appear as `sessionKey=unknown`. (#17622) Thanks @shakkernerd.
|
||||||
|
|||||||
@@ -114,13 +114,14 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
|||||||
context: TelegramMessageContext;
|
context: TelegramMessageContext;
|
||||||
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
|
telegramCfg?: Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"];
|
||||||
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
|
streamMode?: Parameters<typeof dispatchTelegramMessage>[0]["streamMode"];
|
||||||
|
replyToMode?: Parameters<typeof dispatchTelegramMessage>[0]["replyToMode"];
|
||||||
}) {
|
}) {
|
||||||
await dispatchTelegramMessage({
|
await dispatchTelegramMessage({
|
||||||
context: params.context,
|
context: params.context,
|
||||||
bot: createBot(),
|
bot: createBot(),
|
||||||
cfg: {},
|
cfg: {},
|
||||||
runtime: createRuntime(),
|
runtime: createRuntime(),
|
||||||
replyToMode: "first",
|
replyToMode: params.replyToMode ?? "first",
|
||||||
streamMode: params.streamMode ?? "partial",
|
streamMode: params.streamMode ?? "partial",
|
||||||
textLimit: 4096,
|
textLimit: 4096,
|
||||||
telegramCfg: params.telegramCfg ?? {},
|
telegramCfg: params.telegramCfg ?? {},
|
||||||
@@ -151,6 +152,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
|||||||
expect.objectContaining({
|
expect.objectContaining({
|
||||||
chatId: 123,
|
chatId: 123,
|
||||||
thread: { id: 777, scope: "dm" },
|
thread: { id: 777, scope: "dm" },
|
||||||
|
replyToMessageId: 456,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
expect(draftStream.update).toHaveBeenCalledWith("Hello");
|
expect(draftStream.update).toHaveBeenCalledWith("Hello");
|
||||||
@@ -215,6 +217,52 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
|||||||
expect(draftStream.stop).toHaveBeenCalled();
|
expect(draftStream.stop).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("uses only the latest final payload when multiple finals are emitted", async () => {
|
||||||
|
const draftStream = createDraftStream(999);
|
||||||
|
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||||
|
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||||
|
async ({ dispatcherOptions, replyOptions }) => {
|
||||||
|
await replyOptions?.onPartialReply?.({ text: "Okay." });
|
||||||
|
await dispatcherOptions.deliver({ text: "Ok" }, { kind: "final" });
|
||||||
|
await dispatcherOptions.deliver({ text: "Okay." }, { kind: "final" });
|
||||||
|
return { queuedFinal: true };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
deliverReplies.mockResolvedValue({ delivered: true });
|
||||||
|
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
|
||||||
|
|
||||||
|
await dispatchWithContext({ context: createContext() });
|
||||||
|
|
||||||
|
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||||
|
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Okay.", expect.any(Object));
|
||||||
|
expect(deliverReplies).not.toHaveBeenCalled();
|
||||||
|
expect(draftStream.clear).not.toHaveBeenCalled();
|
||||||
|
expect(draftStream.stop).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("ignores transient shorter partial prefixes to avoid preview punctuation flicker", async () => {
|
||||||
|
const draftStream = createDraftStream(999);
|
||||||
|
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||||
|
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
|
||||||
|
async ({ dispatcherOptions, replyOptions }) => {
|
||||||
|
await replyOptions?.onPartialReply?.({ text: "Sure." });
|
||||||
|
await replyOptions?.onPartialReply?.({ text: "Sure" });
|
||||||
|
await replyOptions?.onPartialReply?.({ text: "Sure." });
|
||||||
|
await dispatcherOptions.deliver({ text: "Sure." }, { kind: "final" });
|
||||||
|
return { queuedFinal: true };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
deliverReplies.mockResolvedValue({ delivered: true });
|
||||||
|
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" });
|
||||||
|
|
||||||
|
await dispatchWithContext({ context: createContext() });
|
||||||
|
|
||||||
|
expect(draftStream.update).toHaveBeenCalledTimes(1);
|
||||||
|
expect(draftStream.update).toHaveBeenCalledWith("Sure.");
|
||||||
|
expect(editMessageTelegram).toHaveBeenCalledTimes(1);
|
||||||
|
expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "Sure.", expect.any(Object));
|
||||||
|
});
|
||||||
|
|
||||||
it("falls back to normal delivery when preview final is too long to edit", async () => {
|
it("falls back to normal delivery when preview final is too long to edit", async () => {
|
||||||
const draftStream = createDraftStream(999);
|
const draftStream = createDraftStream(999);
|
||||||
createTelegramDraftStream.mockReturnValue(draftStream);
|
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||||
@@ -259,4 +307,26 @@ describe("dispatchTelegramMessage draft streaming", () => {
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("omits replyToMessageId from draft stream when replyToMode is off", async () => {
|
||||||
|
const draftStream = createDraftStream();
|
||||||
|
createTelegramDraftStream.mockReturnValue(draftStream);
|
||||||
|
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
|
||||||
|
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
|
||||||
|
return { queuedFinal: true };
|
||||||
|
});
|
||||||
|
deliverReplies.mockResolvedValue({ delivered: true });
|
||||||
|
|
||||||
|
await dispatchWithContext({
|
||||||
|
context: createContext(),
|
||||||
|
replyToMode: "off",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(createTelegramDraftStream).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
chatId: 123,
|
||||||
|
replyToMessageId: undefined,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -91,12 +91,15 @@ export const dispatchTelegramMessage = async ({
|
|||||||
? telegramCfg.blockStreaming
|
? telegramCfg.blockStreaming
|
||||||
: cfg.agents?.defaults?.blockStreamingDefault === "on";
|
: cfg.agents?.defaults?.blockStreamingDefault === "on";
|
||||||
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
|
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
|
||||||
|
const draftReplyToMessageId =
|
||||||
|
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
|
||||||
const draftStream = canStreamDraft
|
const draftStream = canStreamDraft
|
||||||
? createTelegramDraftStream({
|
? createTelegramDraftStream({
|
||||||
api: bot.api,
|
api: bot.api,
|
||||||
chatId,
|
chatId,
|
||||||
maxChars: draftMaxChars,
|
maxChars: draftMaxChars,
|
||||||
thread: threadSpec,
|
thread: threadSpec,
|
||||||
|
replyToMessageId: draftReplyToMessageId,
|
||||||
log: logVerbose,
|
log: logVerbose,
|
||||||
warn: logVerbose,
|
warn: logVerbose,
|
||||||
})
|
})
|
||||||
@@ -117,6 +120,16 @@ export const dispatchTelegramMessage = async ({
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (streamMode === "partial") {
|
if (streamMode === "partial") {
|
||||||
|
// Some providers briefly emit a shorter prefix snapshot (for example
|
||||||
|
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
|
||||||
|
// visible punctuation flicker.
|
||||||
|
if (
|
||||||
|
lastPartialText &&
|
||||||
|
lastPartialText.startsWith(text) &&
|
||||||
|
text.length < lastPartialText.length
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
lastPartialText = text;
|
lastPartialText = text;
|
||||||
draftStream.update(text);
|
draftStream.update(text);
|
||||||
return;
|
return;
|
||||||
@@ -281,42 +294,53 @@ export const dispatchTelegramMessage = async ({
|
|||||||
await flushDraft();
|
await flushDraft();
|
||||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||||
const previewMessageId = draftStream?.messageId();
|
const previewMessageId = draftStream?.messageId();
|
||||||
const previewButtons = (
|
const finalText = payload.text;
|
||||||
payload.channelData?.telegram as
|
const canFinalizeViaPreviewEdit =
|
||||||
| { buttons?: Array<Array<{ text: string; callback_data: string }>> }
|
!hasMedia &&
|
||||||
| undefined
|
typeof finalText === "string" &&
|
||||||
)?.buttons;
|
finalText.length > 0 &&
|
||||||
let draftStoppedForPreviewEdit = false;
|
typeof previewMessageId === "number" &&
|
||||||
if (!hasMedia && payload.text && typeof previewMessageId === "number") {
|
finalText.length <= draftMaxChars;
|
||||||
const canFinalizeViaPreviewEdit = payload.text.length <= draftMaxChars;
|
if (canFinalizeViaPreviewEdit) {
|
||||||
if (canFinalizeViaPreviewEdit) {
|
draftStream?.stop();
|
||||||
draftStream?.stop();
|
const currentPreviewText = streamMode === "block" ? draftText : lastPartialText;
|
||||||
draftStoppedForPreviewEdit = true;
|
if (
|
||||||
try {
|
currentPreviewText &&
|
||||||
await editMessageTelegram(chatId, previewMessageId, payload.text, {
|
currentPreviewText.startsWith(finalText) &&
|
||||||
api: bot.api,
|
finalText.length < currentPreviewText.length
|
||||||
cfg,
|
) {
|
||||||
accountId: route.accountId,
|
// Ignore regressive final edits (e.g., "Okay." -> "Ok"), which
|
||||||
linkPreview: telegramCfg.linkPreview,
|
// can appear transiently in some provider streams.
|
||||||
buttons: previewButtons,
|
return;
|
||||||
});
|
}
|
||||||
finalizedViaPreviewMessage = true;
|
const previewButtons = (
|
||||||
deliveryState.delivered = true;
|
payload.channelData?.telegram as
|
||||||
return;
|
| { buttons?: Array<Array<{ text: string; callback_data: string }>> }
|
||||||
} catch (err) {
|
| undefined
|
||||||
logVerbose(
|
)?.buttons;
|
||||||
`telegram: preview final edit failed; falling back to standard send (${String(err)})`,
|
try {
|
||||||
);
|
await editMessageTelegram(chatId, previewMessageId, finalText, {
|
||||||
}
|
api: bot.api,
|
||||||
} else {
|
cfg,
|
||||||
|
accountId: route.accountId,
|
||||||
|
linkPreview: telegramCfg.linkPreview,
|
||||||
|
buttons: previewButtons,
|
||||||
|
});
|
||||||
|
finalizedViaPreviewMessage = true;
|
||||||
|
deliveryState.delivered = true;
|
||||||
|
return;
|
||||||
|
} catch (err) {
|
||||||
logVerbose(
|
logVerbose(
|
||||||
`telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`,
|
`telegram: preview final edit failed; falling back to standard send (${String(err)})`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!draftStoppedForPreviewEdit) {
|
if (payload.text && payload.text.length > draftMaxChars) {
|
||||||
draftStream?.stop();
|
logVerbose(
|
||||||
|
`telegram: preview final too long for edit (${payload.text.length} > ${draftMaxChars}); falling back to standard send`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
draftStream?.stop();
|
||||||
}
|
}
|
||||||
const result = await deliverReplies({
|
const result = await deliverReplies({
|
||||||
...deliveryBaseOptions,
|
...deliveryBaseOptions,
|
||||||
|
|||||||
@@ -113,4 +113,82 @@ describe("createTelegramDraftStream", () => {
|
|||||||
|
|
||||||
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
|
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("includes reply_to_message_id in initial sendMessage when replyToMessageId is set", async () => {
|
||||||
|
const api = {
|
||||||
|
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||||
|
editMessageText: vi.fn().mockResolvedValue(true),
|
||||||
|
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
const stream = createTelegramDraftStream({
|
||||||
|
// oxlint-disable-next-line typescript/no-explicit-any
|
||||||
|
api: api as any,
|
||||||
|
chatId: 123,
|
||||||
|
replyToMessageId: 999,
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.update("Hello");
|
||||||
|
await vi.waitFor(() =>
|
||||||
|
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 999 }),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes both reply_to_message_id and message_thread_id when both are set", async () => {
|
||||||
|
const api = {
|
||||||
|
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||||
|
editMessageText: vi.fn().mockResolvedValue(true),
|
||||||
|
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
const stream = createTelegramDraftStream({
|
||||||
|
// oxlint-disable-next-line typescript/no-explicit-any
|
||||||
|
api: api as any,
|
||||||
|
chatId: 123,
|
||||||
|
thread: { id: 99, scope: "forum" },
|
||||||
|
replyToMessageId: 555,
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.update("Hello");
|
||||||
|
await vi.waitFor(() =>
|
||||||
|
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", {
|
||||||
|
message_thread_id: 99,
|
||||||
|
reply_to_message_id: 555,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("passes undefined params when neither thread nor replyToMessageId is set", async () => {
|
||||||
|
const api = {
|
||||||
|
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||||
|
editMessageText: vi.fn().mockResolvedValue(true),
|
||||||
|
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
const stream = createTelegramDraftStream({
|
||||||
|
// oxlint-disable-next-line typescript/no-explicit-any
|
||||||
|
api: api as any,
|
||||||
|
chatId: 123,
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.update("Hello");
|
||||||
|
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", undefined));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("includes reply_to_message_id even when thread resolves to general topic", async () => {
|
||||||
|
const api = {
|
||||||
|
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
|
||||||
|
editMessageText: vi.fn().mockResolvedValue(true),
|
||||||
|
deleteMessage: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
const stream = createTelegramDraftStream({
|
||||||
|
// oxlint-disable-next-line typescript/no-explicit-any
|
||||||
|
api: api as any,
|
||||||
|
chatId: 123,
|
||||||
|
thread: { id: 1, scope: "forum" },
|
||||||
|
replyToMessageId: 888,
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.update("Hello");
|
||||||
|
await vi.waitFor(() =>
|
||||||
|
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { reply_to_message_id: 888 }),
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ export function createTelegramDraftStream(params: {
|
|||||||
chatId: number;
|
chatId: number;
|
||||||
maxChars?: number;
|
maxChars?: number;
|
||||||
thread?: TelegramThreadSpec | null;
|
thread?: TelegramThreadSpec | null;
|
||||||
|
replyToMessageId?: number;
|
||||||
throttleMs?: number;
|
throttleMs?: number;
|
||||||
log?: (message: string) => void;
|
log?: (message: string) => void;
|
||||||
warn?: (message: string) => void;
|
warn?: (message: string) => void;
|
||||||
@@ -28,6 +29,10 @@ export function createTelegramDraftStream(params: {
|
|||||||
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
|
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
|
||||||
const chatId = params.chatId;
|
const chatId = params.chatId;
|
||||||
const threadParams = buildTelegramThreadParams(params.thread);
|
const threadParams = buildTelegramThreadParams(params.thread);
|
||||||
|
const replyParams =
|
||||||
|
params.replyToMessageId != null
|
||||||
|
? { ...threadParams, reply_to_message_id: params.replyToMessageId }
|
||||||
|
: threadParams;
|
||||||
|
|
||||||
let streamMessageId: number | undefined;
|
let streamMessageId: number | undefined;
|
||||||
let lastSentText = "";
|
let lastSentText = "";
|
||||||
@@ -64,7 +69,7 @@ export function createTelegramDraftStream(params: {
|
|||||||
await params.api.editMessageText(chatId, streamMessageId, trimmed);
|
await params.api.editMessageText(chatId, streamMessageId, trimmed);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const sent = await params.api.sendMessage(chatId, trimmed, threadParams);
|
const sent = await params.api.sendMessage(chatId, trimmed, replyParams);
|
||||||
const sentMessageId = sent?.message_id;
|
const sentMessageId = sent?.message_id;
|
||||||
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
|
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
|
||||||
stopped = true;
|
stopped = true;
|
||||||
|
|||||||
Reference in New Issue
Block a user