mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 02:08:27 +00:00
* fix(telegram): handle Grammy HttpError network failures (#3815)

  Grammy wraps fetch errors in an .error property (not .cause). Added .error traversal to collectErrorCandidates in network-errors.ts.

  Registered a scoped unhandled rejection handler in monitorTelegramProvider to catch network errors that escape the polling loop (e.g., from setMyCommands during bot setup). The handler is unregistered when the provider stops.

* fix(telegram): address review feedback for Grammy HttpError handling

  - Gate .error traversal on HttpError name to avoid widening the search graph
  - Use runtime logger instead of console.warn for consistency
  - Add isGrammyHttpError check to scope the unhandled rejection handler
  - Consolidate isNetworkRelatedError into isRecoverableTelegramNetworkError
  - Add 'timeout' to recoverable message snippets for full coverage
This commit is contained in:
@@ -6,6 +6,7 @@ import { loadConfig } from "../config/config.js";
|
||||
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { formatDurationMs } from "../infra/format-duration.js";
|
||||
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
|
||||
import { resolveTelegramAccount } from "./accounts.js";
|
||||
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
|
||||
import { createTelegramBot } from "./bot.js";
|
||||
@@ -78,133 +79,137 @@ const isGetUpdatesConflict = (err: unknown) => {
|
||||
return haystack.includes("getupdates");
|
||||
};
|
||||
|
||||
const NETWORK_ERROR_SNIPPETS = [
|
||||
"fetch failed",
|
||||
"network",
|
||||
"timeout",
|
||||
"socket",
|
||||
"econnreset",
|
||||
"econnrefused",
|
||||
"undici",
|
||||
];
|
||||
|
||||
const isNetworkRelatedError = (err: unknown) => {
|
||||
if (!err) {
|
||||
/** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */
|
||||
const isGrammyHttpError = (err: unknown): boolean => {
|
||||
if (!err || typeof err !== "object") {
|
||||
return false;
|
||||
}
|
||||
const message = formatErrorMessage(err).toLowerCase();
|
||||
if (!message) {
|
||||
return false;
|
||||
}
|
||||
return NETWORK_ERROR_SNIPPETS.some((snippet) => message.includes(snippet));
|
||||
return (err as { name?: string }).name === "HttpError";
|
||||
};
|
||||
|
||||
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = opts.token?.trim() || account.token;
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`Telegram bot token missing for account "${account.accountId}" (set channels.telegram.accounts.${account.accountId}.botToken/tokenFile or TELEGRAM_BOT_TOKEN for default).`,
|
||||
);
|
||||
}
|
||||
const log = opts.runtime?.error ?? console.error;
|
||||
|
||||
const proxyFetch =
|
||||
opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined);
|
||||
|
||||
let lastUpdateId = await readTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
});
|
||||
const persistUpdateId = async (updateId: number) => {
|
||||
if (lastUpdateId !== null && updateId <= lastUpdateId) {
|
||||
return;
|
||||
// Register handler for Grammy HttpError unhandled rejections.
|
||||
// This catches network errors that escape the polling loop's try-catch
|
||||
// (e.g., from setMyCommands during bot setup).
|
||||
// We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
|
||||
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
|
||||
if (isGrammyHttpError(err) && isRecoverableTelegramNetworkError(err, { context: "polling" })) {
|
||||
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
|
||||
return true; // handled - don't crash
|
||||
}
|
||||
lastUpdateId = updateId;
|
||||
try {
|
||||
await writeTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
updateId,
|
||||
});
|
||||
} catch (err) {
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`telegram: failed to persist update offset: ${String(err)}`,
|
||||
return false;
|
||||
});
|
||||
|
||||
try {
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const token = opts.token?.trim() || account.token;
|
||||
if (!token) {
|
||||
throw new Error(
|
||||
`Telegram bot token missing for account "${account.accountId}" (set channels.telegram.accounts.${account.accountId}.botToken/tokenFile or TELEGRAM_BOT_TOKEN for default).`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
const bot = createTelegramBot({
|
||||
token,
|
||||
runtime: opts.runtime,
|
||||
proxyFetch,
|
||||
config: cfg,
|
||||
accountId: account.accountId,
|
||||
updateOffset: {
|
||||
lastUpdateId,
|
||||
onUpdateId: persistUpdateId,
|
||||
},
|
||||
});
|
||||
const proxyFetch =
|
||||
opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy) : undefined);
|
||||
|
||||
if (opts.useWebhook) {
|
||||
await startTelegramWebhook({
|
||||
token,
|
||||
let lastUpdateId = await readTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
config: cfg,
|
||||
path: opts.webhookPath,
|
||||
port: opts.webhookPort,
|
||||
secret: opts.webhookSecret,
|
||||
runtime: opts.runtime as RuntimeEnv,
|
||||
fetch: proxyFetch,
|
||||
abortSignal: opts.abortSignal,
|
||||
publicUrl: opts.webhookUrl,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Use grammyjs/runner for concurrent update processing
|
||||
let restartAttempts = 0;
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
void runner.stop();
|
||||
const persistUpdateId = async (updateId: number) => {
|
||||
if (lastUpdateId !== null && updateId <= lastUpdateId) {
|
||||
return;
|
||||
}
|
||||
lastUpdateId = updateId;
|
||||
try {
|
||||
await writeTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
updateId,
|
||||
});
|
||||
} catch (err) {
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`telegram: failed to persist update offset: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
try {
|
||||
// runner.task() returns a promise that resolves when the runner stops
|
||||
await runner.task();
|
||||
|
||||
const bot = createTelegramBot({
|
||||
token,
|
||||
runtime: opts.runtime,
|
||||
proxyFetch,
|
||||
config: cfg,
|
||||
accountId: account.accountId,
|
||||
updateOffset: {
|
||||
lastUpdateId,
|
||||
onUpdateId: persistUpdateId,
|
||||
},
|
||||
});
|
||||
|
||||
if (opts.useWebhook) {
|
||||
await startTelegramWebhook({
|
||||
token,
|
||||
accountId: account.accountId,
|
||||
config: cfg,
|
||||
path: opts.webhookPath,
|
||||
port: opts.webhookPort,
|
||||
secret: opts.webhookSecret,
|
||||
runtime: opts.runtime as RuntimeEnv,
|
||||
fetch: proxyFetch,
|
||||
abortSignal: opts.abortSignal,
|
||||
publicUrl: opts.webhookUrl,
|
||||
});
|
||||
return;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
const isNetworkError = isNetworkRelatedError(err);
|
||||
if (!isConflict && !isRecoverable && !isNetworkError) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
throw sleepErr;
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
}
|
||||
|
||||
// Use grammyjs/runner for concurrent update processing
|
||||
let restartAttempts = 0;
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
void runner.stop();
|
||||
}
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
try {
|
||||
// runner.task() returns a promise that resolves when the runner stops
|
||||
await runner.task();
|
||||
return;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
if (!isConflict && !isRecoverable) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
throw sleepErr;
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
unregisterHandler();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user