fix(telegram): prevent reasoning duplicates in draft lanes

This commit is contained in:
Ayaan Zaidi
2026-02-19 15:44:50 +05:30
parent 0a599061a9
commit e1fa57cee7
4 changed files with 103 additions and 49 deletions

View File

@@ -438,18 +438,17 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("forces new message when reasoning ends after previous output", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
it("defers reasoning split until next reasoning block in block mode", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// First partial: text before thinking
await replyOptions?.onPartialReply?.({ text: "Let me check" });
// Reasoning stream (thinking block)
await replyOptions?.onReasoningStream?.({ text: "Analyzing..." });
// Reasoning ends
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" });
await replyOptions?.onReasoningEnd?.();
// Second partial: text after thinking
expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled();
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" });
await replyOptions?.onPartialReply?.({ text: "Here's the answer" });
await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" });
return { queuedFinal: true };
@@ -459,16 +458,17 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext(), streamMode: "block" });
// Should force new message when reasoning ends
expect(draftStream.forceNewMessage).toHaveBeenCalled();
expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("does not force new message in partial mode when reasoning ends", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
it("does not split reasoning lane on reasoning end when no next reasoning block arrives", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Let me check" });
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" });
await replyOptions?.onReasoningEnd?.();
await replyOptions?.onPartialReply?.({ text: "Here's the answer" });
await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" });
@@ -477,21 +477,20 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("forces new message on reasoning end after streamed reasoning output", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
it("does not force new reasoning split in partial mode when no next block arrives", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,
reasoningMessageId: 111,
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
// Reasoning starts immediately (no assistant-answer output yet)
await replyOptions?.onReasoningStream?.({ text: "Thinking..." });
// Reasoning ends
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" });
await replyOptions?.onReasoningEnd?.();
// First actual text output
await replyOptions?.onPartialReply?.({ text: "Here's my answer" });
await dispatcherOptions.deliver({ text: "Here's my answer" }, { kind: "final" });
return { queuedFinal: true };
@@ -499,10 +498,9 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
// Reasoning stream produced preview output, so split for final answer.
expect(draftStream.forceNewMessage).toHaveBeenCalled();
expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("does not finalize preview with reasoning payloads before answer payloads", async () => {
@@ -647,6 +645,48 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(deliverReplies).not.toHaveBeenCalled();
});
it("does not duplicate reasoning final after reasoning end in block mode", async () => {
let reasoningMessageId: number | undefined = 111;
const reasoningDraftStream = {
update: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockImplementation(() => reasoningMessageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockResolvedValue(undefined),
forceNewMessage: vi.fn().mockImplementation(() => {
reasoningMessageId = undefined;
}),
};
const answerDraftStream = createDraftStream(999);
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step one_" });
await replyOptions?.onReasoningEnd?.();
await dispatcherOptions.deliver(
{ text: "Reasoning:\n_step one expanded_" },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "111" });
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
111,
"Reasoning:\n_step one expanded_",
expect.any(Object),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it("splits reasoning preview only when next reasoning block starts in partial mode", async () => {
const { reasoningDraftStream } = setupDraftStreams({
answerMessageId: 999,

View File

@@ -494,10 +494,10 @@ export const dispatchTelegramMessage = async ({
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => {
// In partial mode, split between reasoning blocks only when the
// next reasoning stream starts. Splitting at reasoning-end can
// orphan the active preview and cause duplicate reasoning sends.
if (streamMode === "partial" && splitReasoningOnNextStream) {
// Split between reasoning blocks only when the next reasoning
// stream starts. Splitting at reasoning-end can orphan the active
// preview and cause duplicate reasoning sends on reasoning final.
if (splitReasoningOnNextStream) {
reasoningLane.stream?.forceNewMessage();
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
@@ -516,15 +516,7 @@ export const dispatchTelegramMessage = async ({
: undefined,
onReasoningEnd: reasoningLane.stream
? () => {
// Block mode keeps hard message boundaries at reasoning-end.
if (streamMode === "block") {
if (reasoningLane.hasStreamedMessage) {
reasoningLane.stream?.forceNewMessage();
}
resetDraftLaneState(reasoningLane);
return;
}
// Partial mode splits when/if a later reasoning block begins.
// Split when/if a later reasoning block begins.
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
}
: undefined,

View File

@@ -153,6 +153,28 @@ describe("createTelegramDraftStream", () => {
parse_mode: "HTML",
});
});
it("enforces maxChars after renderText expansion", async () => {
const api = createMockDraftApi();
const warn = vi.fn();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
maxChars: 100,
renderText: () => ({ text: `<b>${"<".repeat(120)}</b>`, parseMode: "HTML" }),
warn,
});
stream.update("short raw text");
await stream.flush();
expect(api.sendMessage).not.toHaveBeenCalled();
expect(api.editMessageText).not.toHaveBeenCalled();
expect(warn).toHaveBeenCalledWith(
expect.stringContaining("telegram stream preview stopped (text length 127 > 100)"),
);
});
});
describe("draft stream initial message debounce", () => {

View File

@@ -62,21 +62,21 @@ export function createTelegramDraftStream(params: {
if (!trimmed) {
return false;
}
if (trimmed.length > maxChars) {
// Telegram text messages/edits cap at 4096 chars.
// Stop streaming once we exceed the cap to avoid repeated API failures.
stopped = true;
params.warn?.(
`telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`,
);
return false;
}
const rendered = params.renderText?.(trimmed) ?? { text: trimmed };
const renderedText = rendered.text.trimEnd();
const renderedParseMode = rendered.parseMode;
if (!renderedText) {
return false;
}
if (renderedText.length > maxChars) {
// Telegram text messages/edits cap at 4096 chars.
// Stop streaming once we exceed the cap to avoid repeated API failures.
stopped = true;
params.warn?.(
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
);
return false;
}
if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) {
return true;
}