perf: speed up telegram media e2e flush timing

This commit is contained in:
Peter Steinberger
2026-02-13 19:35:40 +00:00
parent bbca3b191a
commit e746a67cc3
3 changed files with 33 additions and 8 deletions

View File

@@ -57,11 +57,21 @@ export const registerTelegramHandlers = ({
processMessage, processMessage,
logger, logger,
}: RegisterTelegramHandlerParams) => { }: RegisterTelegramHandlerParams) => {
const DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS = 1500;
const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000; const TELEGRAM_TEXT_FRAGMENT_START_THRESHOLD_CHARS = 4000;
const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS = 1500; const TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS =
typeof opts.testTimings?.textFragmentGapMs === "number" &&
Number.isFinite(opts.testTimings.textFragmentGapMs)
? Math.max(10, Math.floor(opts.testTimings.textFragmentGapMs))
: DEFAULT_TEXT_FRAGMENT_MAX_GAP_MS;
const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1; const TELEGRAM_TEXT_FRAGMENT_MAX_ID_GAP = 1;
const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12; const TELEGRAM_TEXT_FRAGMENT_MAX_PARTS = 12;
const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000; const TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS = 50_000;
const mediaGroupTimeoutMs =
typeof opts.testTimings?.mediaGroupFlushMs === "number" &&
Number.isFinite(opts.testTimings.mediaGroupFlushMs)
? Math.max(10, Math.floor(opts.testTimings.mediaGroupFlushMs))
: MEDIA_GROUP_TIMEOUT_MS;
const mediaGroupBuffer = new Map<string, MediaGroupEntry>(); const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
let mediaGroupProcessing: Promise<void> = Promise.resolve(); let mediaGroupProcessing: Promise<void> = Promise.resolve();
@@ -859,7 +869,7 @@ export const registerTelegramHandlers = ({
}) })
.catch(() => undefined); .catch(() => undefined);
await mediaGroupProcessing; await mediaGroupProcessing;
}, MEDIA_GROUP_TIMEOUT_MS); }, mediaGroupTimeoutMs);
} else { } else {
const entry: MediaGroupEntry = { const entry: MediaGroupEntry = {
messages: [{ msg, ctx }], messages: [{ msg, ctx }],
@@ -871,7 +881,7 @@ export const registerTelegramHandlers = ({
}) })
.catch(() => undefined); .catch(() => undefined);
await mediaGroupProcessing; await mediaGroupProcessing;
}, MEDIA_GROUP_TIMEOUT_MS), }, mediaGroupTimeoutMs),
}; };
mediaGroupBuffer.set(mediaGroupId, entry); mediaGroupBuffer.set(mediaGroupId, entry);
} }

View File

