From 37a138c554c32abc25347388e68882f787e879f9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 17:01:03 +0100 Subject: [PATCH] fix: harden typing lifecycle and cross-channel suppression --- CHANGELOG.md | 1 + extensions/feishu/src/bot.ts | 44 ++++++----- .../matrix/src/matrix/monitor/handler.ts | 56 ++++++++------ .../mattermost/src/mattermost/monitor.ts | 42 ++++++----- .../src/monitor-handler/message-handler.ts | 50 +++++++------ .../reply/dispatch-from-config.test.ts | 39 ++++++++++ src/auto-reply/reply/dispatch-from-config.ts | 11 +++ .../reply/get-reply-run.media-only.test.ts | 45 +++++++++++ src/auto-reply/reply/get-reply-run.ts | 15 +++- src/auto-reply/reply/reply-utils.test.ts | 22 ++++++ src/auto-reply/reply/typing-mode.ts | 13 +++- src/auto-reply/types.ts | 11 +++ src/channels/typing.test.ts | 74 +++++++++++++++++++ src/channels/typing.ts | 21 +++++- 14 files changed, 359 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f7fe41d1f0..14b47e24d14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai - Typing/Run completion race: prevent post-run keepalive ticks from re-triggering typing callbacks by guarding `triggerTyping()` with `runComplete`, with regression coverage for no-restart behavior during run-complete/dispatch-idle boundaries. (#27413) Thanks @widingmarcus-cyber. - Typing/Dispatch idle: force typing cleanup when `markDispatchIdle` never arrives after run completion, avoiding leaked typing keepalive loops in cron/announce edges. Landed from contributor PR #27541 by @Sid-Qin. (#27493) - Typing/TTL safety net: add max-duration guardrails to shared typing callbacks so stuck lifecycle edges auto-stop typing indicators even when explicit idle/cleanup signals are missed. (#27428) Thanks @Crpdim. +- Typing/Cross-channel leakage: unify run-scoped typing suppression for cross-channel/internal-webchat routes, preserve current inbound origin as embedded run message channel context, harden shared typing keepalive with consecutive-failure circuit breaker edge-case handling, and enforce dispatcher completion/idle waits in extension dispatcher callsites (Feishu, Matrix, Mattermost, MSTeams) so typing indicators always clean up on success/error paths. Related: #27647, #27493, #27598. Supersedes/replaces draft PRs: #27640, #27593, #27540. - Onboarding/Gateway: seed default Control UI `allowedOrigins` for non-loopback binds during onboarding (`localhost`/`127.0.0.1` plus custom bind host) so fresh non-loopback setups do not fail startup due to missing origin policy. (#26157) thanks @stakeswky. - Docker/GCP onboarding: reduce first-build OOM risk by capping Node heap during `pnpm install`, reuse existing gateway token during `docker-setup.sh` reruns so `.env` stays aligned with config, auto-bootstrap Control UI allowed origins for non-loopback Docker binds, and add GCP docs guidance for tokenized dashboard links + pairing recovery commands. (#26253) Thanks @pandego. - Config/Plugins entries: treat unknown `plugins.entries.*` ids as startup warnings (ignored stale keys) instead of hard validation failures that can crash-loop gateway boot. Landed from contributor PR #27506 by @Sid-Qin. (#27455) diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 31172cb5c50..debbece77c8 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -943,27 +943,33 @@ export async function handleFeishuMessage(params: { }); log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`); - - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions, - }); - - markDispatchIdle(); - - if (isGroup && historyKey && chatHistories) { - clearHistoryEntriesIfEnabled({ - historyMap: chatHistories, - historyKey, - limit: historyLimit, + try { + const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, }); - } - log( - `feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, - ); + if (isGroup && historyKey && chatHistories) { + clearHistoryEntriesIfEnabled({ + historyMap: chatHistories, + historyKey, + limit: historyLimit, + }); + } + + log( + `feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, + ); + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + markDispatchIdle(); + } + } } catch (err) { error(`feishu[${account.accountId}]: failed to dispatch message: ${String(err)}`); } diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 77e88162af3..b9791b4063f 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -655,31 +655,39 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: roomConfig?.skills, - onModelSelected, - }, - }); - markDispatchIdle(); - if (!queuedFinal) { - return; - } - didSendReply = true; - const finalCount = counts.final; - logVerboseMessage( - `matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, - ); - if (didSendReply) { - const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160); - core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, { - sessionKey: route.sessionKey, - contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`, + try { + const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfig?.skills, + onModelSelected, + }, }); + if (!queuedFinal) { + return; + } + didSendReply = true; + const finalCount = counts.final; + logVerboseMessage( + `matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, + ); + if (didSendReply) { + const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160); + core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, { + sessionKey: route.sessionKey, + contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`, + }); + } + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + markDispatchIdle(); + } } } catch (err) { runtime.error?.(`matrix handler failed: ${String(err)}`); diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 606885811a4..c132f902e69 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -775,24 +775,32 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }, }); - await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, - onModelSelected, - }, - }); - markDispatchIdle(); - if (historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, + try { + await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + onModelSelected, + }, }); + if (historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: channelHistories, + historyKey, + limit: historyLimit, + }); + } + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + markDispatchIdle(); + } } }; diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index 36fc0382203..af9d860901b 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -533,17 +533,30 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { log.info("dispatching to agent", { sessionKey: route.sessionKey }); try { - const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions, - }); + try { + const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, + }); - markDispatchIdle(); - log.info("dispatch complete", { queuedFinal, counts }); + log.info("dispatch complete", { queuedFinal, counts }); - if (!queuedFinal) { + if (!queuedFinal) { + if (isRoomish && historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: conversationHistories, + historyKey, + limit: historyLimit, + }); + } + return; + } + const finalCount = counts.final; + logVerboseMessage( + `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, + ); if (isRoomish && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: conversationHistories, @@ -551,18 +564,13 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { limit: historyLimit, }); } - return; - } - const finalCount = counts.final; - logVerboseMessage( - `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, - ); - if (isRoomish && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: conversationHistories, - historyKey, - limit: historyLimit, - }); + } finally { + dispatcher.markComplete(); + try { + await dispatcher.waitForIdle(); + } finally { + markDispatchIdle(); + } } } catch (err) { log.error("dispatch failed", { error: String(err) }); diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 2b6009590ed..c876d050be4 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -286,6 +286,45 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("forces suppressTyping when routing to a different originating channel", async () => { + setNoAbort(); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "slack", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:999", + }); + + const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => { + expect(opts?.suppressTyping).toBe(true); + expect(opts?.typingPolicy).toBe("system_event"); + return { text: "hi" } satisfies ReplyPayload; + }; + + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + }); + + it("forces suppressTyping for internal webchat turns", async () => { + setNoAbort(); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "webchat", + Surface: "webchat", + OriginatingChannel: "webchat", + OriginatingTo: "session:abc", + }); + + const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => { + expect(opts?.suppressTyping).toBe(true); + expect(opts?.typingPolicy).toBe("internal_webchat"); + return { text: "hi" } satisfies ReplyPayload; + }; + + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + }); + it("routes media-only tool results when summaries are suppressed", async () => { setNoAbort(); mocks.routeReply.mockClear(); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index bdecd87639d..ee6ac1791be 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -12,6 +12,7 @@ import { import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js"; +import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; import { getReplyFromConfig } from "../reply.js"; import type { FinalizedMsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; @@ -253,6 +254,8 @@ export async function dispatchReplyFromConfig(params: { const shouldRouteToOriginating = Boolean( isRoutableChannel(originatingChannel) && originatingTo && originatingChannel !== currentSurface, ); + const shouldSuppressTyping = + shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL; const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface; /** @@ -397,6 +400,14 @@ export async function dispatchReplyFromConfig(params: { ctx, { ...params.replyOptions, + typingPolicy: + params.replyOptions?.typingPolicy ?? + (originatingChannel === INTERNAL_MESSAGE_CHANNEL + ? "internal_webchat" + : shouldRouteToOriginating + ? "system_event" + : undefined), + suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping, onToolResult: (payload: ReplyPayload) => { const run = async () => { const ttsPayload = await maybeApplyTtsToPayload({ diff --git a/src/auto-reply/reply/get-reply-run.media-only.test.ts b/src/auto-reply/reply/get-reply-run.media-only.test.ts index d4a40b4eda8..bc43bbb4eb9 100644 --- a/src/auto-reply/reply/get-reply-run.media-only.test.ts +++ b/src/auto-reply/reply/get-reply-run.media-only.test.ts @@ -81,6 +81,7 @@ vi.mock("./typing-mode.js", () => ({ import { runReplyAgent } from "./agent-runner.js"; import { routeReply } from "./route-reply.js"; +import { resolveTypingMode } from "./typing-mode.js"; function baseParams( overrides: Partial[0]> = {}, @@ -249,4 +250,48 @@ describe("runPreparedReply media-only handling", () => { expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); }); + + it("uses inbound origin channel for run messageProvider", async () => { + await runPreparedReply( + baseParams({ + ctx: { + Body: "", + RawBody: "", + CommandBody: "", + ThreadHistoryBody: "Earlier message in this thread", + OriginatingChannel: "webchat", + OriginatingTo: "session:abc", + ChatType: "group", + }, + sessionCtx: { + Body: "", + BodyStripped: "", + ThreadHistoryBody: "Earlier message in this thread", + MediaPath: "/tmp/input.png", + Provider: "telegram", + ChatType: "group", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:123", + }, + }), + ); + + const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0]; + expect(call?.followupRun.run.messageProvider).toBe("webchat"); + }); + + it("passes suppressTyping through typing mode resolution", async () => { + await runPreparedReply( + baseParams({ + opts: { + suppressTyping: true, + }, + }), + ); + + const call = vi.mocked(resolveTypingMode).mock.calls[0]?.[0] as + | { suppressTyping?: boolean } + | undefined; + expect(call?.suppressTyping).toBe(true); + }); }); diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index b4c4e36281e..4363efc94f3 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -18,6 +18,7 @@ import { import { logVerbose } from "../../globals.js"; import { clearCommandLane, getQueueSize } from "../../process/command-queue.js"; import { normalizeMainKey } from "../../routing/session-key.js"; +import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { hasControlCommand } from "../command-detection.js"; import { buildInboundMediaNote } from "../media-note.js"; @@ -233,11 +234,21 @@ export async function runPreparedReply( const isGroupChat = sessionCtx.ChatType === "group"; const wasMentioned = ctx.WasMentioned === true; const isHeartbeat = opts?.isHeartbeat === true; + const typingPolicy = + opts?.typingPolicy ?? + (isHeartbeat + ? "heartbeat" + : ctx.OriginatingChannel === INTERNAL_MESSAGE_CHANNEL + ? "internal_webchat" + : "auto"); + const suppressTyping = opts?.suppressTyping === true; const typingMode = resolveTypingMode({ configured: sessionCfg?.typingMode ?? agentCfg?.typingMode, isGroupChat, wasMentioned, isHeartbeat, + typingPolicy, + suppressTyping, }); const shouldInjectGroupIntro = Boolean( isGroupChat && (isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro), @@ -462,8 +473,8 @@ export async function runPreparedReply( sessionId: sessionIdFinal, sessionKey, messageProvider: resolveOriginMessageProvider({ - originatingChannel: sessionCtx.OriginatingChannel, - provider: sessionCtx.Provider, + originatingChannel: ctx.OriginatingChannel ?? sessionCtx.OriginatingChannel, + provider: ctx.Surface ?? ctx.Provider ?? sessionCtx.Provider, }), agentAccountId: sessionCtx.AccountId, groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined, diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index f704abf9575..fff937187a6 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -257,6 +257,28 @@ describe("resolveTypingMode", () => { }, expected: "never", }, + { + name: "suppressTyping forces never", + input: { + configured: "instant" as const, + isGroupChat: false, + wasMentioned: false, + isHeartbeat: false, + suppressTyping: true, + }, + expected: "never", + }, + { + name: "typingPolicy system_event forces never", + input: { + configured: "instant" as const, + isGroupChat: false, + wasMentioned: false, + isHeartbeat: false, + typingPolicy: "system_event" as const, + }, + expected: "never", + }, ] as const; for (const testCase of cases) { diff --git a/src/auto-reply/reply/typing-mode.ts b/src/auto-reply/reply/typing-mode.ts index 37805ef3be6..66adcaf1e5a 100644 --- a/src/auto-reply/reply/typing-mode.ts +++ b/src/auto-reply/reply/typing-mode.ts @@ -1,5 +1,6 @@ import type { TypingMode } from "../../config/types.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; +import type { TypingPolicy } from "../types.js"; import type { TypingController } from "./typing.js"; export type TypingModeContext = { @@ -7,6 +8,8 @@ export type TypingModeContext = { isGroupChat: boolean; wasMentioned: boolean; isHeartbeat: boolean; + typingPolicy?: TypingPolicy; + suppressTyping?: boolean; }; export const DEFAULT_GROUP_TYPING_MODE: TypingMode = "message"; @@ -16,8 +19,16 @@ export function resolveTypingMode({ isGroupChat, wasMentioned, isHeartbeat, + typingPolicy, + suppressTyping, }: TypingModeContext): TypingMode { - if (isHeartbeat) { + if ( + isHeartbeat || + typingPolicy === "heartbeat" || + typingPolicy === "system_event" || + typingPolicy === "internal_webchat" || + suppressTyping + ) { return "never"; } if (configured) { diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index f522e31042f..69ccead2adc 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -13,6 +13,13 @@ export type ModelSelectedContext = { thinkLevel: string | undefined; }; +export type TypingPolicy = + | "auto" + | "user_message" + | "system_event" + | "internal_webchat" + | "heartbeat"; + export type GetReplyOptions = { /** Override run id for agent events (defaults to random UUID). */ runId?: string; @@ -27,6 +34,10 @@ export type GetReplyOptions = { onTypingCleanup?: () => void; onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; + /** Policy-level typing control for run classes (user/system/internal/heartbeat). */ + typingPolicy?: TypingPolicy; + /** Force-disable typing indicators for this run (system/internal/cross-channel routes). */ + suppressTyping?: boolean; /** Resolved heartbeat model override (provider/model string from merged per-agent config). */ heartbeatModelOverride?: string; /** If true, suppress tool error warning payloads for this run. */ diff --git a/src/channels/typing.test.ts b/src/channels/typing.test.ts index dbc2a4179ff..69149e30288 100644 --- a/src/channels/typing.test.ts +++ b/src/channels/typing.test.ts @@ -73,6 +73,80 @@ describe("createTypingCallbacks", () => { } }); + it("stops keepalive after consecutive start failures", async () => { + vi.useFakeTimers(); + try { + const start = vi.fn().mockRejectedValue(new Error("gone")); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, onStartError }); + + await callbacks.onReplyStart(); + expect(start).toHaveBeenCalledTimes(1); + expect(onStartError).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(3_000); + expect(start).toHaveBeenCalledTimes(2); + expect(onStartError).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(9_000); + expect(start).toHaveBeenCalledTimes(2); + } finally { + vi.useRealTimers(); + } + }); + + it("does not restart keepalive when breaker trips on initial start", async () => { + vi.useFakeTimers(); + try { + const start = vi.fn().mockRejectedValue(new Error("gone")); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ + start, + onStartError, + maxConsecutiveFailures: 1, + }); + + await callbacks.onReplyStart(); + expect(start).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(9_000); + expect(start).toHaveBeenCalledTimes(1); + expect(onStartError).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + + it("resets failure counter after a successful keepalive tick", async () => { + vi.useFakeTimers(); + try { + let callCount = 0; + const start = vi.fn().mockImplementation(async () => { + callCount += 1; + if (callCount % 2 === 1) { + throw new Error("flaky"); + } + }); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ + start, + onStartError, + maxConsecutiveFailures: 2, + }); + + await callbacks.onReplyStart(); // fail + await vi.advanceTimersByTimeAsync(3_000); // success + await vi.advanceTimersByTimeAsync(3_000); // fail + await vi.advanceTimersByTimeAsync(3_000); // success + await vi.advanceTimersByTimeAsync(3_000); // fail + + expect(start).toHaveBeenCalledTimes(5); + expect(onStartError).toHaveBeenCalledTimes(3); + } finally { + vi.useRealTimers(); + } + }); + it("deduplicates stop across idle and cleanup", async () => { const start = vi.fn().mockResolvedValue(undefined); const stop = vi.fn().mockResolvedValue(undefined); diff --git a/src/channels/typing.ts b/src/channels/typing.ts index f9554046f05..125f252dd7d 100644 --- a/src/channels/typing.ts +++ b/src/channels/typing.ts @@ -13,6 +13,8 @@ export type CreateTypingCallbacksParams = { onStartError: (err: unknown) => void; onStopError?: (err: unknown) => void; keepaliveIntervalMs?: number; + /** Stop keepalive after this many consecutive start() failures. Default: 2 */ + maxConsecutiveFailures?: number; /** Maximum duration for typing indicator before auto-cleanup (safety TTL). Default: 60s */ maxDurationMs?: number; }; @@ -20,19 +22,31 @@ export type CreateTypingCallbacksParams = { export function createTypingCallbacks(params: CreateTypingCallbacksParams): TypingCallbacks { const stop = params.stop; const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000; + const maxConsecutiveFailures = Math.max(1, params.maxConsecutiveFailures ?? 2); const maxDurationMs = params.maxDurationMs ?? 60_000; // Default 60s TTL let stopSent = false; let closed = false; + let consecutiveFailures = 0; + let breakerTripped = false; let ttlTimer: ReturnType | undefined; - const fireStart = async () => { + const fireStart = async (): Promise => { if (closed) { return; } + if (breakerTripped) { + return; + } try { await params.start(); + consecutiveFailures = 0; } catch (err) { + consecutiveFailures += 1; params.onStartError(err); + if (consecutiveFailures >= maxConsecutiveFailures) { + breakerTripped = true; + keepaliveLoop.stop(); + } } }; @@ -67,9 +81,14 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi return; } stopSent = false; + breakerTripped = false; + consecutiveFailures = 0; keepaliveLoop.stop(); clearTtlTimer(); await fireStart(); + if (breakerTripped) { + return; + } keepaliveLoop.start(); startTtlTimer(); // Start TTL safety timer };