fix: harden typing lifecycle and cross-channel suppression

This commit is contained in:
Peter Steinberger
2026-02-26 17:01:03 +01:00
parent 4894d907fa
commit 37a138c554
14 changed files with 359 additions and 85 deletions

View File

@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
- Typing/Run completion race: prevent post-run keepalive ticks from re-triggering typing callbacks by guarding `triggerTyping()` with `runComplete`, with regression coverage for no-restart behavior during run-complete/dispatch-idle boundaries. (#27413) Thanks @widingmarcus-cyber. - Typing/Run completion race: prevent post-run keepalive ticks from re-triggering typing callbacks by guarding `triggerTyping()` with `runComplete`, with regression coverage for no-restart behavior during run-complete/dispatch-idle boundaries. (#27413) Thanks @widingmarcus-cyber.
- Typing/Dispatch idle: force typing cleanup when `markDispatchIdle` never arrives after run completion, avoiding leaked typing keepalive loops in cron/announce edges. Landed from contributor PR #27541 by @Sid-Qin. (#27493) - Typing/Dispatch idle: force typing cleanup when `markDispatchIdle` never arrives after run completion, avoiding leaked typing keepalive loops in cron/announce edges. Landed from contributor PR #27541 by @Sid-Qin. (#27493)
- Typing/TTL safety net: add max-duration guardrails to shared typing callbacks so stuck lifecycle edges auto-stop typing indicators even when explicit idle/cleanup signals are missed. (#27428) Thanks @Crpdim. - Typing/TTL safety net: add max-duration guardrails to shared typing callbacks so stuck lifecycle edges auto-stop typing indicators even when explicit idle/cleanup signals are missed. (#27428) Thanks @Crpdim.
- Typing/Cross-channel leakage: unify run-scoped typing suppression for cross-channel/internal-webchat routes, preserve current inbound origin as embedded run message channel context, harden shared typing keepalive with consecutive-failure circuit breaker edge-case handling, and enforce dispatcher completion/idle waits in extension dispatcher callsites (Feishu, Matrix, Mattermost, MSTeams) so typing indicators always clean up on success/error paths. Related: #27647, #27493, #27598. Supersedes/replaces draft PRs: #27640, #27593, #27540.
- Onboarding/Gateway: seed default Control UI `allowedOrigins` for non-loopback binds during onboarding (`localhost`/`127.0.0.1` plus custom bind host) so fresh non-loopback setups do not fail startup due to missing origin policy. (#26157) thanks @stakeswky. - Onboarding/Gateway: seed default Control UI `allowedOrigins` for non-loopback binds during onboarding (`localhost`/`127.0.0.1` plus custom bind host) so fresh non-loopback setups do not fail startup due to missing origin policy. (#26157) thanks @stakeswky.
- Docker/GCP onboarding: reduce first-build OOM risk by capping Node heap during `pnpm install`, reuse existing gateway token during `docker-setup.sh` reruns so `.env` stays aligned with config, auto-bootstrap Control UI allowed origins for non-loopback Docker binds, and add GCP docs guidance for tokenized dashboard links + pairing recovery commands. (#26253) Thanks @pandego. - Docker/GCP onboarding: reduce first-build OOM risk by capping Node heap during `pnpm install`, reuse existing gateway token during `docker-setup.sh` reruns so `.env` stays aligned with config, auto-bootstrap Control UI allowed origins for non-loopback Docker binds, and add GCP docs guidance for tokenized dashboard links + pairing recovery commands. (#26253) Thanks @pandego.
- Config/Plugins entries: treat unknown `plugins.entries.*` ids as startup warnings (ignored stale keys) instead of hard validation failures that can crash-loop gateway boot. Landed from contributor PR #27506 by @Sid-Qin. (#27455) - Config/Plugins entries: treat unknown `plugins.entries.*` ids as startup warnings (ignored stale keys) instead of hard validation failures that can crash-loop gateway boot. Landed from contributor PR #27506 by @Sid-Qin. (#27455)

View File

@@ -943,27 +943,33 @@ export async function handleFeishuMessage(params: {
}); });
log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`); log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`);
try {
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload, ctx: ctxPayload,
cfg, cfg,
dispatcher, dispatcher,
replyOptions, replyOptions,
});
markDispatchIdle();
if (isGroup && historyKey && chatHistories) {
clearHistoryEntriesIfEnabled({
historyMap: chatHistories,
historyKey,
limit: historyLimit,
}); });
}
log( if (isGroup && historyKey && chatHistories) {
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, clearHistoryEntriesIfEnabled({
); historyMap: chatHistories,
historyKey,
limit: historyLimit,
});
}
log(
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`,
);
} finally {
dispatcher.markComplete();
try {
await dispatcher.waitForIdle();
} finally {
markDispatchIdle();
}
}
} catch (err) { } catch (err) {
error(`feishu[${account.accountId}]: failed to dispatch message: ${String(err)}`); error(`feishu[${account.accountId}]: failed to dispatch message: ${String(err)}`);
} }

View File

@@ -655,31 +655,39 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}, },
}); });
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ try {
ctx: ctxPayload, const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
cfg, ctx: ctxPayload,
dispatcher, cfg,
replyOptions: { dispatcher,
...replyOptions, replyOptions: {
skillFilter: roomConfig?.skills, ...replyOptions,
onModelSelected, skillFilter: roomConfig?.skills,
}, onModelSelected,
}); },
markDispatchIdle();
if (!queuedFinal) {
return;
}
didSendReply = true;
const finalCount = counts.final;
logVerboseMessage(
`matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
if (didSendReply) {
const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160);
core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, {
sessionKey: route.sessionKey,
contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`,
}); });
if (!queuedFinal) {
return;
}
didSendReply = true;
const finalCount = counts.final;
logVerboseMessage(
`matrix: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
if (didSendReply) {
const previewText = bodyText.replace(/\s+/g, " ").slice(0, 160);
core.system.enqueueSystemEvent(`Matrix message from ${senderName}: ${previewText}`, {
sessionKey: route.sessionKey,
contextKey: `matrix:message:${roomId}:${messageId || "unknown"}`,
});
}
} finally {
dispatcher.markComplete();
try {
await dispatcher.waitForIdle();
} finally {
markDispatchIdle();
}
} }
} catch (err) { } catch (err) {
runtime.error?.(`matrix handler failed: ${String(err)}`); runtime.error?.(`matrix handler failed: ${String(err)}`);

View File

@@ -775,24 +775,32 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
}, },
}); });
await core.channel.reply.dispatchReplyFromConfig({ try {
ctx: ctxPayload, await core.channel.reply.dispatchReplyFromConfig({
cfg, ctx: ctxPayload,
dispatcher, cfg,
replyOptions: { dispatcher,
...replyOptions, replyOptions: {
disableBlockStreaming: ...replyOptions,
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, disableBlockStreaming:
onModelSelected, typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
}, onModelSelected,
}); },
markDispatchIdle();
if (historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
}); });
if (historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
});
}
} finally {
dispatcher.markComplete();
try {
await dispatcher.waitForIdle();
} finally {
markDispatchIdle();
}
} }
}; };

