test(telegram): dedupe streaming cases and tighten sequential key checks

This commit is contained in:
Peter Steinberger
2026-03-03 02:13:55 +00:00
parent 7fdbf1202e
commit 03755f8463
2 changed files with 109 additions and 170 deletions

View File

@@ -588,7 +588,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.stop).toHaveBeenCalled(); expect(draftStream.stop).toHaveBeenCalled();
}); });
it("disables block streaming when streamMode is off", async () => { it.each([
{ label: "default account config", telegramCfg: {} },
{ label: "account blockStreaming override", telegramCfg: { blockStreaming: true } },
])("disables block streaming when streamMode is off ($label)", async ({ telegramCfg }) => {
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true }; return { queuedFinal: true };
@@ -598,6 +601,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ await dispatchWithContext({
context: createContext(), context: createContext(),
streamMode: "off", streamMode: "off",
telegramCfg,
}); });
expect(createTelegramDraftStream).not.toHaveBeenCalled(); expect(createTelegramDraftStream).not.toHaveBeenCalled();
@@ -610,69 +614,27 @@ describe("dispatchTelegramMessage draft streaming", () => {
); );
}); });
it("disables block streaming when streamMode is off even if blockStreaming config is true", async () => { it.each(["block", "partial"] as const)(
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { "forces new message when assistant message restarts (%s mode)",
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); async (streamMode) => {
return { queuedFinal: true }; const draftStream = createDraftStream(999);
}); createTelegramDraftStream.mockReturnValue(draftStream);
deliverReplies.mockResolvedValue({ delivered: true }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "First response" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "After tool call" });
await dispatcherOptions.deliver({ text: "After tool call" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ await dispatchWithContext({ context: createContext(), streamMode });
context: createContext(),
streamMode: "off",
telegramCfg: { blockStreaming: true },
});
expect(createTelegramDraftStream).not.toHaveBeenCalled(); expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( },
expect.objectContaining({ );
replyOptions: expect.objectContaining({
disableBlockStreaming: true,
}),
}),
);
});
it("forces new message for next assistant block in legacy block stream mode", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// First assistant message: partial text
await replyOptions?.onPartialReply?.({ text: "First response" });
// New assistant message starts (e.g., after tool call)
await replyOptions?.onAssistantMessageStart?.();
// Second assistant message: new text
await replyOptions?.onPartialReply?.({ text: "After tool call" });
await dispatcherOptions.deliver({ text: "After tool call" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("forces new message in partial mode when assistant message restarts", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "First response" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "After tool call" });
await dispatcherOptions.deliver({ text: "After tool call" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("does not force new message on first assistant message start", async () => { it("does not force new message on first assistant message start", async () => {
const draftStream = createDraftStream(999); const draftStream = createDraftStream(999);
@@ -1076,7 +1038,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
it.each([undefined, null] as const)( it.each([undefined, null] as const)(
"skips outbound send when final payload text is %s and has no media", "skips outbound send when final payload text is %s and has no media",
async (emptyText) => { async (emptyText) => {
setupDraftStreams({ answerMessageId: 999 }); const { answerDraftStream } = setupDraftStreams({ answerMessageId: 999 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver( await dispatcherOptions.deliver(
{ text: emptyText as unknown as string }, { text: emptyText as unknown as string },
@@ -1090,6 +1052,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled(); expect(deliverReplies).not.toHaveBeenCalled();
expect(editMessageTelegram).not.toHaveBeenCalled(); expect(editMessageTelegram).not.toHaveBeenCalled();
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
}, },
); );
@@ -1595,21 +1558,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1); expect(draftStream.clear).toHaveBeenCalledTimes(1);
}); });
it("skips final payload when text is undefined", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: undefined as unknown as string }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext() });
expect(deliverReplies).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("falls back when all finals are skipped and clears preview", async () => { it("falls back when all finals are skipped and clears preview", async () => {
const draftStream = createDraftStream(999); const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream); createTelegramDraftStream.mockReturnValue(draftStream);

View File

@@ -5,6 +5,7 @@ import type { Chat, Message } from "@grammyjs/types";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { escapeRegExp, formatEnvelopeTimestamp } from "../../test/helpers/envelope-timestamp.js"; import { escapeRegExp, formatEnvelopeTimestamp } from "../../test/helpers/envelope-timestamp.js";
import { withEnvAsync } from "../test-utils/env.js"; import { withEnvAsync } from "../test-utils/env.js";
import { useFrozenTime, useRealTime } from "../test-utils/frozen-time.js";
import { import {
answerCallbackQuerySpy, answerCallbackQuerySpy,
botCtorSpy, botCtorSpy,
@@ -123,97 +124,87 @@ describe("createTelegramBot", () => {
expect(sequentializeSpy).toHaveBeenCalledTimes(1); expect(sequentializeSpy).toHaveBeenCalledTimes(1);
expect(middlewareUseSpy).toHaveBeenCalledWith(sequentializeSpy.mock.results[0]?.value); expect(middlewareUseSpy).toHaveBeenCalledWith(sequentializeSpy.mock.results[0]?.value);
expect(sequentializeKey).toBe(getTelegramSequentialKey); expect(sequentializeKey).toBe(getTelegramSequentialKey);
expect( const cases = [
getTelegramSequentialKey({ message: mockMessage({ chat: mockChat({ id: 123 }) }) }), [{ message: mockMessage({ chat: mockChat({ id: 123 }) }) }, "telegram:123"],
).toBe("telegram:123"); [
expect( {
getTelegramSequentialKey({ message: mockMessage({
message: mockMessage({ chat: mockChat({ id: 123, type: "private" }),
chat: mockChat({ id: 123, type: "private" }), message_thread_id: 9,
message_thread_id: 9, }),
}),
}),
).toBe("telegram:123:topic:9");
expect(
getTelegramSequentialKey({
message: mockMessage({
chat: mockChat({ id: 123, type: "supergroup" }),
message_thread_id: 9,
}),
}),
).toBe("telegram:123");
expect(
getTelegramSequentialKey({
message: mockMessage({ chat: mockChat({ id: 123, type: "supergroup", is_forum: true }) }),
}),
).toBe("telegram:123:topic:1");
expect(
getTelegramSequentialKey({
update: { message: mockMessage({ chat: mockChat({ id: 555 }) }) },
}),
).toBe("telegram:555");
expect(
getTelegramSequentialKey({
channelPost: mockMessage({ chat: mockChat({ id: -100777111222, type: "channel" }) }),
}),
).toBe("telegram:-100777111222");
expect(
getTelegramSequentialKey({
update: {
channel_post: mockMessage({ chat: mockChat({ id: -100777111223, type: "channel" }) }),
}, },
}), "telegram:123:topic:9",
).toBe("telegram:-100777111223"); ],
expect( [
getTelegramSequentialKey({ {
message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }), message: mockMessage({
}), chat: mockChat({ id: 123, type: "supergroup" }),
).toBe("telegram:123:control"); message_thread_id: 9,
expect( }),
getTelegramSequentialKey({ },
message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }), "telegram:123",
}), ],
).toBe("telegram:123"); [
expect( {
getTelegramSequentialKey({ message: mockMessage({
message: mockMessage({ chat: mockChat({ id: 123 }), text: "stop" }), chat: mockChat({ id: 123, type: "supergroup", is_forum: true }),
}), }),
).toBe("telegram:123:control"); },
expect( "telegram:123:topic:1",
getTelegramSequentialKey({ ],
message: mockMessage({ chat: mockChat({ id: 123 }), text: "stop please" }), [{ update: { message: mockMessage({ chat: mockChat({ id: 555 }) }) } }, "telegram:555"],
}), [
).toBe("telegram:123:control"); {
expect( channelPost: mockMessage({ chat: mockChat({ id: -100777111222, type: "channel" }) }),
getTelegramSequentialKey({ },
message: mockMessage({ chat: mockChat({ id: 123 }), text: "do not do that" }), "telegram:-100777111222",
}), ],
).toBe("telegram:123:control"); [
expect( {
getTelegramSequentialKey({ update: {
message: mockMessage({ chat: mockChat({ id: 123 }), text: "остановись" }), channel_post: mockMessage({ chat: mockChat({ id: -100777111223, type: "channel" }) }),
}), },
).toBe("telegram:123:control"); },
expect( "telegram:-100777111223",
getTelegramSequentialKey({ ],
message: mockMessage({ chat: mockChat({ id: 123 }), text: "halt" }), [
}), { message: mockMessage({ chat: mockChat({ id: 123 }), text: "/stop" }) },
).toBe("telegram:123:control"); "telegram:123:control",
expect( ],
getTelegramSequentialKey({ [{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/status" }) }, "telegram:123"],
message: mockMessage({ chat: mockChat({ id: 123 }), text: "/abort" }), [
}), { message: mockMessage({ chat: mockChat({ id: 123 }), text: "stop" }) },
).toBe("telegram:123"); "telegram:123:control",
expect( ],
getTelegramSequentialKey({ [
message: mockMessage({ chat: mockChat({ id: 123 }), text: "/abort now" }), { message: mockMessage({ chat: mockChat({ id: 123 }), text: "stop please" }) },
}), "telegram:123:control",
).toBe("telegram:123"); ],
expect( [
getTelegramSequentialKey({ { message: mockMessage({ chat: mockChat({ id: 123 }), text: "do not do that" }) },
message: mockMessage({ chat: mockChat({ id: 123 }), text: "please do not do that" }), "telegram:123:control",
}), ],
).toBe("telegram:123"); [
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "остановись" }) },
"telegram:123:control",
],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "halt" }) },
"telegram:123:control",
],
[{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/abort" }) }, "telegram:123"],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "/abort now" }) },
"telegram:123",
],
[
{ message: mockMessage({ chat: mockChat({ id: 123 }), text: "please do not do that" }) },
"telegram:123",
],
] as const;
for (const [input, expected] of cases) {
expect(getTelegramSequentialKey(input)).toBe(expected);
}
}); });
it("routes callback_query payloads as messages and answers callbacks", async () => { it("routes callback_query payloads as messages and answers callbacks", async () => {
createTelegramBot({ token: "tok" }); createTelegramBot({ token: "tok" });
@@ -2031,7 +2022,7 @@ describe("createTelegramBot", () => {
}, },
}); });
vi.useFakeTimers(); useFrozenTime("2026-02-20T00:00:00.000Z");
try { try {
createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS });
const handler = getOnHandler("channel_post") as ( const handler = getOnHandler("channel_post") as (
@@ -2071,7 +2062,7 @@ describe("createTelegramBot", () => {
expect(payload.RawBody).toContain(part1.slice(0, 32)); expect(payload.RawBody).toContain(part1.slice(0, 32));
expect(payload.RawBody).toContain(part2.slice(0, 32)); expect(payload.RawBody).toContain(part2.slice(0, 32));
} finally { } finally {
vi.useRealTimers(); useRealTime();
} }
}); });
it("drops oversized channel_post media instead of dispatching a placeholder message", async () => { it("drops oversized channel_post media instead of dispatching a placeholder message", async () => {