refactor: unify typing dispatch lifecycle and policy boundaries

This commit is contained in:
Peter Steinberger
2026-02-26 17:36:09 +01:00
parent 6fd9ec97de
commit 273973d374
19 changed files with 420 additions and 164 deletions

View File

@@ -22,6 +22,7 @@ import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";
const AUDIO_PLACEHOLDER_RE = /^<media:audio>(\s*\([^)]*\))?$/i;
const AUDIO_HEADER_RE = /^\[Audio\b/i;
@@ -395,19 +396,19 @@ export async function dispatchReplyFromConfig(params: {
}
return { ...payload, text: undefined };
};
const typing = resolveRunTypingPolicy({
requestedPolicy: params.replyOptions?.typingPolicy,
suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
originatingChannel,
systemEvent: shouldRouteToOriginating,
});
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
ctx,
{
...params.replyOptions,
typingPolicy:
params.replyOptions?.typingPolicy ??
(originatingChannel === INTERNAL_MESSAGE_CHANNEL
? "internal_webchat"
: shouldRouteToOriginating
? "system_event"
: undefined),
suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
typingPolicy: typing.typingPolicy,
suppressTyping: typing.suppressTyping,
onToolResult: (payload: ReplyPayload) => {
const run = async () => {
const ttsPayload = await maybeApplyTtsToPayload({

View File

@@ -18,7 +18,6 @@ import {
import { logVerbose } from "../../globals.js";
import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
import { normalizeMainKey } from "../../routing/session-key.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { hasControlCommand } from "../command-detection.js";
import { buildInboundMediaNote } from "../media-note.js";
@@ -47,6 +46,7 @@ import { routeReply } from "./route-reply.js";
import { BARE_SESSION_RESET_PROMPT } from "./session-reset-prompt.js";
import { ensureSkillSnapshot, prependSystemEvents } from "./session-updates.js";
import { resolveTypingMode } from "./typing-mode.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";
import type { TypingController } from "./typing.js";
import { appendUntrustedContext } from "./untrusted-context.js";
@@ -234,14 +234,12 @@ export async function runPreparedReply(
const isGroupChat = sessionCtx.ChatType === "group";
const wasMentioned = ctx.WasMentioned === true;
const isHeartbeat = opts?.isHeartbeat === true;
const typingPolicy =
opts?.typingPolicy ??
(isHeartbeat
? "heartbeat"
: ctx.OriginatingChannel === INTERNAL_MESSAGE_CHANNEL
? "internal_webchat"
: "auto");
const suppressTyping = opts?.suppressTyping === true;
const { typingPolicy, suppressTyping } = resolveRunTypingPolicy({
requestedPolicy: opts?.typingPolicy,
suppressTyping: opts?.suppressTyping === true,
isHeartbeat,
originatingChannel: ctx.OriginatingChannel,
});
const typingMode = resolveTypingMode({
configured: sessionCfg?.typingMode ?? agentCfg?.typingMode,
isGroupChat,

View File

@@ -0,0 +1,61 @@
import { describe, expect, it } from "vitest";
import { resolveRunTypingPolicy } from "./typing-policy.js";
describe("resolveRunTypingPolicy", () => {
it("forces heartbeat policy for heartbeat runs", () => {
const resolved = resolveRunTypingPolicy({
requestedPolicy: "user_message",
isHeartbeat: true,
});
expect(resolved).toEqual({
typingPolicy: "heartbeat",
suppressTyping: true,
});
});
it("forces internal webchat policy", () => {
const resolved = resolveRunTypingPolicy({
requestedPolicy: "user_message",
originatingChannel: "webchat",
});
expect(resolved).toEqual({
typingPolicy: "internal_webchat",
suppressTyping: true,
});
});
it("forces system event policy for routed turns", () => {
const resolved = resolveRunTypingPolicy({
requestedPolicy: "user_message",
systemEvent: true,
originatingChannel: "telegram",
});
expect(resolved).toEqual({
typingPolicy: "system_event",
suppressTyping: true,
});
});
it("preserves requested policy for regular user turns", () => {
const resolved = resolveRunTypingPolicy({
requestedPolicy: "user_message",
originatingChannel: "telegram",
});
expect(resolved).toEqual({
typingPolicy: "user_message",
suppressTyping: false,
});
});
it("respects explicit suppressTyping", () => {
const resolved = resolveRunTypingPolicy({
requestedPolicy: "auto",
originatingChannel: "telegram",
suppressTyping: true,
});
expect(resolved).toEqual({
typingPolicy: "auto",
suppressTyping: true,
});
});
});

View File

@@ -0,0 +1,35 @@
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
import type { TypingPolicy } from "../types.js";
export type ResolveRunTypingPolicyParams = {
requestedPolicy?: TypingPolicy;
suppressTyping?: boolean;
isHeartbeat?: boolean;
originatingChannel?: string;
systemEvent?: boolean;
};
export type ResolvedRunTypingPolicy = {
typingPolicy: TypingPolicy;
suppressTyping: boolean;
};
export function resolveRunTypingPolicy(
params: ResolveRunTypingPolicyParams,
): ResolvedRunTypingPolicy {
const typingPolicy = params.isHeartbeat
? "heartbeat"
: params.originatingChannel === INTERNAL_MESSAGE_CHANNEL
? "internal_webchat"
: params.systemEvent
? "system_event"
: (params.requestedPolicy ?? "auto");
const suppressTyping =
params.suppressTyping === true ||
typingPolicy === "heartbeat" ||
typingPolicy === "system_event" ||
typingPolicy === "internal_webchat";
return { typingPolicy, suppressTyping };
}

View File

@@ -1,4 +1,5 @@
import { createTypingKeepaliveLoop } from "../../channels/typing-lifecycle.js";
import { createTypingStartGuard } from "../../channels/typing-start-guard.js";
import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
export type TypingController = {
@@ -99,15 +100,16 @@ export function createTypingController(params: {
const isActive = () => active && !sealed;
const startGuard = createTypingStartGuard({
isSealed: () => sealed,
shouldBlock: () => runComplete,
rethrowOnError: true,
});
const triggerTyping = async () => {
if (sealed) {
return;
}
// Late callbacks after a run completed should never restart typing.
if (runComplete) {
return;
}
await onReplyStart?.();
await startGuard.run(async () => {
await onReplyStart?.();
});
};
const typingLoop = createTypingKeepaliveLoop({

View File

@@ -0,0 +1,65 @@
import { describe, expect, it, vi } from "vitest";
import { createTypingStartGuard } from "./typing-start-guard.js";
describe("createTypingStartGuard", () => {
it("skips starts when sealed", async () => {
const start = vi.fn();
const guard = createTypingStartGuard({
isSealed: () => true,
});
const result = await guard.run(start);
expect(result).toBe("skipped");
expect(start).not.toHaveBeenCalled();
});
it("trips breaker after max consecutive failures", async () => {
const onStartError = vi.fn();
const onTrip = vi.fn();
const guard = createTypingStartGuard({
isSealed: () => false,
onStartError,
onTrip,
maxConsecutiveFailures: 2,
});
const start = vi.fn().mockRejectedValue(new Error("fail"));
const first = await guard.run(start);
const second = await guard.run(start);
const third = await guard.run(start);
expect(first).toBe("failed");
expect(second).toBe("tripped");
expect(third).toBe("skipped");
expect(onStartError).toHaveBeenCalledTimes(2);
expect(onTrip).toHaveBeenCalledTimes(1);
});
it("resets breaker state", async () => {
const guard = createTypingStartGuard({
isSealed: () => false,
maxConsecutiveFailures: 1,
});
const failStart = vi.fn().mockRejectedValue(new Error("fail"));
const okStart = vi.fn().mockResolvedValue(undefined);
const trip = await guard.run(failStart);
expect(trip).toBe("tripped");
expect(guard.isTripped()).toBe(true);
guard.reset();
const started = await guard.run(okStart);
expect(started).toBe("started");
expect(guard.isTripped()).toBe(false);
});
it("rethrows start errors when configured", async () => {
const guard = createTypingStartGuard({
isSealed: () => false,
rethrowOnError: true,
});
const start = vi.fn().mockRejectedValue(new Error("boom"));
await expect(guard.run(start)).rejects.toThrow("boom");
});
});

View File

@@ -0,0 +1,63 @@
export type TypingStartGuard = {
run: (start: () => Promise<void> | void) => Promise<"started" | "skipped" | "failed" | "tripped">;
reset: () => void;
isTripped: () => boolean;
};
export function createTypingStartGuard(params: {
isSealed: () => boolean;
shouldBlock?: () => boolean;
onStartError?: (err: unknown) => void;
maxConsecutiveFailures?: number;
onTrip?: () => void;
rethrowOnError?: boolean;
}): TypingStartGuard {
const maxConsecutiveFailures =
typeof params.maxConsecutiveFailures === "number" && params.maxConsecutiveFailures > 0
? Math.floor(params.maxConsecutiveFailures)
: undefined;
let consecutiveFailures = 0;
let tripped = false;
const isBlocked = () => {
if (params.isSealed()) {
return true;
}
if (tripped) {
return true;
}
return params.shouldBlock?.() === true;
};
const run: TypingStartGuard["run"] = async (start) => {
if (isBlocked()) {
return "skipped";
}
try {
await start();
consecutiveFailures = 0;
return "started";
} catch (err) {
consecutiveFailures += 1;
params.onStartError?.(err);
if (params.rethrowOnError) {
throw err;
}
if (maxConsecutiveFailures && consecutiveFailures >= maxConsecutiveFailures) {
tripped = true;
params.onTrip?.();
return "tripped";
}
return "failed";
}
};
return {
run,
reset: () => {
consecutiveFailures = 0;
tripped = false;
},
isTripped: () => tripped,
};
}

View File

@@ -1,4 +1,5 @@
import { createTypingKeepaliveLoop } from "./typing-lifecycle.js";
import { createTypingStartGuard } from "./typing-start-guard.js";
export type TypingCallbacks = {
onReplyStart: () => Promise<void>;
@@ -26,28 +27,19 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi
const maxDurationMs = params.maxDurationMs ?? 60_000; // Default 60s TTL
let stopSent = false;
let closed = false;
let consecutiveFailures = 0;
let breakerTripped = false;
let ttlTimer: ReturnType<typeof setTimeout> | undefined;
const startGuard = createTypingStartGuard({
isSealed: () => closed,
onStartError: params.onStartError,
maxConsecutiveFailures,
onTrip: () => {
keepaliveLoop.stop();
},
});
const fireStart = async (): Promise<void> => {
if (closed) {
return;
}
if (breakerTripped) {
return;
}
try {
await params.start();
consecutiveFailures = 0;
} catch (err) {
consecutiveFailures += 1;
params.onStartError(err);
if (consecutiveFailures >= maxConsecutiveFailures) {
breakerTripped = true;
keepaliveLoop.stop();
}
}
await startGuard.run(() => params.start());
};
const keepaliveLoop = createTypingKeepaliveLoop({
@@ -81,12 +73,11 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi
return;
}
stopSent = false;
breakerTripped = false;
consecutiveFailures = 0;
startGuard.reset();
keepaliveLoop.stop();
clearTtlTimer();
await fireStart();
if (breakerTripped) {
if (startGuard.isTripped()) {
return;
}
keepaliveLoop.start();

View File

@@ -1,6 +1,8 @@
import { describe, expect, it, vi } from "vitest";
import { fetchWithBearerAuthScopeFallback } from "./fetch-auth.js";
const asFetch = (fn: unknown): typeof fetch => fn as typeof fetch;
describe("fetchWithBearerAuthScopeFallback", () => {
it("rejects non-https urls when https is required", async () => {
await expect(
@@ -19,7 +21,7 @@ describe("fetchWithBearerAuthScopeFallback", () => {
const response = await fetchWithBearerAuthScopeFallback({
url: "https://example.com/file",
scopes: ["https://graph.microsoft.com"],
fetchFn,
fetchFn: asFetch(fetchFn),
tokenProvider,
});
@@ -38,7 +40,7 @@ describe("fetchWithBearerAuthScopeFallback", () => {
const response = await fetchWithBearerAuthScopeFallback({
url: "https://graph.microsoft.com/v1.0/me",
scopes: ["https://graph.microsoft.com", "https://api.botframework.com"],
fetchFn,
fetchFn: asFetch(fetchFn),
tokenProvider,
});
@@ -57,7 +59,7 @@ describe("fetchWithBearerAuthScopeFallback", () => {
const response = await fetchWithBearerAuthScopeFallback({
url: "https://example.com/file",
scopes: ["https://graph.microsoft.com"],
fetchFn,
fetchFn: asFetch(fetchFn),
tokenProvider,
shouldAttachAuth: () => false,
});
@@ -82,7 +84,7 @@ describe("fetchWithBearerAuthScopeFallback", () => {
const response = await fetchWithBearerAuthScopeFallback({
url: "https://graph.microsoft.com/v1.0/me",
scopes: ["https://first.example", "https://second.example"],
fetchFn,
fetchFn: asFetch(fetchFn),
tokenProvider,
});

View File

@@ -17,6 +17,7 @@ import {
shouldComputeCommandAuthorized,
} from "../../auto-reply/command-detection.js";
import { shouldHandleTextCommands } from "../../auto-reply/commands-registry.js";
import { withReplyDispatcher } from "../../auto-reply/dispatch.js";
import {
formatAgentEnvelope,
formatInboundEnvelope,
@@ -304,6 +305,7 @@ function createRuntimeChannel(): PluginRuntime["channel"] {
resolveEffectiveMessagesConfig,
resolveHumanDelayConfig,
dispatchReplyFromConfig,
withReplyDispatcher,
finalizeInboundContext,
formatAgentEnvelope,
/** @deprecated Prefer `BodyForAgent` + structured user-context blocks (do not build plaintext envelopes for prompts). */

View File

@@ -55,6 +55,7 @@ type ShouldHandleTextCommands =
typeof import("../../auto-reply/commands-registry.js").shouldHandleTextCommands;
type DispatchReplyFromConfig =
typeof import("../../auto-reply/reply/dispatch-from-config.js").dispatchReplyFromConfig;
type WithReplyDispatcher = typeof import("../../auto-reply/dispatch.js").withReplyDispatcher;
type FinalizeInboundContext =
typeof import("../../auto-reply/reply/inbound-context.js").finalizeInboundContext;
type FormatAgentEnvelope = typeof import("../../auto-reply/envelope.js").formatAgentEnvelope;
@@ -222,6 +223,7 @@ export type PluginRuntime = {
resolveEffectiveMessagesConfig: ResolveEffectiveMessagesConfig;
resolveHumanDelayConfig: ResolveHumanDelayConfig;
dispatchReplyFromConfig: DispatchReplyFromConfig;
withReplyDispatcher: WithReplyDispatcher;
finalizeInboundContext: FinalizeInboundContext;
formatAgentEnvelope: FormatAgentEnvelope;
/** @deprecated Prefer `BodyForAgent` + structured user-context blocks (do not build plaintext envelopes for prompts). */

View File

@@ -111,8 +111,10 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
hadPreviewMessage: boolean;
}): boolean => {
const currentPreviewText = args.currentPreviewText;
if (currentPreviewText === undefined) {
return false;
}
return (
currentPreviewText !== undefined &&
currentPreviewText.startsWith(args.text) &&
args.text.length < currentPreviewText.length &&
(args.skipRegressive === "always" || args.hadPreviewMessage)