mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 12:17:40 +00:00
fix(telegram): skip failed photo downloads in media group instead of dropping entire group (#20598)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 4a9c5f7af7
Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -20,6 +20,7 @@ import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js";
|
||||
import { danger, logVerbose, warn } from "../globals.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { MediaFetchError } from "../media/fetch.js";
|
||||
import { readChannelAllowFromStore } from "../pairing/pairing-store.js";
|
||||
import { resolveAgentRoute } from "../routing/resolve-route.js";
|
||||
import { resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
@@ -61,6 +62,15 @@ import {
|
||||
import { buildInlineKeyboard } from "./send.js";
|
||||
import { wasSentByBot } from "./sent-message-cache.js";
|
||||
|
||||
function isMediaSizeLimitError(err: unknown): boolean {
|
||||
const errMsg = String(err);
|
||||
return errMsg.includes("exceeds") && errMsg.includes("MB limit");
|
||||
}
|
||||
|
||||
function isRecoverableMediaGroupError(err: unknown): boolean {
|
||||
return err instanceof MediaFetchError || isMediaSizeLimitError(err);
|
||||
}
|
||||
|
||||
export const registerTelegramHandlers = ({
|
||||
cfg,
|
||||
accountId,
|
||||
@@ -270,7 +280,18 @@ export const registerTelegramHandlers = ({
|
||||
|
||||
const allMedia: TelegramMediaRef[] = [];
|
||||
for (const { ctx } of entry.messages) {
|
||||
const media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
|
||||
let media;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
|
||||
} catch (mediaErr) {
|
||||
if (!isRecoverableMediaGroupError(mediaErr)) {
|
||||
throw mediaErr;
|
||||
}
|
||||
runtime.log?.(
|
||||
warn(`media group: skipping photo that failed to fetch: ${String(mediaErr)}`),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if (media) {
|
||||
allMedia.push({
|
||||
path: media.path,
|
||||
@@ -663,8 +684,7 @@ export const registerTelegramHandlers = ({
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch);
|
||||
} catch (mediaErr) {
|
||||
const errMsg = String(mediaErr);
|
||||
if (errMsg.includes("exceeds") && errMsg.includes("MB limit")) {
|
||||
if (isMediaSizeLimitError(mediaErr)) {
|
||||
if (sendOversizeWarning) {
|
||||
const limitMb = Math.round(mediaMaxBytes / (1024 * 1024));
|
||||
await withTelegramApiErrorLogging({
|
||||
@@ -676,7 +696,7 @@ export const registerTelegramHandlers = ({
|
||||
}),
|
||||
}).catch(() => {});
|
||||
}
|
||||
logger.warn({ chatId, error: errMsg }, oversizeLogMessage);
|
||||
logger.warn({ chatId, error: String(mediaErr) }, oversizeLogMessage);
|
||||
return;
|
||||
}
|
||||
throw mediaErr;
|
||||
|
||||
@@ -1760,10 +1760,19 @@ describe("createTelegramBot", () => {
|
||||
await Promise.all([first, second]);
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
|
||||
const flushTimerCall = [...setTimeoutSpy.mock.calls]
|
||||
.toReversed()
|
||||
.find((call) => call[1] === TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs);
|
||||
const flushTimer = flushTimerCall?.[0] as (() => unknown) | undefined;
|
||||
const flushTimerCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
|
||||
(call) => call[1] === TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs,
|
||||
);
|
||||
const flushTimer =
|
||||
flushTimerCallIndex >= 0
|
||||
? (setTimeoutSpy.mock.calls[flushTimerCallIndex]?.[0] as (() => unknown) | undefined)
|
||||
: undefined;
|
||||
// Cancel the real timer so it cannot fire a second time after we manually invoke it.
|
||||
if (flushTimerCallIndex >= 0) {
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[flushTimerCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
}
|
||||
expect(flushTimer).toBeTypeOf("function");
|
||||
await flushTimer?.();
|
||||
|
||||
@@ -1874,4 +1883,208 @@ describe("createTelegramBot", () => {
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
fetchSpy.mockRestore();
|
||||
});
|
||||
it("processes remaining media group photos when one photo download fails", async () => {
|
||||
onSpy.mockReset();
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: {
|
||||
groupPolicy: "open",
|
||||
groups: {
|
||||
"-100777111222": {
|
||||
enabled: true,
|
||||
requireMention: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
let fetchCallIndex = 0;
|
||||
const fetchSpy = vi.spyOn(globalThis, "fetch").mockImplementation(async () => {
|
||||
fetchCallIndex++;
|
||||
if (fetchCallIndex === 2) {
|
||||
throw new Error("MediaFetchError: Failed to fetch media");
|
||||
}
|
||||
return new Response(new Uint8Array([0x89, 0x50, 0x4e, 0x47]), {
|
||||
status: 200,
|
||||
headers: { "content-type": "image/png" },
|
||||
});
|
||||
});
|
||||
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
try {
|
||||
createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
|
||||
const handler = getOnHandler("channel_post") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
const first = handler({
|
||||
channelPost: {
|
||||
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
|
||||
message_id: 401,
|
||||
caption: "partial album",
|
||||
date: 1736380800,
|
||||
media_group_id: "partial-album-1",
|
||||
photo: [{ file_id: "p1" }],
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ file_path: "photos/p1.jpg" }),
|
||||
});
|
||||
|
||||
const second = handler({
|
||||
channelPost: {
|
||||
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
|
||||
message_id: 402,
|
||||
date: 1736380801,
|
||||
media_group_id: "partial-album-1",
|
||||
photo: [{ file_id: "p2" }],
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ file_path: "photos/p2.jpg" }),
|
||||
});
|
||||
|
||||
await Promise.all([first, second]);
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
|
||||
const flushTimerCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
|
||||
(call) => call[1] === TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs,
|
||||
);
|
||||
const flushTimer =
|
||||
flushTimerCallIndex >= 0
|
||||
? (setTimeoutSpy.mock.calls[flushTimerCallIndex]?.[0] as (() => unknown) | undefined)
|
||||
: undefined;
|
||||
// Cancel the real timer so it cannot fire a second time after we manually invoke it.
|
||||
if (flushTimerCallIndex >= 0) {
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[flushTimerCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
}
|
||||
expect(flushTimer).toBeTypeOf("function");
|
||||
await flushTimer?.();
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
const payload = replySpy.mock.calls[0]?.[0] as { Body?: string; MediaPaths?: string[] };
|
||||
expect(payload.Body).toContain("partial album");
|
||||
expect(payload.MediaPaths).toHaveLength(1);
|
||||
} finally {
|
||||
setTimeoutSpy.mockRestore();
|
||||
fetchSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
it("drops the media group when a non-recoverable media error occurs", async () => {
|
||||
onSpy.mockReset();
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: {
|
||||
groupPolicy: "open",
|
||||
groups: {
|
||||
"-100777111222": {
|
||||
enabled: true,
|
||||
requireMention: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const fetchSpy = vi.spyOn(globalThis, "fetch").mockImplementation(
|
||||
async () =>
|
||||
new Response(new Uint8Array([0x89, 0x50, 0x4e, 0x47]), {
|
||||
status: 200,
|
||||
headers: { "content-type": "image/png" },
|
||||
}),
|
||||
);
|
||||
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
|
||||
try {
|
||||
createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
|
||||
const handler = getOnHandler("channel_post") as (
|
||||
ctx: Record<string, unknown>,
|
||||
) => Promise<void>;
|
||||
|
||||
const first = handler({
|
||||
channelPost: {
|
||||
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
|
||||
message_id: 501,
|
||||
caption: "fatal album",
|
||||
date: 1736380800,
|
||||
media_group_id: "fatal-album-1",
|
||||
photo: [{ file_id: "p1" }],
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ file_path: "photos/p1.jpg" }),
|
||||
});
|
||||
|
||||
const second = handler({
|
||||
channelPost: {
|
||||
chat: { id: -100777111222, type: "channel", title: "Wake Channel" },
|
||||
message_id: 502,
|
||||
date: 1736380801,
|
||||
media_group_id: "fatal-album-1",
|
||||
photo: [{ file_id: "p2" }],
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({}),
|
||||
});
|
||||
|
||||
await Promise.all([first, second]);
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
|
||||
const flushTimerCallIndex = setTimeoutSpy.mock.calls.findLastIndex(
|
||||
(call) => call[1] === TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs,
|
||||
);
|
||||
const flushTimer =
|
||||
flushTimerCallIndex >= 0
|
||||
? (setTimeoutSpy.mock.calls[flushTimerCallIndex]?.[0] as (() => unknown) | undefined)
|
||||
: undefined;
|
||||
// Cancel the real timer so it cannot fire a second time after we manually invoke it.
|
||||
if (flushTimerCallIndex >= 0) {
|
||||
clearTimeout(
|
||||
setTimeoutSpy.mock.results[flushTimerCallIndex]?.value as ReturnType<typeof setTimeout>,
|
||||
);
|
||||
}
|
||||
expect(flushTimer).toBeTypeOf("function");
|
||||
await flushTimer?.();
|
||||
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
setTimeoutSpy.mockRestore();
|
||||
fetchSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
it("dedupes duplicate message updates by update_id", async () => {
|
||||
onSpy.mockReset();
|
||||
replySpy.mockReset();
|
||||
|
||||
loadConfig.mockReturnValue({
|
||||
channels: {
|
||||
telegram: { dmPolicy: "open", allowFrom: ["*"] },
|
||||
},
|
||||
});
|
||||
|
||||
createTelegramBot({ token: "tok" });
|
||||
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
|
||||
|
||||
const ctx = {
|
||||
update: { update_id: 111 },
|
||||
message: {
|
||||
chat: { id: 123, type: "private" },
|
||||
from: { id: 456, username: "testuser" },
|
||||
text: "hello",
|
||||
date: 1736380800,
|
||||
message_id: 42,
|
||||
},
|
||||
me: { username: "openclaw_bot" },
|
||||
getFile: async () => ({ download: async () => new Uint8Array() }),
|
||||
};
|
||||
|
||||
await handler(ctx);
|
||||
await handler(ctx);
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user