View File

@@ -533,17 +533,30 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
log.info("dispatching to agent", { sessionKey: route.sessionKey }); log.info("dispatching to agent", { sessionKey: route.sessionKey });
try { try {
const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ try {
ctx: ctxPayload, const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({
cfg, ctx: ctxPayload,
dispatcher, cfg,
replyOptions, dispatcher,
}); replyOptions,
});
markDispatchIdle(); log.info("dispatch complete", { queuedFinal, counts });
log.info("dispatch complete", { queuedFinal, counts });
if (!queuedFinal) { if (!queuedFinal) {
if (isRoomish && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: conversationHistories,
historyKey,
limit: historyLimit,
});
}
return;
}
const finalCount = counts.final;
logVerboseMessage(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
if (isRoomish && historyKey) { if (isRoomish && historyKey) {
clearHistoryEntriesIfEnabled({ clearHistoryEntriesIfEnabled({
historyMap: conversationHistories, historyMap: conversationHistories,
@@ -551,18 +564,13 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
limit: historyLimit, limit: historyLimit,
}); });
} }
return; } finally {
} dispatcher.markComplete();
const finalCount = counts.final; try {
logVerboseMessage( await dispatcher.waitForIdle();
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, } finally {
); markDispatchIdle();
if (isRoomish && historyKey) { }
clearHistoryEntriesIfEnabled({
historyMap: conversationHistories,
historyKey,
limit: historyLimit,
});
} }
} catch (err) { } catch (err) {
log.error("dispatch failed", { error: String(err) }); log.error("dispatch failed", { error: String(err) });

View File

@@ -286,6 +286,45 @@ describe("dispatchReplyFromConfig", () => {
); );
}); });
it("forces suppressTyping when routing to a different originating channel", async () => {
setNoAbort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "slack",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:999",
});
const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => {
expect(opts?.suppressTyping).toBe(true);
expect(opts?.typingPolicy).toBe("system_event");
return { text: "hi" } satisfies ReplyPayload;
};
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
});
it("forces suppressTyping for internal webchat turns", async () => {
setNoAbort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "webchat",
Surface: "webchat",
OriginatingChannel: "webchat",
OriginatingTo: "session:abc",
});
const replyResolver = async (_ctx: MsgContext, opts?: GetReplyOptions) => {
expect(opts?.suppressTyping).toBe(true);
expect(opts?.typingPolicy).toBe("internal_webchat");
return { text: "hi" } satisfies ReplyPayload;
};
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
});
it("routes media-only tool results when summaries are suppressed", async () => { it("routes media-only tool results when summaries are suppressed", async () => {
setNoAbort(); setNoAbort();
mocks.routeReply.mockClear(); mocks.routeReply.mockClear();

View File

@@ -12,6 +12,7 @@ import {
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js"; import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
import { getReplyFromConfig } from "../reply.js"; import { getReplyFromConfig } from "../reply.js";
import type { FinalizedMsgContext } from "../templating.js"; import type { FinalizedMsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js";
@@ -253,6 +254,8 @@ export async function dispatchReplyFromConfig(params: {
const shouldRouteToOriginating = Boolean( const shouldRouteToOriginating = Boolean(
isRoutableChannel(originatingChannel) && originatingTo && originatingChannel !== currentSurface, isRoutableChannel(originatingChannel) && originatingTo && originatingChannel !== currentSurface,
); );
const shouldSuppressTyping =
shouldRouteToOriginating || originatingChannel === INTERNAL_MESSAGE_CHANNEL;
const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface; const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;
/** /**
@@ -397,6 +400,14 @@ export async function dispatchReplyFromConfig(params: {
ctx, ctx,
{ {
...params.replyOptions, ...params.replyOptions,
typingPolicy:
params.replyOptions?.typingPolicy ??
(originatingChannel === INTERNAL_MESSAGE_CHANNEL
? "internal_webchat"
: shouldRouteToOriginating
? "system_event"
: undefined),
suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
onToolResult: (payload: ReplyPayload) => { onToolResult: (payload: ReplyPayload) => {
const run = async () => { const run = async () => {
const ttsPayload = await maybeApplyTtsToPayload({ const ttsPayload = await maybeApplyTtsToPayload({

View File

@@ -81,6 +81,7 @@ vi.mock("./typing-mode.js", () => ({
import { runReplyAgent } from "./agent-runner.js"; import { runReplyAgent } from "./agent-runner.js";
import { routeReply } from "./route-reply.js"; import { routeReply } from "./route-reply.js";
import { resolveTypingMode } from "./typing-mode.js";
function baseParams( function baseParams(
overrides: Partial<Parameters<typeof runPreparedReply>[0]> = {}, overrides: Partial<Parameters<typeof runPreparedReply>[0]> = {},
@@ -249,4 +250,48 @@ describe("runPreparedReply media-only handling", () => {
expect(vi.mocked(routeReply)).not.toHaveBeenCalled(); expect(vi.mocked(routeReply)).not.toHaveBeenCalled();
}); });
it("uses inbound origin channel for run messageProvider", async () => {
await runPreparedReply(
baseParams({
ctx: {
Body: "",
RawBody: "",
CommandBody: "",
ThreadHistoryBody: "Earlier message in this thread",
OriginatingChannel: "webchat",
OriginatingTo: "session:abc",
ChatType: "group",
},
sessionCtx: {
Body: "",
BodyStripped: "",
ThreadHistoryBody: "Earlier message in this thread",
MediaPath: "/tmp/input.png",
Provider: "telegram",
ChatType: "group",
OriginatingChannel: "telegram",
OriginatingTo: "telegram:123",
},
}),
);
const call = vi.mocked(runReplyAgent).mock.calls[0]?.[0];
expect(call?.followupRun.run.messageProvider).toBe("webchat");
});
it("passes suppressTyping through typing mode resolution", async () => {
await runPreparedReply(
baseParams({
opts: {
suppressTyping: true,
},
}),
);
const call = vi.mocked(resolveTypingMode).mock.calls[0]?.[0] as
| { suppressTyping?: boolean }
| undefined;
expect(call?.suppressTyping).toBe(true);
});
}); });

View File

@@ -18,6 +18,7 @@ import {
import { logVerbose } from "../../globals.js"; import { logVerbose } from "../../globals.js";
import { clearCommandLane, getQueueSize } from "../../process/command-queue.js"; import { clearCommandLane, getQueueSize } from "../../process/command-queue.js";
import { normalizeMainKey } from "../../routing/session-key.js"; import { normalizeMainKey } from "../../routing/session-key.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { hasControlCommand } from "../command-detection.js"; import { hasControlCommand } from "../command-detection.js";
import { buildInboundMediaNote } from "../media-note.js"; import { buildInboundMediaNote } from "../media-note.js";
@@ -233,11 +234,21 @@ export async function runPreparedReply(
const isGroupChat = sessionCtx.ChatType === "group"; const isGroupChat = sessionCtx.ChatType === "group";
const wasMentioned = ctx.WasMentioned === true; const wasMentioned = ctx.WasMentioned === true;
const isHeartbeat = opts?.isHeartbeat === true; const isHeartbeat = opts?.isHeartbeat === true;
const typingPolicy =
opts?.typingPolicy ??
(isHeartbeat
? "heartbeat"
: ctx.OriginatingChannel === INTERNAL_MESSAGE_CHANNEL
? "internal_webchat"
: "auto");
const suppressTyping = opts?.suppressTyping === true;
const typingMode = resolveTypingMode({ const typingMode = resolveTypingMode({
configured: sessionCfg?.typingMode ?? agentCfg?.typingMode, configured: sessionCfg?.typingMode ?? agentCfg?.typingMode,
isGroupChat, isGroupChat,
wasMentioned, wasMentioned,
isHeartbeat, isHeartbeat,
typingPolicy,
suppressTyping,
}); });
const shouldInjectGroupIntro = Boolean( const shouldInjectGroupIntro = Boolean(
isGroupChat && (isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro), isGroupChat && (isFirstTurnInSession || sessionEntry?.groupActivationNeedsSystemIntro),
@@ -462,8 +473,8 @@ export async function runPreparedReply(
sessionId: sessionIdFinal, sessionId: sessionIdFinal,
sessionKey, sessionKey,
messageProvider: resolveOriginMessageProvider({ messageProvider: resolveOriginMessageProvider({
originatingChannel: sessionCtx.OriginatingChannel, originatingChannel: ctx.OriginatingChannel ?? sessionCtx.OriginatingChannel,
provider: sessionCtx.Provider, provider: ctx.Surface ?? ctx.Provider ?? sessionCtx.Provider,
}), }),
agentAccountId: sessionCtx.AccountId, agentAccountId: sessionCtx.AccountId,
groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined, groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined,

View File

@@ -257,6 +257,28 @@ describe("resolveTypingMode", () => {
}, },
expected: "never", expected: "never",
}, },
{
name: "suppressTyping forces never",
input: {
configured: "instant" as const,
isGroupChat: false,
wasMentioned: false,
isHeartbeat: false,
suppressTyping: true,
},
expected: "never",
},
{
name: "typingPolicy system_event forces never",
input: {
configured: "instant" as const,
isGroupChat: false,
wasMentioned: false,
isHeartbeat: false,
typingPolicy: "system_event" as const,
},
expected: "never",
},
] as const; ] as const;
for (const testCase of cases) { for (const testCase of cases) {

View File

@@ -1,5 +1,6 @@
import type { TypingMode } from "../../config/types.js"; import type { TypingMode } from "../../config/types.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { TypingPolicy } from "../types.js";
import type { TypingController } from "./typing.js"; import type { TypingController } from "./typing.js";
export type TypingModeContext = { export type TypingModeContext = {
@@ -7,6 +8,8 @@ export type TypingModeContext = {
isGroupChat: boolean; isGroupChat: boolean;
wasMentioned: boolean; wasMentioned: boolean;
isHeartbeat: boolean; isHeartbeat: boolean;
typingPolicy?: TypingPolicy;
suppressTyping?: boolean;
}; };
export const DEFAULT_GROUP_TYPING_MODE: TypingMode = "message"; export const DEFAULT_GROUP_TYPING_MODE: TypingMode = "message";
@@ -16,8 +19,16 @@ export function resolveTypingMode({
isGroupChat, isGroupChat,
wasMentioned, wasMentioned,
isHeartbeat, isHeartbeat,
typingPolicy,
suppressTyping,
}: TypingModeContext): TypingMode { }: TypingModeContext): TypingMode {
if (isHeartbeat) { if (
isHeartbeat ||
typingPolicy === "heartbeat" ||
typingPolicy === "system_event" ||
typingPolicy === "internal_webchat" ||
suppressTyping
) {
return "never"; return "never";
} }
if (configured) { if (configured) {

View File

@@ -13,6 +13,13 @@ export type ModelSelectedContext = {
thinkLevel: string | undefined; thinkLevel: string | undefined;
}; };
export type TypingPolicy =
| "auto"
| "user_message"
| "system_event"
| "internal_webchat"
| "heartbeat";
export type GetReplyOptions = { export type GetReplyOptions = {
/** Override run id for agent events (defaults to random UUID). */ /** Override run id for agent events (defaults to random UUID). */
runId?: string; runId?: string;
@@ -27,6 +34,10 @@ export type GetReplyOptions = {
onTypingCleanup?: () => void; onTypingCleanup?: () => void;
onTypingController?: (typing: TypingController) => void; onTypingController?: (typing: TypingController) => void;
isHeartbeat?: boolean; isHeartbeat?: boolean;
/** Policy-level typing control for run classes (user/system/internal/heartbeat). */
typingPolicy?: TypingPolicy;
/** Force-disable typing indicators for this run (system/internal/cross-channel routes). */
suppressTyping?: boolean;
/** Resolved heartbeat model override (provider/model string from merged per-agent config). */ /** Resolved heartbeat model override (provider/model string from merged per-agent config). */
heartbeatModelOverride?: string; heartbeatModelOverride?: string;
/** If true, suppress tool error warning payloads for this run. */ /** If true, suppress tool error warning payloads for this run. */

View File

@@ -73,6 +73,80 @@ describe("createTypingCallbacks", () => {
} }
}); });
it("stops keepalive after consecutive start failures", async () => {
vi.useFakeTimers();
try {
const start = vi.fn().mockRejectedValue(new Error("gone"));
const onStartError = vi.fn();
const callbacks = createTypingCallbacks({ start, onStartError });
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
expect(onStartError).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(3_000);
expect(start).toHaveBeenCalledTimes(2);
expect(onStartError).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(9_000);
expect(start).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
});
it("does not restart keepalive when breaker trips on initial start", async () => {
vi.useFakeTimers();
try {
const start = vi.fn().mockRejectedValue(new Error("gone"));
const onStartError = vi.fn();
const callbacks = createTypingCallbacks({
start,
onStartError,
maxConsecutiveFailures: 1,
});
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(9_000);
expect(start).toHaveBeenCalledTimes(1);
expect(onStartError).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
});
it("resets failure counter after a successful keepalive tick", async () => {
vi.useFakeTimers();
try {
let callCount = 0;
const start = vi.fn().mockImplementation(async () => {
callCount += 1;
if (callCount % 2 === 1) {
throw new Error("flaky");
}
});
const onStartError = vi.fn();
const callbacks = createTypingCallbacks({
start,
onStartError,
maxConsecutiveFailures: 2,
});
await callbacks.onReplyStart(); // fail
await vi.advanceTimersByTimeAsync(3_000); // success
await vi.advanceTimersByTimeAsync(3_000); // fail
await vi.advanceTimersByTimeAsync(3_000); // success
await vi.advanceTimersByTimeAsync(3_000); // fail
expect(start).toHaveBeenCalledTimes(5);
expect(onStartError).toHaveBeenCalledTimes(3);
} finally {
vi.useRealTimers();
}
});
it("deduplicates stop across idle and cleanup", async () => { it("deduplicates stop across idle and cleanup", async () => {
const start = vi.fn().mockResolvedValue(undefined); const start = vi.fn().mockResolvedValue(undefined);
const stop = vi.fn().mockResolvedValue(undefined); const stop = vi.fn().mockResolvedValue(undefined);

View File

@@ -13,6 +13,8 @@ export type CreateTypingCallbacksParams = {
onStartError: (err: unknown) => void; onStartError: (err: unknown) => void;
onStopError?: (err: unknown) => void; onStopError?: (err: unknown) => void;
keepaliveIntervalMs?: number; keepaliveIntervalMs?: number;
/** Stop keepalive after this many consecutive start() failures. Default: 2 */
maxConsecutiveFailures?: number;
/** Maximum duration for typing indicator before auto-cleanup (safety TTL). Default: 60s */ /** Maximum duration for typing indicator before auto-cleanup (safety TTL). Default: 60s */
maxDurationMs?: number; maxDurationMs?: number;
}; };
@@ -20,19 +22,31 @@ export type CreateTypingCallbacksParams = {
export function createTypingCallbacks(params: CreateTypingCallbacksParams): TypingCallbacks { export function createTypingCallbacks(params: CreateTypingCallbacksParams): TypingCallbacks {
const stop = params.stop; const stop = params.stop;
const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000; const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000;
const maxConsecutiveFailures = Math.max(1, params.maxConsecutiveFailures ?? 2);
const maxDurationMs = params.maxDurationMs ?? 60_000; // Default 60s TTL const maxDurationMs = params.maxDurationMs ?? 60_000; // Default 60s TTL
let stopSent = false; let stopSent = false;
let closed = false; let closed = false;
let consecutiveFailures = 0;
let breakerTripped = false;
let ttlTimer: ReturnType<typeof setTimeout> | undefined; let ttlTimer: ReturnType<typeof setTimeout> | undefined;
const fireStart = async () => { const fireStart = async (): Promise<void> => {
if (closed) { if (closed) {
return; return;
} }
if (breakerTripped) {
return;
}
try { try {
await params.start(); await params.start();
consecutiveFailures = 0;
} catch (err) { } catch (err) {
consecutiveFailures += 1;
params.onStartError(err); params.onStartError(err);
if (consecutiveFailures >= maxConsecutiveFailures) {
breakerTripped = true;
keepaliveLoop.stop();
}
} }
}; };
@@ -67,9 +81,14 @@ export function createTypingCallbacks(params: CreateTypingCallbacksParams): Typi
return; return;
} }
stopSent = false; stopSent = false;
breakerTripped = false;
consecutiveFailures = 0;
keepaliveLoop.stop(); keepaliveLoop.stop();
clearTtlTimer(); clearTtlTimer();
await fireStart(); await fireStart();
if (breakerTripped) {
return;
}
keepaliveLoop.start(); keepaliveLoop.start();
startTtlTimer(); // Start TTL safety timer startTtlTimer(); // Start TTL safety timer
}; };