mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 18:27:27 +00:00
fix(telegram): thread media transport policy into SSRF (#44639)
* fix(telegram): preserve media download transport policy * refactor(telegram): thread media transport policy * fix(telegram): sync fallback media policy * fix: note telegram media transport fix (#44639)
This commit is contained in:
@@ -74,6 +74,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Memory/session sync: add mode-aware post-compaction session reindexing with `agents.defaults.compaction.postIndexSync` plus `agents.defaults.memorySearch.sync.sessions.postCompactionForce`, so compacted session memory can refresh immediately without forcing every deployment into synchronous reindexing. (#25561) thanks @rodrigouroz.
|
||||
- Telegram/model picker: make inline model button selections persist the chosen session model correctly, clear overrides when selecting the configured default, and include effective fallback models in `/models` button validation. (#40105) Thanks @avirweb.
|
||||
- Telegram/native command sync: suppress expected `BOT_COMMANDS_TOO_MUCH` retry error noise, add a final fallback summary log, and document the difference between command-menu overflow and real Telegram network failures.
|
||||
- Telegram/media downloads: thread the same direct or proxy transport policy into SSRF-guarded file fetches so inbound attachments keep working when Telegram falls back between env-proxy and direct networking. (#44639) Thanks @obviyus.
|
||||
- Mattermost/reply media delivery: pass agent-scoped `mediaLocalRoots` through shared reply delivery so allowed local files upload correctly from button, slash-command, and model-picker replies. (#44021) Thanks @LyleLiu666.
|
||||
- Plugins/env-scoped roots: fix plugin discovery/load caches and provenance tracking so same-process `HOME`/`OPENCLAW_HOME` changes no longer reuse stale plugin state or misreport `~/...` plugins as untracked. (#44046) thanks @gumadeiras.
|
||||
- Gateway/session discovery: discover disk-only and retired ACP session stores under custom templated `session.store` roots so ACP reconciliation, session-id/session-label targeting, and run-id fallback keep working after restart. (#44176) thanks @gumadeiras.
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
createPinnedDispatcher,
|
||||
resolvePinnedHostnameWithPolicy,
|
||||
type LookupFn,
|
||||
type PinnedDispatcherPolicy,
|
||||
SsrFBlockedError,
|
||||
type SsrFPolicy,
|
||||
} from "./ssrf.js";
|
||||
@@ -29,6 +30,7 @@ export type GuardedFetchOptions = {
|
||||
signal?: AbortSignal;
|
||||
policy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
dispatcherPolicy?: PinnedDispatcherPolicy;
|
||||
mode?: GuardedFetchMode;
|
||||
pinDns?: boolean;
|
||||
/** @deprecated use `mode: "trusted_env_proxy"` for trusted/operator-controlled URLs. */
|
||||
@@ -196,7 +198,7 @@ export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise<G
|
||||
if (canUseTrustedEnvProxy) {
|
||||
dispatcher = new EnvHttpProxyAgent();
|
||||
} else if (params.pinDns !== false) {
|
||||
dispatcher = createPinnedDispatcher(pinned);
|
||||
dispatcher = createPinnedDispatcher(pinned, params.dispatcherPolicy);
|
||||
}
|
||||
|
||||
const init: RequestInit & { dispatcher?: Dispatcher } = {
|
||||
|
||||
@@ -1,13 +1,24 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
const { agentCtor } = vi.hoisted(() => ({
|
||||
const { agentCtor, envHttpProxyAgentCtor, proxyAgentCtor } = vi.hoisted(() => ({
|
||||
agentCtor: vi.fn(function MockAgent(this: { options: unknown }, options: unknown) {
|
||||
this.options = options;
|
||||
}),
|
||||
envHttpProxyAgentCtor: vi.fn(function MockEnvHttpProxyAgent(
|
||||
this: { options: unknown },
|
||||
options: unknown,
|
||||
) {
|
||||
this.options = options;
|
||||
}),
|
||||
proxyAgentCtor: vi.fn(function MockProxyAgent(this: { options: unknown }, options: unknown) {
|
||||
this.options = options;
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("undici", () => ({
|
||||
Agent: agentCtor,
|
||||
EnvHttpProxyAgent: envHttpProxyAgentCtor,
|
||||
ProxyAgent: proxyAgentCtor,
|
||||
}));
|
||||
|
||||
import { createPinnedDispatcher, type PinnedHostname } from "./ssrf.js";
|
||||
@@ -34,4 +45,84 @@ describe("createPinnedDispatcher", () => {
|
||||
| undefined;
|
||||
expect(firstCallArg?.connect?.autoSelectFamily).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves caller transport hints while overriding lookup", () => {
|
||||
const lookup = vi.fn() as unknown as PinnedHostname["lookup"];
|
||||
const previousLookup = vi.fn();
|
||||
const pinned: PinnedHostname = {
|
||||
hostname: "api.telegram.org",
|
||||
addresses: ["149.154.167.220"],
|
||||
lookup,
|
||||
};
|
||||
|
||||
createPinnedDispatcher(pinned, {
|
||||
mode: "direct",
|
||||
connect: {
|
||||
autoSelectFamily: true,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
lookup: previousLookup,
|
||||
},
|
||||
});
|
||||
|
||||
expect(agentCtor).toHaveBeenCalledWith({
|
||||
connect: {
|
||||
autoSelectFamily: true,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
lookup,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps env proxy route while pinning the direct no-proxy path", () => {
|
||||
const lookup = vi.fn() as unknown as PinnedHostname["lookup"];
|
||||
const pinned: PinnedHostname = {
|
||||
hostname: "api.telegram.org",
|
||||
addresses: ["149.154.167.220"],
|
||||
lookup,
|
||||
};
|
||||
|
||||
createPinnedDispatcher(pinned, {
|
||||
mode: "env-proxy",
|
||||
connect: {
|
||||
autoSelectFamily: true,
|
||||
},
|
||||
proxyTls: {
|
||||
autoSelectFamily: true,
|
||||
},
|
||||
});
|
||||
|
||||
expect(envHttpProxyAgentCtor).toHaveBeenCalledWith({
|
||||
connect: {
|
||||
autoSelectFamily: true,
|
||||
lookup,
|
||||
},
|
||||
proxyTls: {
|
||||
autoSelectFamily: true,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps explicit proxy routing intact", () => {
|
||||
const lookup = vi.fn() as unknown as PinnedHostname["lookup"];
|
||||
const pinned: PinnedHostname = {
|
||||
hostname: "api.telegram.org",
|
||||
addresses: ["149.154.167.220"],
|
||||
lookup,
|
||||
};
|
||||
|
||||
createPinnedDispatcher(pinned, {
|
||||
mode: "explicit-proxy",
|
||||
proxyUrl: "http://127.0.0.1:7890",
|
||||
proxyTls: {
|
||||
autoSelectFamily: false,
|
||||
},
|
||||
});
|
||||
|
||||
expect(proxyAgentCtor).toHaveBeenCalledWith({
|
||||
uri: "http://127.0.0.1:7890",
|
||||
proxyTls: {
|
||||
autoSelectFamily: false,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { lookup as dnsLookupCb, type LookupAddress } from "node:dns";
|
||||
import { lookup as dnsLookup } from "node:dns/promises";
|
||||
import { Agent, type Dispatcher } from "undici";
|
||||
import { Agent, EnvHttpProxyAgent, ProxyAgent, type Dispatcher } from "undici";
|
||||
import {
|
||||
extractEmbeddedIpv4FromIpv6,
|
||||
isBlockedSpecialUseIpv4Address,
|
||||
@@ -255,6 +255,22 @@ export type PinnedHostname = {
|
||||
lookup: typeof dnsLookupCb;
|
||||
};
|
||||
|
||||
export type PinnedDispatcherPolicy =
|
||||
| {
|
||||
mode: "direct";
|
||||
connect?: Record<string, unknown>;
|
||||
}
|
||||
| {
|
||||
mode: "env-proxy";
|
||||
connect?: Record<string, unknown>;
|
||||
proxyTls?: Record<string, unknown>;
|
||||
}
|
||||
| {
|
||||
mode: "explicit-proxy";
|
||||
proxyUrl: string;
|
||||
proxyTls?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
function dedupeAndPreferIpv4(results: readonly LookupAddress[]): string[] {
|
||||
const seen = new Set<string>();
|
||||
const ipv4: string[] = [];
|
||||
@@ -329,11 +345,37 @@ export async function resolvePinnedHostname(
|
||||
return await resolvePinnedHostnameWithPolicy(hostname, { lookupFn });
|
||||
}
|
||||
|
||||
export function createPinnedDispatcher(pinned: PinnedHostname): Dispatcher {
|
||||
return new Agent({
|
||||
connect: {
|
||||
lookup: pinned.lookup,
|
||||
},
|
||||
function withPinnedLookup(
|
||||
lookup: PinnedHostname["lookup"],
|
||||
connect?: Record<string, unknown>,
|
||||
): Record<string, unknown> {
|
||||
return connect ? { ...connect, lookup } : { lookup };
|
||||
}
|
||||
|
||||
export function createPinnedDispatcher(
|
||||
pinned: PinnedHostname,
|
||||
policy?: PinnedDispatcherPolicy,
|
||||
): Dispatcher {
|
||||
if (!policy || policy.mode === "direct") {
|
||||
return new Agent({
|
||||
connect: withPinnedLookup(pinned.lookup, policy?.connect),
|
||||
});
|
||||
}
|
||||
|
||||
if (policy.mode === "env-proxy") {
|
||||
return new EnvHttpProxyAgent({
|
||||
connect: withPinnedLookup(pinned.lookup, policy.connect),
|
||||
...(policy.proxyTls ? { proxyTls: { ...policy.proxyTls } } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
const proxyUrl = policy.proxyUrl.trim();
|
||||
if (!policy.proxyTls) {
|
||||
return new ProxyAgent(proxyUrl);
|
||||
}
|
||||
return new ProxyAgent({
|
||||
uri: proxyUrl,
|
||||
proxyTls: { ...policy.proxyTls },
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
142
src/media/fetch.telegram-network.test.ts
Normal file
142
src/media/fetch.telegram-network.test.ts
Normal file
@@ -0,0 +1,142 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { resolveTelegramTransport } from "../telegram/fetch.js";
|
||||
import { fetchRemoteMedia } from "./fetch.js";
|
||||
|
||||
const undiciFetch = vi.hoisted(() => vi.fn());
|
||||
const AgentCtor = vi.hoisted(() =>
|
||||
vi.fn(function MockAgent(
|
||||
this: { options?: Record<string, unknown> },
|
||||
options?: Record<string, unknown>,
|
||||
) {
|
||||
this.options = options;
|
||||
}),
|
||||
);
|
||||
const EnvHttpProxyAgentCtor = vi.hoisted(() =>
|
||||
vi.fn(function MockEnvHttpProxyAgent(
|
||||
this: { options?: Record<string, unknown> },
|
||||
options?: Record<string, unknown>,
|
||||
) {
|
||||
this.options = options;
|
||||
}),
|
||||
);
|
||||
const ProxyAgentCtor = vi.hoisted(() =>
|
||||
vi.fn(function MockProxyAgent(
|
||||
this: { options?: Record<string, unknown> | string },
|
||||
options?: Record<string, unknown> | string,
|
||||
) {
|
||||
this.options = options;
|
||||
}),
|
||||
);
|
||||
|
||||
vi.mock("undici", () => ({
|
||||
Agent: AgentCtor,
|
||||
EnvHttpProxyAgent: EnvHttpProxyAgentCtor,
|
||||
ProxyAgent: ProxyAgentCtor,
|
||||
fetch: undiciFetch,
|
||||
}));
|
||||
|
||||
describe("fetchRemoteMedia telegram network policy", () => {
|
||||
type LookupFn = NonNullable<Parameters<typeof fetchRemoteMedia>[0]["lookupFn"]>;
|
||||
|
||||
afterEach(() => {
|
||||
undiciFetch.mockReset();
|
||||
AgentCtor.mockClear();
|
||||
EnvHttpProxyAgentCtor.mockClear();
|
||||
ProxyAgentCtor.mockClear();
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
|
||||
it("preserves Telegram resolver transport policy for file downloads", async () => {
|
||||
const lookupFn = vi.fn(async () => [
|
||||
{ address: "149.154.167.220", family: 4 },
|
||||
]) as unknown as LookupFn;
|
||||
undiciFetch.mockResolvedValueOnce(
|
||||
new Response(new Uint8Array([0xff, 0xd8, 0xff, 0x00]), {
|
||||
status: 200,
|
||||
headers: { "content-type": "image/jpeg" },
|
||||
}),
|
||||
);
|
||||
|
||||
const telegramTransport = resolveTelegramTransport(undefined, {
|
||||
network: {
|
||||
autoSelectFamily: true,
|
||||
dnsResultOrder: "verbatim",
|
||||
},
|
||||
});
|
||||
|
||||
await fetchRemoteMedia({
|
||||
url: "https://api.telegram.org/file/bottok/photos/1.jpg",
|
||||
fetchImpl: telegramTransport.sourceFetch,
|
||||
dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy,
|
||||
lookupFn,
|
||||
maxBytes: 1024,
|
||||
ssrfPolicy: {
|
||||
allowedHostnames: ["api.telegram.org"],
|
||||
allowRfc2544BenchmarkRange: true,
|
||||
},
|
||||
});
|
||||
|
||||
const init = undiciFetch.mock.calls[0]?.[1] as
|
||||
| (RequestInit & {
|
||||
dispatcher?: {
|
||||
options?: {
|
||||
connect?: Record<string, unknown>;
|
||||
};
|
||||
};
|
||||
})
|
||||
| undefined;
|
||||
|
||||
expect(init?.dispatcher?.options?.connect).toEqual(
|
||||
expect.objectContaining({
|
||||
autoSelectFamily: true,
|
||||
autoSelectFamilyAttemptTimeout: 300,
|
||||
lookup: expect.any(Function),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps explicit proxy routing for file downloads", async () => {
|
||||
const { makeProxyFetch } = await import("../telegram/proxy.js");
|
||||
const lookupFn = vi.fn(async () => [
|
||||
{ address: "149.154.167.220", family: 4 },
|
||||
]) as unknown as LookupFn;
|
||||
undiciFetch.mockResolvedValueOnce(
|
||||
new Response(new Uint8Array([0x25, 0x50, 0x44, 0x46]), {
|
||||
status: 200,
|
||||
headers: { "content-type": "application/pdf" },
|
||||
}),
|
||||
);
|
||||
|
||||
const telegramTransport = resolveTelegramTransport(makeProxyFetch("http://127.0.0.1:7890"), {
|
||||
network: {
|
||||
autoSelectFamily: false,
|
||||
dnsResultOrder: "ipv4first",
|
||||
},
|
||||
});
|
||||
|
||||
await fetchRemoteMedia({
|
||||
url: "https://api.telegram.org/file/bottok/files/1.pdf",
|
||||
fetchImpl: telegramTransport.sourceFetch,
|
||||
dispatcherPolicy: telegramTransport.pinnedDispatcherPolicy,
|
||||
lookupFn,
|
||||
maxBytes: 1024,
|
||||
ssrfPolicy: {
|
||||
allowedHostnames: ["api.telegram.org"],
|
||||
allowRfc2544BenchmarkRange: true,
|
||||
},
|
||||
});
|
||||
|
||||
const init = undiciFetch.mock.calls[0]?.[1] as
|
||||
| (RequestInit & {
|
||||
dispatcher?: {
|
||||
options?: {
|
||||
uri?: string;
|
||||
};
|
||||
};
|
||||
})
|
||||
| undefined;
|
||||
|
||||
expect(init?.dispatcher?.options?.uri).toBe("http://127.0.0.1:7890");
|
||||
expect(ProxyAgentCtor).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -1,6 +1,6 @@
|
||||
import path from "node:path";
|
||||
import { fetchWithSsrFGuard, withStrictGuardedFetchMode } from "../infra/net/fetch-guard.js";
|
||||
import type { LookupFn, SsrFPolicy } from "../infra/net/ssrf.js";
|
||||
import type { LookupFn, PinnedDispatcherPolicy, SsrFPolicy } from "../infra/net/ssrf.js";
|
||||
import { detectMime, extensionForMime } from "./mime.js";
|
||||
import { readResponseWithLimit } from "./read-response-with-limit.js";
|
||||
|
||||
@@ -35,6 +35,7 @@ type FetchMediaOptions = {
|
||||
readIdleTimeoutMs?: number;
|
||||
ssrfPolicy?: SsrFPolicy;
|
||||
lookupFn?: LookupFn;
|
||||
dispatcherPolicy?: PinnedDispatcherPolicy;
|
||||
};
|
||||
|
||||
function stripQuotes(value: string): string {
|
||||
@@ -92,6 +93,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
||||
readIdleTimeoutMs,
|
||||
ssrfPolicy,
|
||||
lookupFn,
|
||||
dispatcherPolicy,
|
||||
} = options;
|
||||
|
||||
let res: Response;
|
||||
@@ -106,6 +108,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
|
||||
maxRedirects,
|
||||
policy: ssrfPolicy,
|
||||
lookupFn,
|
||||
dispatcherPolicy,
|
||||
}),
|
||||
);
|
||||
res = result.response;
|
||||
|
||||
@@ -126,7 +126,7 @@ export const registerTelegramHandlers = ({
|
||||
accountId,
|
||||
bot,
|
||||
opts,
|
||||
telegramFetchImpl,
|
||||
telegramTransport,
|
||||
runtime,
|
||||
mediaMaxBytes,
|
||||
telegramCfg,
|
||||
@@ -379,7 +379,7 @@ export const registerTelegramHandlers = ({
|
||||
for (const { ctx } of entry.messages) {
|
||||
let media;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramFetchImpl);
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
|
||||
} catch (mediaErr) {
|
||||
if (!isRecoverableMediaGroupError(mediaErr)) {
|
||||
throw mediaErr;
|
||||
@@ -483,7 +483,7 @@ export const registerTelegramHandlers = ({
|
||||
},
|
||||
mediaMaxBytes,
|
||||
opts.token,
|
||||
telegramFetchImpl,
|
||||
telegramTransport,
|
||||
);
|
||||
if (!media) {
|
||||
return [];
|
||||
@@ -994,7 +994,7 @@ export const registerTelegramHandlers = ({
|
||||
|
||||
let media: Awaited<ReturnType<typeof resolveMedia>> = null;
|
||||
try {
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramFetchImpl);
|
||||
media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramTransport);
|
||||
} catch (mediaErr) {
|
||||
if (isMediaSizeLimitError(mediaErr)) {
|
||||
if (sendOversizeWarning) {
|
||||
|
||||
@@ -65,6 +65,7 @@ import {
|
||||
import type { TelegramContext } from "./bot/types.js";
|
||||
import { resolveTelegramConversationRoute } from "./conversation-route.js";
|
||||
import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js";
|
||||
import type { TelegramTransport } from "./fetch.js";
|
||||
import {
|
||||
evaluateTelegramGroupBaseAccess,
|
||||
evaluateTelegramGroupPolicyAccess,
|
||||
@@ -94,7 +95,7 @@ export type RegisterTelegramHandlerParams = {
|
||||
bot: Bot;
|
||||
mediaMaxBytes: number;
|
||||
opts: TelegramBotOptions;
|
||||
telegramFetchImpl?: typeof fetch;
|
||||
telegramTransport?: TelegramTransport;
|
||||
runtime: RuntimeEnv;
|
||||
telegramCfg: TelegramAccountConfig;
|
||||
allowFrom?: Array<string | number>;
|
||||
|
||||
@@ -38,7 +38,7 @@ import {
|
||||
type TelegramUpdateKeyContext,
|
||||
} from "./bot-updates.js";
|
||||
import { buildTelegramGroupPeerId, resolveTelegramStreamMode } from "./bot/helpers.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { resolveTelegramTransport } from "./fetch.js";
|
||||
import { tagTelegramNetworkError } from "./network-errors.js";
|
||||
import { createTelegramSendChatActionHandler } from "./sendchataction-401-backoff.js";
|
||||
import { getTelegramSequentialKey } from "./sequential-key.js";
|
||||
@@ -132,19 +132,21 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
: null;
|
||||
const telegramCfg = account.config;
|
||||
|
||||
const fetchImpl = resolveTelegramFetch(opts.proxyFetch, {
|
||||
const telegramTransport = resolveTelegramTransport(opts.proxyFetch, {
|
||||
network: telegramCfg.network,
|
||||
}) as unknown as ApiClientOptions["fetch"];
|
||||
const shouldProvideFetch = Boolean(fetchImpl);
|
||||
});
|
||||
const shouldProvideFetch = Boolean(telegramTransport.fetch);
|
||||
// grammY's ApiClientOptions types still track `node-fetch` types; Node 22+ global fetch
|
||||
// (undici) is structurally compatible at runtime but not assignable in TS.
|
||||
const fetchForClient = fetchImpl as unknown as NonNullable<ApiClientOptions["fetch"]>;
|
||||
const fetchForClient = telegramTransport.fetch as unknown as NonNullable<
|
||||
ApiClientOptions["fetch"]
|
||||
>;
|
||||
|
||||
// When a shutdown abort signal is provided, wrap fetch so every Telegram API request
|
||||
// (especially long-polling getUpdates) aborts immediately on shutdown. Without this,
|
||||
// the in-flight getUpdates hangs for up to 30s, and a new gateway instance starting
|
||||
// its own poll triggers a 409 Conflict from Telegram.
|
||||
let finalFetch = shouldProvideFetch && fetchImpl ? fetchForClient : undefined;
|
||||
let finalFetch = shouldProvideFetch ? fetchForClient : undefined;
|
||||
if (opts.fetchAbortSignal) {
|
||||
const baseFetch =
|
||||
finalFetch ?? (globalThis.fetch as unknown as NonNullable<ApiClientOptions["fetch"]>);
|
||||
@@ -493,7 +495,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
accountId: account.accountId,
|
||||
bot,
|
||||
opts,
|
||||
telegramFetchImpl: fetchImpl as unknown as typeof fetch | undefined,
|
||||
telegramTransport,
|
||||
runtime,
|
||||
mediaMaxBytes,
|
||||
telegramCfg,
|
||||
|
||||
@@ -6,9 +6,13 @@ import type { TelegramContext } from "./types.js";
|
||||
const saveMediaBuffer = vi.fn();
|
||||
const fetchRemoteMedia = vi.fn();
|
||||
|
||||
vi.mock("../../media/store.js", () => ({
|
||||
saveMediaBuffer: (...args: unknown[]) => saveMediaBuffer(...args),
|
||||
}));
|
||||
vi.mock("../../media/store.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../../media/store.js")>();
|
||||
return {
|
||||
...actual,
|
||||
saveMediaBuffer: (...args: unknown[]) => saveMediaBuffer(...args),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../../media/fetch.js", () => ({
|
||||
fetchRemoteMedia: (...args: unknown[]) => fetchRemoteMedia(...args),
|
||||
@@ -297,6 +301,7 @@ describe("resolveMedia getFile retry", () => {
|
||||
it("uses caller-provided fetch impl for file downloads", async () => {
|
||||
const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" });
|
||||
const callerFetch = vi.fn() as unknown as typeof fetch;
|
||||
const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch };
|
||||
fetchRemoteMedia.mockResolvedValueOnce({
|
||||
buffer: Buffer.from("pdf-data"),
|
||||
contentType: "application/pdf",
|
||||
@@ -311,7 +316,7 @@ describe("resolveMedia getFile retry", () => {
|
||||
makeCtx("document", getFile),
|
||||
MAX_MEDIA_BYTES,
|
||||
BOT_TOKEN,
|
||||
callerFetch,
|
||||
callerTransport,
|
||||
);
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
@@ -325,6 +330,7 @@ describe("resolveMedia getFile retry", () => {
|
||||
it("uses caller-provided fetch impl for sticker downloads", async () => {
|
||||
const getFile = vi.fn().mockResolvedValue({ file_path: "stickers/file_0.webp" });
|
||||
const callerFetch = vi.fn() as unknown as typeof fetch;
|
||||
const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch };
|
||||
fetchRemoteMedia.mockResolvedValueOnce({
|
||||
buffer: Buffer.from("sticker-data"),
|
||||
contentType: "image/webp",
|
||||
@@ -339,7 +345,7 @@ describe("resolveMedia getFile retry", () => {
|
||||
makeCtx("sticker", getFile),
|
||||
MAX_MEDIA_BYTES,
|
||||
BOT_TOKEN,
|
||||
callerFetch,
|
||||
callerTransport,
|
||||
);
|
||||
|
||||
expect(result).not.toBeNull();
|
||||
|
||||
@@ -4,6 +4,7 @@ import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { retryAsync } from "../../infra/retry.js";
|
||||
import { fetchRemoteMedia } from "../../media/fetch.js";
|
||||
import { saveMediaBuffer } from "../../media/store.js";
|
||||
import type { TelegramTransport } from "../fetch.js";
|
||||
import { cacheSticker, getCachedSticker } from "../sticker-cache.js";
|
||||
import { resolveTelegramMediaPlaceholder } from "./helpers.js";
|
||||
import type { StickerMetadata, TelegramContext } from "./types.js";
|
||||
@@ -92,17 +93,23 @@ async function resolveTelegramFileWithRetry(
|
||||
}
|
||||
}
|
||||
|
||||
function resolveRequiredFetchImpl(fetchImpl?: typeof fetch): typeof fetch {
|
||||
const resolved = fetchImpl ?? globalThis.fetch;
|
||||
if (!resolved) {
|
||||
function resolveRequiredTelegramTransport(transport?: TelegramTransport): TelegramTransport {
|
||||
if (transport) {
|
||||
return transport;
|
||||
}
|
||||
const resolvedFetch = globalThis.fetch;
|
||||
if (!resolvedFetch) {
|
||||
throw new Error("fetch is not available; set channels.telegram.proxy in config");
|
||||
}
|
||||
return resolved;
|
||||
return {
|
||||
fetch: resolvedFetch,
|
||||
sourceFetch: resolvedFetch,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveOptionalFetchImpl(fetchImpl?: typeof fetch): typeof fetch | null {
|
||||
function resolveOptionalTelegramTransport(transport?: TelegramTransport): TelegramTransport | null {
|
||||
try {
|
||||
return resolveRequiredFetchImpl(fetchImpl);
|
||||
return resolveRequiredTelegramTransport(transport);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -114,14 +121,15 @@ const TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS = 30_000;
|
||||
async function downloadAndSaveTelegramFile(params: {
|
||||
filePath: string;
|
||||
token: string;
|
||||
fetchImpl: typeof fetch;
|
||||
transport: TelegramTransport;
|
||||
maxBytes: number;
|
||||
telegramFileName?: string;
|
||||
}) {
|
||||
const url = `https://api.telegram.org/file/bot${params.token}/${params.filePath}`;
|
||||
const fetched = await fetchRemoteMedia({
|
||||
url,
|
||||
fetchImpl: params.fetchImpl,
|
||||
fetchImpl: params.transport.sourceFetch,
|
||||
dispatcherPolicy: params.transport.pinnedDispatcherPolicy,
|
||||
filePathHint: params.filePath,
|
||||
maxBytes: params.maxBytes,
|
||||
readIdleTimeoutMs: TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS,
|
||||
@@ -142,7 +150,7 @@ async function resolveStickerMedia(params: {
|
||||
ctx: TelegramContext;
|
||||
maxBytes: number;
|
||||
token: string;
|
||||
fetchImpl?: typeof fetch;
|
||||
transport?: TelegramTransport;
|
||||
}): Promise<
|
||||
| {
|
||||
path: string;
|
||||
@@ -153,7 +161,7 @@ async function resolveStickerMedia(params: {
|
||||
| null
|
||||
| undefined
|
||||
> {
|
||||
const { msg, ctx, maxBytes, token, fetchImpl } = params;
|
||||
const { msg, ctx, maxBytes, token, transport } = params;
|
||||
if (!msg.sticker) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -173,15 +181,15 @@ async function resolveStickerMedia(params: {
|
||||
logVerbose("telegram: getFile returned no file_path for sticker");
|
||||
return null;
|
||||
}
|
||||
const resolvedFetchImpl = resolveOptionalFetchImpl(fetchImpl);
|
||||
if (!resolvedFetchImpl) {
|
||||
const resolvedTransport = resolveOptionalTelegramTransport(transport);
|
||||
if (!resolvedTransport) {
|
||||
logVerbose("telegram: fetch not available for sticker download");
|
||||
return null;
|
||||
}
|
||||
const saved = await downloadAndSaveTelegramFile({
|
||||
filePath: file.file_path,
|
||||
token,
|
||||
fetchImpl: resolvedFetchImpl,
|
||||
transport: resolvedTransport,
|
||||
maxBytes,
|
||||
});
|
||||
|
||||
@@ -237,7 +245,7 @@ export async function resolveMedia(
|
||||
ctx: TelegramContext,
|
||||
maxBytes: number,
|
||||
token: string,
|
||||
fetchImpl?: typeof fetch,
|
||||
transport?: TelegramTransport,
|
||||
): Promise<{
|
||||
path: string;
|
||||
contentType?: string;
|
||||
@@ -250,7 +258,7 @@ export async function resolveMedia(
|
||||
ctx,
|
||||
maxBytes,
|
||||
token,
|
||||
fetchImpl,
|
||||
transport,
|
||||
});
|
||||
if (stickerResolved !== undefined) {
|
||||
return stickerResolved;
|
||||
@@ -271,7 +279,7 @@ export async function resolveMedia(
|
||||
const saved = await downloadAndSaveTelegramFile({
|
||||
filePath: file.file_path,
|
||||
token,
|
||||
fetchImpl: resolveRequiredFetchImpl(fetchImpl),
|
||||
transport: resolveRequiredTelegramTransport(transport),
|
||||
maxBytes,
|
||||
telegramFileName: resolveTelegramFileName(msg),
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { resolveFetch } from "../infra/fetch.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { resolveTelegramFetch, resolveTelegramTransport } from "./fetch.js";
|
||||
|
||||
const setDefaultResultOrder = vi.hoisted(() => vi.fn());
|
||||
const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn());
|
||||
@@ -313,12 +313,13 @@ describe("resolveTelegramFetch", () => {
|
||||
.mockResolvedValueOnce({ ok: true } as Response)
|
||||
.mockResolvedValueOnce({ ok: true } as Response);
|
||||
|
||||
const resolved = resolveTelegramFetchOrThrow(undefined, {
|
||||
const transport = resolveTelegramTransport(undefined, {
|
||||
network: {
|
||||
autoSelectFamily: true,
|
||||
dnsResultOrder: "ipv4first",
|
||||
},
|
||||
});
|
||||
const resolved = transport.fetch;
|
||||
|
||||
await resolved("https://api.telegram.org/botx/sendMessage");
|
||||
await resolved("https://api.telegram.org/botx/sendChatAction");
|
||||
@@ -338,6 +339,11 @@ describe("resolveTelegramFetch", () => {
|
||||
autoSelectFamily: false,
|
||||
}),
|
||||
);
|
||||
expect(transport.pinnedDispatcherPolicy).toEqual(
|
||||
expect.objectContaining({
|
||||
mode: "direct",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("arms sticky IPv4 fallback when env proxy init falls back to direct Agent", async () => {
|
||||
|
||||
@@ -3,6 +3,7 @@ import { Agent, EnvHttpProxyAgent, ProxyAgent, fetch as undiciFetch } from "undi
|
||||
import type { TelegramNetworkConfig } from "../config/types.telegram.js";
|
||||
import { resolveFetch } from "../infra/fetch.js";
|
||||
import { hasEnvHttpProxyConfigured } from "../infra/net/proxy-env.js";
|
||||
import type { PinnedDispatcherPolicy } from "../infra/net/ssrf.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import {
|
||||
resolveTelegramAutoSelectFamilyDecision,
|
||||
@@ -181,13 +182,13 @@ function hasEnvHttpProxyForTelegramApi(env: NodeJS.ProcessEnv = process.env): bo
|
||||
return hasEnvHttpProxyConfigured("https", env);
|
||||
}
|
||||
|
||||
function createTelegramDispatcher(params: {
|
||||
function resolveTelegramDispatcherPolicy(params: {
|
||||
autoSelectFamily: boolean | null;
|
||||
dnsResultOrder: TelegramDnsResultOrder | null;
|
||||
useEnvProxy: boolean;
|
||||
forceIpv4: boolean;
|
||||
proxyUrl?: string;
|
||||
}): { dispatcher: TelegramDispatcher; mode: TelegramDispatcherMode } {
|
||||
}): { policy: PinnedDispatcherPolicy; mode: TelegramDispatcherMode } {
|
||||
const connect = buildTelegramConnectOptions({
|
||||
autoSelectFamily: params.autoSelectFamily,
|
||||
dnsResultOrder: params.dnsResultOrder,
|
||||
@@ -195,35 +196,77 @@ function createTelegramDispatcher(params: {
|
||||
});
|
||||
const explicitProxyUrl = params.proxyUrl?.trim();
|
||||
if (explicitProxyUrl) {
|
||||
const proxyOptions = connect
|
||||
return {
|
||||
policy: connect
|
||||
? {
|
||||
mode: "explicit-proxy",
|
||||
proxyUrl: explicitProxyUrl,
|
||||
proxyTls: { ...connect },
|
||||
}
|
||||
: {
|
||||
mode: "explicit-proxy",
|
||||
proxyUrl: explicitProxyUrl,
|
||||
},
|
||||
mode: "explicit-proxy",
|
||||
};
|
||||
}
|
||||
if (params.useEnvProxy) {
|
||||
return {
|
||||
policy: {
|
||||
mode: "env-proxy",
|
||||
...(connect ? { connect: { ...connect }, proxyTls: { ...connect } } : {}),
|
||||
},
|
||||
mode: "env-proxy",
|
||||
};
|
||||
}
|
||||
return {
|
||||
policy: {
|
||||
mode: "direct",
|
||||
...(connect ? { connect: { ...connect } } : {}),
|
||||
},
|
||||
mode: "direct",
|
||||
};
|
||||
}
|
||||
|
||||
function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
|
||||
dispatcher: TelegramDispatcher;
|
||||
mode: TelegramDispatcherMode;
|
||||
effectivePolicy: PinnedDispatcherPolicy;
|
||||
} {
|
||||
if (policy.mode === "explicit-proxy") {
|
||||
const proxyOptions = policy.proxyTls
|
||||
? ({
|
||||
uri: explicitProxyUrl,
|
||||
proxyTls: connect,
|
||||
uri: policy.proxyUrl,
|
||||
proxyTls: { ...policy.proxyTls },
|
||||
} satisfies ConstructorParameters<typeof ProxyAgent>[0])
|
||||
: explicitProxyUrl;
|
||||
: policy.proxyUrl;
|
||||
try {
|
||||
return {
|
||||
dispatcher: new ProxyAgent(proxyOptions),
|
||||
mode: "explicit-proxy",
|
||||
effectivePolicy: policy,
|
||||
};
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
throw new Error(`explicit proxy dispatcher init failed: ${reason}`, { cause: err });
|
||||
}
|
||||
}
|
||||
if (params.useEnvProxy) {
|
||||
const proxyOptions = connect
|
||||
? ({
|
||||
connect,
|
||||
// undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent.
|
||||
// Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls.
|
||||
proxyTls: connect,
|
||||
} satisfies ConstructorParameters<typeof EnvHttpProxyAgent>[0])
|
||||
: undefined;
|
||||
|
||||
if (policy.mode === "env-proxy") {
|
||||
const proxyOptions =
|
||||
policy.connect || policy.proxyTls
|
||||
? ({
|
||||
...(policy.connect ? { connect: { ...policy.connect } } : {}),
|
||||
// undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent.
|
||||
// Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls.
|
||||
...(policy.proxyTls ? { proxyTls: { ...policy.proxyTls } } : {}),
|
||||
} satisfies ConstructorParameters<typeof EnvHttpProxyAgent>[0])
|
||||
: undefined;
|
||||
try {
|
||||
return {
|
||||
dispatcher: new EnvHttpProxyAgent(proxyOptions),
|
||||
mode: "env-proxy",
|
||||
effectivePolicy: policy,
|
||||
};
|
||||
} catch (err) {
|
||||
log.warn(
|
||||
@@ -231,16 +274,34 @@ function createTelegramDispatcher(params: {
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
const directPolicy: PinnedDispatcherPolicy = {
|
||||
mode: "direct",
|
||||
...(policy.connect ? { connect: { ...policy.connect } } : {}),
|
||||
};
|
||||
return {
|
||||
dispatcher: new Agent(
|
||||
directPolicy.connect
|
||||
? ({
|
||||
connect: { ...directPolicy.connect },
|
||||
} satisfies ConstructorParameters<typeof Agent>[0])
|
||||
: undefined,
|
||||
),
|
||||
mode: "direct",
|
||||
effectivePolicy: directPolicy,
|
||||
};
|
||||
}
|
||||
}
|
||||
const agentOptions = connect
|
||||
? ({
|
||||
connect,
|
||||
} satisfies ConstructorParameters<typeof Agent>[0])
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
dispatcher: new Agent(agentOptions),
|
||||
dispatcher: new Agent(
|
||||
policy.connect
|
||||
? ({
|
||||
connect: { ...policy.connect },
|
||||
} satisfies ConstructorParameters<typeof Agent>[0])
|
||||
: undefined,
|
||||
),
|
||||
mode: "direct",
|
||||
effectivePolicy: policy,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -329,10 +390,16 @@ function shouldRetryWithIpv4Fallback(err: unknown): boolean {
|
||||
}
|
||||
|
||||
// Prefer wrapped fetch when available to normalize AbortSignal across runtimes.
|
||||
export function resolveTelegramFetch(
|
||||
export type TelegramTransport = {
|
||||
fetch: typeof fetch;
|
||||
sourceFetch: typeof fetch;
|
||||
pinnedDispatcherPolicy?: PinnedDispatcherPolicy;
|
||||
};
|
||||
|
||||
export function resolveTelegramTransport(
|
||||
proxyFetch?: typeof fetch,
|
||||
options?: { network?: TelegramNetworkConfig },
|
||||
): typeof fetch {
|
||||
): TelegramTransport {
|
||||
const autoSelectDecision = resolveTelegramAutoSelectFamilyDecision({
|
||||
network: options?.network,
|
||||
});
|
||||
@@ -351,51 +418,51 @@ export function resolveTelegramFetch(
|
||||
: proxyFetch
|
||||
? resolveWrappedFetch(proxyFetch)
|
||||
: undiciSourceFetch;
|
||||
|
||||
const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value);
|
||||
// Preserve fully caller-owned custom fetch implementations.
|
||||
// OpenClaw proxy fetches are metadata-tagged and continue into resolver-scoped policy.
|
||||
if (proxyFetch && !explicitProxyUrl) {
|
||||
return sourceFetch;
|
||||
return { fetch: sourceFetch, sourceFetch };
|
||||
}
|
||||
|
||||
const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value);
|
||||
const useEnvProxy = !explicitProxyUrl && hasEnvHttpProxyForTelegramApi();
|
||||
const defaultDispatcherResolution = createTelegramDispatcher({
|
||||
const defaultDispatcherResolution = resolveTelegramDispatcherPolicy({
|
||||
autoSelectFamily: autoSelectDecision.value,
|
||||
dnsResultOrder,
|
||||
useEnvProxy,
|
||||
forceIpv4: false,
|
||||
proxyUrl: explicitProxyUrl,
|
||||
});
|
||||
const defaultDispatcher = defaultDispatcherResolution.dispatcher;
|
||||
const defaultDispatcher = createTelegramDispatcher(defaultDispatcherResolution.policy);
|
||||
const shouldBypassEnvProxy = shouldBypassEnvProxyForTelegramApi();
|
||||
const allowStickyIpv4Fallback =
|
||||
defaultDispatcherResolution.mode === "direct" ||
|
||||
(defaultDispatcherResolution.mode === "env-proxy" && shouldBypassEnvProxy);
|
||||
const stickyShouldUseEnvProxy = defaultDispatcherResolution.mode === "env-proxy";
|
||||
defaultDispatcher.mode === "direct" ||
|
||||
(defaultDispatcher.mode === "env-proxy" && shouldBypassEnvProxy);
|
||||
const stickyShouldUseEnvProxy = defaultDispatcher.mode === "env-proxy";
|
||||
|
||||
let stickyIpv4FallbackEnabled = false;
|
||||
let stickyIpv4Dispatcher: TelegramDispatcher | null = null;
|
||||
const resolveStickyIpv4Dispatcher = () => {
|
||||
if (!stickyIpv4Dispatcher) {
|
||||
stickyIpv4Dispatcher = createTelegramDispatcher({
|
||||
autoSelectFamily: false,
|
||||
dnsResultOrder: "ipv4first",
|
||||
useEnvProxy: stickyShouldUseEnvProxy,
|
||||
forceIpv4: true,
|
||||
proxyUrl: explicitProxyUrl,
|
||||
}).dispatcher;
|
||||
stickyIpv4Dispatcher = createTelegramDispatcher(
|
||||
resolveTelegramDispatcherPolicy({
|
||||
autoSelectFamily: false,
|
||||
dnsResultOrder: "ipv4first",
|
||||
useEnvProxy: stickyShouldUseEnvProxy,
|
||||
forceIpv4: true,
|
||||
proxyUrl: explicitProxyUrl,
|
||||
}).policy,
|
||||
).dispatcher;
|
||||
}
|
||||
return stickyIpv4Dispatcher;
|
||||
};
|
||||
|
||||
return (async (input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
|
||||
const callerProvidedDispatcher = Boolean(
|
||||
(init as RequestInitWithDispatcher | undefined)?.dispatcher,
|
||||
);
|
||||
const initialInit = withDispatcherIfMissing(
|
||||
init,
|
||||
stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher,
|
||||
stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher.dispatcher,
|
||||
);
|
||||
try {
|
||||
return await sourceFetch(input, initialInit);
|
||||
@@ -421,4 +488,17 @@ export function resolveTelegramFetch(
|
||||
throw err;
|
||||
}
|
||||
}) as typeof fetch;
|
||||
|
||||
return {
|
||||
fetch: resolvedFetch,
|
||||
sourceFetch,
|
||||
pinnedDispatcherPolicy: defaultDispatcher.effectivePolicy,
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveTelegramFetch(
|
||||
proxyFetch?: typeof fetch,
|
||||
options?: { network?: TelegramNetworkConfig },
|
||||
): typeof fetch {
|
||||
return resolveTelegramTransport(proxyFetch, options).fetch;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user