@@ -1,7 +1,6 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import * as ssrf from "../infra/net/ssrf.js"; import * as ssrf from "../infra/net/ssrf.js";
import { MEDIA_GROUP_TIMEOUT_MS } from "./bot-updates.js";
const useSpy = vi.fn(); const useSpy = vi.fn();
const middlewareUseSpy = vi.fn(); const middlewareUseSpy = vi.fn();
@@ -14,6 +13,10 @@ const describeStickerImageSpy = vi.fn();
const resolvePinnedHostname = ssrf.resolvePinnedHostname; const resolvePinnedHostname = ssrf.resolvePinnedHostname;
const lookupMock = vi.fn(); const lookupMock = vi.fn();
let resolvePinnedHostnameSpy: ReturnType<typeof vi.spyOn> = null; let resolvePinnedHostnameSpy: ReturnType<typeof vi.spyOn> = null;
const TELEGRAM_TEST_TIMINGS = {
mediaGroupFlushMs: 75,
textFragmentGapMs: 120,
} as const;
const sleep = async (ms: number) => { const sleep = async (ms: number) => {
await new Promise<void>((resolve) => setTimeout(resolve, ms)); await new Promise<void>((resolve) => setTimeout(resolve, ms));
@@ -141,6 +144,7 @@ describe("telegram inbound media", () => {
const runtimeError = vi.fn(); const runtimeError = vi.fn();
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: runtimeLog, log: runtimeLog,
error: runtimeError, error: runtimeError,
@@ -207,6 +211,7 @@ describe("telegram inbound media", () => {
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
proxyFetch: proxyFetch as unknown as typeof fetch, proxyFetch: proxyFetch as unknown as typeof fetch,
runtime: { runtime: {
log: runtimeLog, log: runtimeLog,
@@ -254,6 +259,7 @@ describe("telegram inbound media", () => {
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: runtimeLog, log: runtimeLog,
error: runtimeError, error: runtimeError,
@@ -294,7 +300,7 @@ describe("telegram media groups", () => {
}); });
const MEDIA_GROUP_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000; const MEDIA_GROUP_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000;
const MEDIA_GROUP_FLUSH_MS = MEDIA_GROUP_TIMEOUT_MS + 25; const MEDIA_GROUP_FLUSH_MS = TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs + 120;
it( it(
"buffers messages with same media_group_id and processes them together", "buffers messages with same media_group_id and processes them together",
@@ -317,6 +323,7 @@ describe("telegram media groups", () => {
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: vi.fn(), log: vi.fn(),
error: runtimeError, error: runtimeError,
@@ -390,7 +397,7 @@ describe("telegram media groups", () => {
arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer, arrayBuffer: async () => new Uint8Array([0x89, 0x50, 0x4e, 0x47]).buffer,
} as Response); } as Response);
createTelegramBot({ token: "tok" }); createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
const handler = onSpy.mock.calls.find((call) => call[0] === "message")?.[1] as ( const handler = onSpy.mock.calls.find((call) => call[0] === "message")?.[1] as (
ctx: Record<string, unknown>, ctx: Record<string, unknown>,
) => Promise<void>; ) => Promise<void>;
@@ -459,6 +466,7 @@ describe("telegram stickers", () => {
const runtimeError = vi.fn(); const runtimeError = vi.fn();
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: runtimeLog, log: runtimeLog,
error: runtimeError, error: runtimeError,
@@ -541,6 +549,7 @@ describe("telegram stickers", () => {
const runtimeError = vi.fn(); const runtimeError = vi.fn();
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: vi.fn(), log: vi.fn(),
error: runtimeError, error: runtimeError,
@@ -615,6 +624,7 @@ describe("telegram stickers", () => {
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: vi.fn(), log: vi.fn(),
error: runtimeError, error: runtimeError,
@@ -675,6 +685,7 @@ describe("telegram stickers", () => {
createTelegramBot({ createTelegramBot({
token: "tok", token: "tok",
testTimings: TELEGRAM_TEST_TIMINGS,
runtime: { runtime: {
log: vi.fn(), log: vi.fn(),
error: runtimeError, error: runtimeError,
@@ -726,7 +737,7 @@ describe("telegram text fragments", () => {
}); });
const TEXT_FRAGMENT_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000; const TEXT_FRAGMENT_TEST_TIMEOUT_MS = process.platform === "win32" ? 45_000 : 20_000;
const TEXT_FRAGMENT_FLUSH_MS = 1600; const TEXT_FRAGMENT_FLUSH_MS = TELEGRAM_TEST_TIMINGS.textFragmentGapMs + 160;
it( it(
"buffers near-limit text and processes sequential parts as one message", "buffers near-limit text and processes sequential parts as one message",
@@ -738,7 +749,7 @@ describe("telegram text fragments", () => {
onSpy.mockReset(); onSpy.mockReset();
replySpy.mockReset(); replySpy.mockReset();
createTelegramBot({ token: "tok" }); createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
const handler = onSpy.mock.calls.find((call) => call[0] === "message")?.[1] as ( const handler = onSpy.mock.calls.find((call) => call[0] === "message")?.[1] as (
ctx: Record<string, unknown>, ctx: Record<string, unknown>,
) => Promise<void>; ) => Promise<void>;

View File

@@ -62,6 +62,10 @@ export type TelegramBotOptions = {
lastUpdateId?: number | null; lastUpdateId?: number | null;
onUpdateId?: (updateId: number) => void | Promise<void>; onUpdateId?: (updateId: number) => void | Promise<void>;
}; };
testTimings?: {
mediaGroupFlushMs?: number;
textFragmentGapMs?: number;
};
}; };
export function getTelegramSequentialKey(ctx: { export function getTelegramSequentialKey(ctx: {