refactor(telegram): unify reasoning and answer lane delivery

This commit is contained in:
Ayaan Zaidi
2026-02-19 20:17:14 +05:30
parent 5ffb73e8b8
commit e5ff60607d

View File

@@ -149,47 +149,8 @@ export const dispatchTelegramMessage = async ({
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftMinInitialChars =
streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
const answerDraftStream = canStreamAnswerDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const reasoningDraftStream = canStreamReasoningDraft
? createTelegramDraftStream({
api: bot.api,
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const answerDraftChunking =
answerDraftStream && streamMode === "block"
? resolveTelegramDraftStreamingChunking(cfg, route.accountId)
: undefined;
const answerDraftChunker = answerDraftChunking
? new EmbeddedBlockChunker(answerDraftChunking)
: undefined;
const reasoningDraftChunking =
reasoningDraftStream && streamMode === "block"
? resolveTelegramDraftStreamingChunking(cfg, route.accountId)
: undefined;
const reasoningDraftChunker = reasoningDraftChunking
? new EmbeddedBlockChunker(reasoningDraftChunking)
: undefined;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
type LaneName = "answer" | "reasoning";
type DraftLaneState = {
stream: ReturnType<typeof createTelegramDraftStream> | undefined;
lastPartialText: string;
@@ -197,22 +158,52 @@ export const dispatchTelegramMessage = async ({
hasStreamedMessage: boolean;
chunker: EmbeddedBlockChunker | undefined;
};
const answerLane: DraftLaneState = {
stream: answerDraftStream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker: answerDraftChunker,
const createDraftLane = (enabled: boolean): DraftLaneState => {
const stream = enabled
? createTelegramDraftStream({
api: bot.api,
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const chunker =
stream && streamMode === "block"
? new EmbeddedBlockChunker(resolveTelegramDraftStreamingChunking(cfg, route.accountId))
: undefined;
return {
stream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker,
};
};
const reasoningLane: DraftLaneState = {
stream: reasoningDraftStream,
lastPartialText: "",
draftText: "",
hasStreamedMessage: false,
chunker: reasoningDraftChunker,
const lanes: Record<LaneName, DraftLaneState> = {
answer: createDraftLane(canStreamAnswerDraft),
reasoning: createDraftLane(canStreamReasoningDraft),
};
const answerLane = lanes.answer;
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
const reasoningStepState = createTelegramReasoningStepState();
type SplitLaneSegment = { lane: LaneName; text: string };
const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => {
const split = splitTelegramReasoningText(text);
const segments: SplitLaneSegment[] = [];
if (split.reasoningText) {
segments.push({ lane: "reasoning", text: split.reasoningText });
}
if (split.answerText) {
segments.push({ lane: "answer", text: split.answerText });
}
return segments;
};
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.draftText = "";
@@ -271,17 +262,12 @@ export const dispatchTelegramMessage = async ({
});
};
const updateDraftLanesFromPartial = (text: string | undefined) => {
if (!text) {
return;
}
const split = splitTelegramReasoningText(text);
if (split.reasoningText) {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
updateDraftFromPartial(reasoningLane, split.reasoningText);
}
if (split.answerText) {
updateDraftFromPartial(answerLane, split.answerText);
for (const segment of splitTextIntoLaneSegments(text)) {
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
}
updateDraftFromPartial(lanes[segment.lane], segment.text);
}
};
const flushDraftLane = async (lane: DraftLaneState) => {
@@ -381,8 +367,10 @@ export const dispatchTelegramMessage = async ({
delivered: false,
skippedNonSilent: 0,
};
let finalizedViaPreviewMessage = false;
let finalizedReasoningViaPreviewMessage = false;
const finalizedPreviewByLane: Record<LaneName, boolean> = {
answer: false,
reasoning: false,
};
const clearGroupHistory = () => {
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
@@ -402,89 +390,67 @@ export const dispatchTelegramMessage = async ({
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
};
const tryFinalizePreviewForLane = async (params: {
const getLanePreviewText = (lane: DraftLaneState) =>
streamMode === "block" ? lane.draftText : lane.lastPartialText;
const tryUpdatePreviewForLane = async (params: {
lane: DraftLaneState;
laneName: "answer" | "reasoning";
finalText: string;
laneName: LaneName;
text: string;
previewButtons?: TelegramInlineButtons;
stopBeforeEdit?: boolean;
updateLaneSnapshot?: boolean;
skipRegressive: "always" | "existingOnly";
context: "final" | "update";
}): Promise<boolean> => {
const { lane, laneName, finalText, previewButtons } = params;
const {
lane,
laneName,
text,
previewButtons,
stopBeforeEdit = false,
updateLaneSnapshot = false,
skipRegressive,
context,
} = params;
if (!lane.stream) {
return false;
}
const hadPreviewMessage = typeof lane.stream.messageId() === "number";
const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText;
await lane.stream.stop();
if (stopBeforeEdit) {
await lane.stream.stop();
}
const previewMessageId = lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
if (
hadPreviewMessage &&
currentPreviewText &&
currentPreviewText.startsWith(finalText) &&
finalText.length < currentPreviewText.length
) {
const currentPreviewText = getLanePreviewText(lane);
const shouldSkipRegressive =
Boolean(currentPreviewText) &&
currentPreviewText.startsWith(text) &&
text.length < currentPreviewText.length &&
(skipRegressive === "always" || hadPreviewMessage);
if (shouldSkipRegressive) {
// Avoid regressive punctuation/wording flicker from occasional shorter finals.
deliveryState.delivered = true;
return true;
}
try {
await editMessageTelegram(chatId, previewMessageId, finalText, {
await editMessageTelegram(chatId, previewMessageId, text, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
if (updateLaneSnapshot) {
lane.lastPartialText = text;
lane.draftText = text;
}
deliveryState.delivered = true;
return true;
} catch (err) {
logVerbose(
`telegram: ${laneName} preview final edit failed; falling back to standard send (${String(err)})`,
);
return false;
}
};
const tryEditExistingPreviewForLane = async (params: {
lane: DraftLaneState;
laneName: "answer" | "reasoning";
finalText: string;
previewButtons?: TelegramInlineButtons;
}): Promise<boolean> => {
const { lane, laneName, finalText, previewButtons } = params;
if (!lane.stream) {
return false;
}
const previewMessageId = lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
const currentPreviewText = streamMode === "block" ? lane.draftText : lane.lastPartialText;
if (
currentPreviewText &&
currentPreviewText.startsWith(finalText) &&
finalText.length < currentPreviewText.length
) {
// Avoid regressive punctuation/wording flicker from occasional shorter finals.
deliveryState.delivered = true;
return true;
}
try {
await editMessageTelegram(chatId, previewMessageId, finalText, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
lane.lastPartialText = finalText;
lane.draftText = finalText;
deliveryState.delivered = true;
return true;
} catch (err) {
logVerbose(
`telegram: ${laneName} preview update failed; falling back to standard send (${String(err)})`,
`telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`,
);
return false;
}
@@ -506,33 +472,72 @@ export const dispatchTelegramMessage = async ({
}
return result.delivered;
};
const tryFinalizeLaneText = async (params: {
lane: DraftLaneState;
laneName: "answer" | "reasoning";
type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
const deliverLaneText = async (params: {
laneName: LaneName;
text: string;
previewButtons?: TelegramInlineButtons;
alreadyFinalized?: boolean;
payload: ReplyPayload;
}): Promise<boolean> => {
const { lane, laneName, text, previewButtons, alreadyFinalized, payload } = params;
infoKind: string;
previewButtons?: TelegramInlineButtons;
allowPreviewUpdateForNonFinal?: boolean;
}): Promise<LaneDeliveryResult> => {
const {
laneName,
text,
payload,
infoKind,
previewButtons,
allowPreviewUpdateForNonFinal = false,
} = params;
const lane = lanes[laneName];
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const canFinalizeViaPreviewEdit =
const canEditViaPreview =
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (!canFinalizeViaPreviewEdit || alreadyFinalized) {
if (!hasMedia && !payload.isError && text.length > draftMaxChars) {
if (infoKind === "final") {
if (canEditViaPreview && !finalizedPreviewByLane[laneName]) {
await flushDraftLane(lane);
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: true,
skipRegressive: "existingOnly",
context: "final",
});
if (finalized) {
finalizedPreviewByLane[laneName] = true;
return "preview-finalized";
}
} else if (!hasMedia && !payload.isError && text.length > draftMaxChars) {
logVerbose(
`telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`,
);
}
return false;
await lane.stream?.stop();
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
}
await flushDraftLane(lane);
return tryFinalizePreviewForLane({
lane,
laneName,
finalText: text,
previewButtons,
});
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
const updated = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
updateLaneSnapshot: true,
skipRegressive: "always",
context: "update",
});
if (updated) {
return "preview-updated";
}
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
};
let queuedFinal = false;
@@ -546,7 +551,7 @@ export const dispatchTelegramMessage = async ({
const previewButtons = (
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
)?.buttons;
const split = splitTelegramReasoningText(payload.text);
const segments = splitTextIntoLaneSegments(payload.text);
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const flushBufferedFinalAnswer = async () => {
@@ -559,112 +564,51 @@ export const dispatchTelegramMessage = async ({
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
const finalizedBufferedAnswer = await tryFinalizeLaneText({
lane: answerLane,
await deliverLaneText({
laneName: "answer",
text: buffered.text,
previewButtons: bufferedButtons,
alreadyFinalized: finalizedViaPreviewMessage,
payload: buffered.payload,
infoKind: "final",
previewButtons: bufferedButtons,
});
if (finalizedBufferedAnswer) {
finalizedViaPreviewMessage = true;
reasoningStepState.resetForNextStep();
return;
}
await answerLane.stream?.stop();
await sendPayload(applyTextToPayload(buffered.payload, buffered.text));
reasoningStepState.resetForNextStep();
};
const deliverReasoningText = async (text: string) => {
reasoningStepState.noteReasoningHint();
if (info.kind === "final") {
const finalizedReasoning = await tryFinalizeLaneText({
lane: reasoningLane,
laneName: "reasoning",
text,
previewButtons,
payload,
});
if (finalizedReasoning) {
finalizedReasoningViaPreviewMessage = true;
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
return;
}
await reasoningLane.stream?.stop();
const delivered = await sendPayload(applyTextToPayload(payload, text));
if (delivered) {
for (const segment of segments) {
if (
segment.lane === "answer" &&
info.kind === "final" &&
reasoningStepState.shouldBufferFinalAnswer()
) {
reasoningStepState.bufferFinalAnswer({ payload, text: segment.text });
continue;
}
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
}
const result = await deliverLaneText({
laneName: segment.lane,
text: segment.text,
payload,
infoKind: info.kind,
previewButtons,
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
});
if (segment.lane === "reasoning") {
if (result !== "skipped") {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
return;
continue;
}
const canEditReasoningPreview =
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (canEditReasoningPreview) {
const updatedReasoning = await tryEditExistingPreviewForLane({
lane: reasoningLane,
laneName: "reasoning",
finalText: text,
previewButtons,
});
if (updatedReasoning) {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
return;
}
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
if (delivered) {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
};
const deliverAnswerText = async (text: string) => {
if (info.kind === "final" && reasoningStepState.shouldBufferFinalAnswer()) {
reasoningStepState.bufferFinalAnswer({ payload, text });
return;
}
if (info.kind === "final") {
const finalizedAnswer = await tryFinalizeLaneText({
lane: answerLane,
laneName: "answer",
text,
previewButtons,
alreadyFinalized: finalizedViaPreviewMessage,
payload,
});
if (finalizedAnswer && !finalizedViaPreviewMessage) {
finalizedViaPreviewMessage = true;
if (reasoningLane.hasStreamedMessage) {
finalizedReasoningViaPreviewMessage = true;
}
reasoningStepState.resetForNextStep();
return;
}
await answerLane.stream?.stop();
}
await sendPayload(applyTextToPayload(payload, text));
if (info.kind === "final") {
if (reasoningLane.hasStreamedMessage) {
finalizedReasoningViaPreviewMessage = true;
finalizedPreviewByLane.reasoning = true;
}
reasoningStepState.resetForNextStep();
}
};
if (split.reasoningText) {
await deliverReasoningText(split.reasoningText);
}
if (split.answerText) {
await deliverAnswerText(split.answerText);
return;
}
if (split.reasoningText) {
if (segments.length > 0) {
return;
}
@@ -709,9 +653,10 @@ export const dispatchTelegramMessage = async ({
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply: answerLane.stream
? (payload) => updateDraftLanesFromPartial(payload.text)
: undefined,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) => updateDraftLanesFromPartial(payload.text)
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) => {
// Split between reasoning blocks only when the next reasoning
@@ -722,14 +667,12 @@ export const dispatchTelegramMessage = async ({
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
const split = splitTelegramReasoningText(payload.text);
if (split.reasoningText) {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
updateDraftFromPartial(reasoningLane, split.reasoningText);
}
if (split.answerText) {
updateDraftFromPartial(answerLane, split.answerText);
for (const segment of splitTextIntoLaneSegments(payload.text)) {
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
}
updateDraftFromPartial(lanes[segment.lane], segment.text);
}
}
: undefined,
@@ -759,12 +702,12 @@ export const dispatchTelegramMessage = async ({
Boolean(reasoningLane.stream) &&
answerLane.stream === reasoningLane.stream;
await answerLane.stream?.stop();
if (!finalizedViaPreviewMessage) {
if (!finalizedPreviewByLane.answer) {
await answerLane.stream?.clear();
}
if (!streamsShareHandle) {
await reasoningLane.stream?.stop();
if (!finalizedReasoningViaPreviewMessage) {
if (!finalizedPreviewByLane.reasoning) {
await reasoningLane.stream?.clear();
}
}