refactor: extract telegram lane delivery and e2e harness

This commit is contained in:
Peter Steinberger
2026-02-22 21:32:14 +01:00
parent acfbe158c6
commit 13541864e5
5 changed files with 784 additions and 628 deletions

View File

@@ -129,6 +129,44 @@ function collectMessagingMediaUrlsFromToolResult(result: unknown): string[] {
return urls;
}
function emitToolResultOutput(params: {
ctx: ToolHandlerContext;
toolName: string;
meta?: string;
isToolError: boolean;
result: unknown;
sanitizedResult: unknown;
}) {
const { ctx, toolName, meta, isToolError, result, sanitizedResult } = params;
if (!ctx.params.onToolResult) {
return;
}
if (ctx.shouldEmitToolOutput()) {
const outputText = extractToolResultText(sanitizedResult);
if (outputText) {
ctx.emitToolOutput(toolName, meta, outputText);
}
return;
}
if (isToolError) {
return;
}
// emitToolOutput() already handles MEDIA: directives when enabled; this path
// only sends raw media URLs for non-verbose delivery mode.
const mediaPaths = filterToolResultMediaUrls(toolName, extractToolResultMediaPaths(result));
if (mediaPaths.length === 0) {
return;
}
try {
void ctx.params.onToolResult({ mediaUrls: mediaPaths });
} catch {
// ignore delivery failures
}
}
export async function handleToolExecutionStart(
ctx: ToolHandlerContext,
evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
@@ -371,26 +409,7 @@ export async function handleToolExecutionEnd(
`embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
);
if (ctx.params.onToolResult && ctx.shouldEmitToolOutput()) {
const outputText = extractToolResultText(sanitizedResult);
if (outputText) {
ctx.emitToolOutput(toolName, meta, outputText);
}
}
// Deliver media from tool results when the verbose emitToolOutput path is off.
// When shouldEmitToolOutput() is true, emitToolOutput already delivers media
// via parseReplyDirectives (MEDIA: text extraction), so skip to avoid duplicates.
if (ctx.params.onToolResult && !isToolError && !ctx.shouldEmitToolOutput()) {
const mediaPaths = filterToolResultMediaUrls(toolName, extractToolResultMediaPaths(result));
if (mediaPaths.length > 0) {
try {
void ctx.params.onToolResult({ mediaUrls: mediaPaths });
} catch {
// ignore delivery failures
}
}
}
emitToolResultOutput({ ctx, toolName, meta, isToolError, result, sanitizedResult });
// Run after_tool_call plugin hook (fire-and-forget)
const hookRunnerAfter = ctx.hookRunner ?? getGlobalHookRunner();

View File

@@ -27,6 +27,13 @@ import type { TelegramStreamMode } from "./bot/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import { renderTelegramHtmlText } from "./format.js";
import {
type ArchivedPreview,
createLaneDeliveryStateTracker,
createLaneTextDeliverer,
type DraftLaneState,
type LaneName,
} from "./lane-delivery.js";
import {
createTelegramReasoningStepState,
splitTelegramReasoningText,
@@ -149,13 +156,6 @@ export const dispatchTelegramMessage = async ({
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
type LaneName = "answer" | "reasoning";
type DraftLaneState = {
stream: ReturnType<typeof createTelegramDraftStream> | undefined;
lastPartialText: string;
hasStreamedMessage: boolean;
};
type ArchivedPreview = { messageId: number; textSnapshot: string };
const archivedAnswerPreviews: ArchivedPreview[] = [];
const archivedReasoningPreviewIds: number[] = [];
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
@@ -332,11 +332,7 @@ export const dispatchTelegramMessage = async ({
ctxPayload.ReplyToIsQuote && ctxPayload.ReplyToBody
? ctxPayload.ReplyToBody.trim() || undefined
: undefined;
const deliveryState = {
delivered: false,
skippedNonSilent: 0,
failedNonSilent: 0,
};
const deliveryState = createLaneDeliveryStateTracker();
const finalizedPreviewByLane: Record<LaneName, boolean> = {
answer: false,
reasoning: false,
@@ -360,78 +356,6 @@ export const dispatchTelegramMessage = async ({
linkPreview: telegramCfg.linkPreview,
replyQuoteText,
};
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
const tryUpdatePreviewForLane = async (params: {
lane: DraftLaneState;
laneName: LaneName;
text: string;
previewButtons?: TelegramInlineButtons;
stopBeforeEdit?: boolean;
updateLaneSnapshot?: boolean;
skipRegressive: "always" | "existingOnly";
context: "final" | "update";
previewMessageId?: number;
previewTextSnapshot?: string;
}): Promise<boolean> => {
const {
lane,
laneName,
text,
previewButtons,
stopBeforeEdit = false,
updateLaneSnapshot = false,
skipRegressive,
context,
previewMessageId: previewMessageIdOverride,
previewTextSnapshot,
} = params;
if (!lane.stream) {
return false;
}
const lanePreviewMessageId = lane.stream.messageId();
const hadPreviewMessage =
typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number";
if (stopBeforeEdit) {
await lane.stream.stop();
}
const previewMessageId =
typeof previewMessageIdOverride === "number"
? previewMessageIdOverride
: lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
const shouldSkipRegressive =
Boolean(currentPreviewText) &&
currentPreviewText.startsWith(text) &&
text.length < currentPreviewText.length &&
(skipRegressive === "always" || hadPreviewMessage);
if (shouldSkipRegressive) {
// Avoid regressive punctuation/wording flicker from occasional shorter finals.
deliveryState.delivered = true;
return true;
}
try {
await editMessageTelegram(chatId, previewMessageId, text, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
if (updateLaneSnapshot) {
lane.lastPartialText = text;
}
deliveryState.delivered = true;
return true;
} catch (err) {
logVerbose(
`telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`,
);
return false;
}
};
const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => {
if (payload.text === text) {
return payload;
@@ -445,138 +369,38 @@ export const dispatchTelegramMessage = async ({
onVoiceRecording: sendRecordVoice,
});
if (result.delivered) {
deliveryState.delivered = true;
deliveryState.markDelivered();
}
return result.delivered;
};
type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
const consumeArchivedAnswerPreviewForFinal = async (params: {
lane: DraftLaneState;
text: string;
payload: ReplyPayload;
previewButtons?: TelegramInlineButtons;
canEditViaPreview: boolean;
}): Promise<LaneDeliveryResult | undefined> => {
const archivedPreview = archivedAnswerPreviews.shift();
if (!archivedPreview) {
return undefined;
}
if (params.canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane: params.lane,
laneName: "answer",
text: params.text,
previewButtons: params.previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized) {
return "preview-finalized";
}
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
const delivered = await sendPayload(applyTextToPayload(params.payload, params.text));
return delivered ? "sent" : "skipped";
};
const deliverLaneText = async (params: {
laneName: LaneName;
text: string;
payload: ReplyPayload;
infoKind: string;
previewButtons?: TelegramInlineButtons;
allowPreviewUpdateForNonFinal?: boolean;
}): Promise<LaneDeliveryResult> => {
const {
laneName,
text,
payload,
infoKind,
previewButtons,
allowPreviewUpdateForNonFinal = false,
} = params;
const lane = lanes[laneName];
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const canEditViaPreview =
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (infoKind === "final") {
if (laneName === "answer") {
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResult) {
return archivedResult;
}
}
if (canEditViaPreview && !finalizedPreviewByLane[laneName]) {
await flushDraftLane(lane);
if (laneName === "answer") {
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResultAfterFlush) {
return archivedResultAfterFlush;
}
}
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: true,
skipRegressive: "existingOnly",
context: "final",
});
if (finalized) {
finalizedPreviewByLane[laneName] = true;
return "preview-finalized";
}
} else if (!hasMedia && !payload.isError && text.length > draftMaxChars) {
logVerbose(
`telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`,
);
}
const deliverLaneText = createLaneTextDeliverer({
lanes,
archivedAnswerPreviews,
finalizedPreviewByLane,
draftMaxChars,
applyTextToPayload,
sendPayload,
flushDraftLane,
stopDraftLane: async (lane) => {
await lane.stream?.stop();
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
}
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
const updated = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
updateLaneSnapshot: true,
skipRegressive: "always",
context: "update",
},
editPreview: async ({ messageId, text, previewButtons }) => {
await editMessageTelegram(chatId, messageId, text, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
if (updated) {
return "preview-updated";
}
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
};
},
deletePreviewMessage: async (messageId) => {
await bot.api.deleteMessage(chatId, messageId);
},
log: logVerbose,
markDelivered: () => {
deliveryState.markDelivered();
},
});
let queuedFinal = false;
@@ -675,11 +499,11 @@ export const dispatchTelegramMessage = async ({
},
onSkip: (_payload, info) => {
if (info.reason !== "silent") {
deliveryState.skippedNonSilent += 1;
deliveryState.markNonSilentSkip();
}
},
onError: (err, info) => {
deliveryState.failedNonSilent += 1;
deliveryState.markNonSilentFailure();
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
@@ -793,9 +617,10 @@ export const dispatchTelegramMessage = async ({
}
}
let sentFallback = false;
const deliverySummary = deliveryState.snapshot();
if (
!deliveryState.delivered &&
(deliveryState.skippedNonSilent > 0 || deliveryState.failedNonSilent > 0)
!deliverySummary.delivered &&
(deliverySummary.skippedNonSilent > 0 || deliverySummary.failedNonSilent > 0)
) {
const result = await deliverReplies({
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],

View File

@@ -0,0 +1,286 @@
import type { ReplyPayload } from "../auto-reply/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import type { TelegramDraftStream } from "./draft-stream.js";
export type LaneName = "answer" | "reasoning";
export type DraftLaneState = {
stream: TelegramDraftStream | undefined;
lastPartialText: string;
hasStreamedMessage: boolean;
};
export type ArchivedPreview = {
messageId: number;
textSnapshot: string;
};
export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
export type LaneDeliverySnapshot = {
delivered: boolean;
skippedNonSilent: number;
failedNonSilent: number;
};
export type LaneDeliveryStateTracker = {
markDelivered: () => void;
markNonSilentSkip: () => void;
markNonSilentFailure: () => void;
snapshot: () => LaneDeliverySnapshot;
};
export function createLaneDeliveryStateTracker(): LaneDeliveryStateTracker {
const state: LaneDeliverySnapshot = {
delivered: false,
skippedNonSilent: 0,
failedNonSilent: 0,
};
return {
markDelivered: () => {
state.delivered = true;
},
markNonSilentSkip: () => {
state.skippedNonSilent += 1;
},
markNonSilentFailure: () => {
state.failedNonSilent += 1;
},
snapshot: () => ({ ...state }),
};
}
type CreateLaneTextDelivererParams = {
lanes: Record<LaneName, DraftLaneState>;
archivedAnswerPreviews: ArchivedPreview[];
finalizedPreviewByLane: Record<LaneName, boolean>;
draftMaxChars: number;
applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload;
sendPayload: (payload: ReplyPayload) => Promise<boolean>;
flushDraftLane: (lane: DraftLaneState) => Promise<void>;
stopDraftLane: (lane: DraftLaneState) => Promise<void>;
editPreview: (params: {
laneName: LaneName;
messageId: number;
text: string;
context: "final" | "update";
previewButtons?: TelegramInlineButtons;
}) => Promise<void>;
deletePreviewMessage: (messageId: number) => Promise<void>;
log: (message: string) => void;
markDelivered: () => void;
};
type DeliverLaneTextParams = {
laneName: LaneName;
text: string;
payload: ReplyPayload;
infoKind: string;
previewButtons?: TelegramInlineButtons;
allowPreviewUpdateForNonFinal?: boolean;
};
type TryUpdatePreviewParams = {
lane: DraftLaneState;
laneName: LaneName;
text: string;
previewButtons?: TelegramInlineButtons;
stopBeforeEdit?: boolean;
updateLaneSnapshot?: boolean;
skipRegressive: "always" | "existingOnly";
context: "final" | "update";
previewMessageId?: number;
previewTextSnapshot?: string;
};
type ConsumeArchivedAnswerPreviewParams = {
lane: DraftLaneState;
text: string;
payload: ReplyPayload;
previewButtons?: TelegramInlineButtons;
canEditViaPreview: boolean;
};
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
const tryUpdatePreviewForLane = async ({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit = false,
updateLaneSnapshot = false,
skipRegressive,
context,
previewMessageId: previewMessageIdOverride,
previewTextSnapshot,
}: TryUpdatePreviewParams): Promise<boolean> => {
if (!lane.stream) {
return false;
}
const lanePreviewMessageId = lane.stream.messageId();
const hadPreviewMessage =
typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number";
if (stopBeforeEdit) {
await params.stopDraftLane(lane);
}
const previewMessageId =
typeof previewMessageIdOverride === "number"
? previewMessageIdOverride
: lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
const shouldSkipRegressive =
Boolean(currentPreviewText) &&
currentPreviewText.startsWith(text) &&
text.length < currentPreviewText.length &&
(skipRegressive === "always" || hadPreviewMessage);
if (shouldSkipRegressive) {
params.markDelivered();
return true;
}
try {
await params.editPreview({
laneName,
messageId: previewMessageId,
text,
previewButtons,
context,
});
if (updateLaneSnapshot) {
lane.lastPartialText = text;
}
params.markDelivered();
return true;
} catch (err) {
params.log(
`telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`,
);
return false;
}
};
const consumeArchivedAnswerPreviewForFinal = async ({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
}: ConsumeArchivedAnswerPreviewParams): Promise<LaneDeliveryResult | undefined> => {
const archivedPreview = params.archivedAnswerPreviews.shift();
if (!archivedPreview) {
return undefined;
}
if (canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane,
laneName: "answer",
text,
previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized) {
return "preview-finalized";
}
}
try {
await params.deletePreviewMessage(archivedPreview.messageId);
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
};
return async ({
laneName,
text,
payload,
infoKind,
previewButtons,
allowPreviewUpdateForNonFinal = false,
}: DeliverLaneTextParams): Promise<LaneDeliveryResult> => {
const lane = params.lanes[laneName];
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const canEditViaPreview =
!hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError;
if (infoKind === "final") {
if (laneName === "answer") {
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResult) {
return archivedResult;
}
}
if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) {
await params.flushDraftLane(lane);
if (laneName === "answer") {
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResultAfterFlush) {
return archivedResultAfterFlush;
}
}
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: true,
skipRegressive: "existingOnly",
context: "final",
});
if (finalized) {
params.finalizedPreviewByLane[laneName] = true;
return "preview-finalized";
}
} else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) {
params.log(
`telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`,
);
}
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
}
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
const updated = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
updateLaneSnapshot: true,
skipRegressive: "always",
context: "update",
});
if (updated) {
return "preview-updated";
}
}
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
};
}