fix: keep channel typing active during long inference (#25886, thanks @stakeswky)

Co-authored-by: stakeswky <stakeswky@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-02-25 02:03:15 +00:00
parent dcd90438ec
commit e0201c2774
10 changed files with 111 additions and 16 deletions

View File

@@ -190,4 +190,48 @@ describe("createTypingCallbacks", () => {
expect(stop).toHaveBeenCalledTimes(1);
expect(onStopError).toHaveBeenCalledTimes(1);
});
it("sends typing keepalive pings until idle cleanup", async () => {
vi.useFakeTimers();
try {
const start = vi.fn().mockResolvedValue(undefined);
const stop = vi.fn().mockResolvedValue(undefined);
const onStartError = vi.fn();
const callbacks = createTypingCallbacks({ start, stop, onStartError });
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(2_999);
expect(start).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1);
expect(start).toHaveBeenCalledTimes(2);
await vi.advanceTimersByTimeAsync(3_000);
expect(start).toHaveBeenCalledTimes(3);
callbacks.onIdle?.();
await flushMicrotasks();
expect(stop).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(9_000);
expect(start).toHaveBeenCalledTimes(3);
} finally {
vi.useRealTimers();
}
});
it("deduplicates stop across idle and cleanup", async () => {
const start = vi.fn().mockResolvedValue(undefined);
const stop = vi.fn().mockResolvedValue(undefined);
const onStartError = vi.fn();
const callbacks = createTypingCallbacks({ start, stop, onStartError });
callbacks.onIdle?.();
callbacks.onCleanup?.();
await flushMicrotasks();
expect(stop).toHaveBeenCalledTimes(1);
});
});

View File

@@ -10,9 +10,15 @@ export function createTypingCallbacks(params: {
stop?: () => Promise<void>;
onStartError: (err: unknown) => void;
onStopError?: (err: unknown) => void;
keepaliveIntervalMs?: number;
}): TypingCallbacks {
const stop = params.stop;
const onReplyStart = async () => {
const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000;
let keepaliveTimer: ReturnType<typeof setInterval> | undefined;
let keepaliveStartInFlight = false;
let stopSent = false;
const fireStart = async () => {
try {
await params.start();
} catch (err) {
@@ -20,11 +26,41 @@ export function createTypingCallbacks(params: {
}
};
const fireStop = stop
? () => {
void stop().catch((err) => (params.onStopError ?? params.onStartError)(err));
const clearKeepalive = () => {
if (!keepaliveTimer) {
return;
}
clearInterval(keepaliveTimer);
keepaliveTimer = undefined;
keepaliveStartInFlight = false;
};
const onReplyStart = async () => {
stopSent = false;
clearKeepalive();
await fireStart();
if (keepaliveIntervalMs <= 0) {
return;
}
keepaliveTimer = setInterval(() => {
if (keepaliveStartInFlight) {
return;
}
: undefined;
keepaliveStartInFlight = true;
void fireStart().finally(() => {
keepaliveStartInFlight = false;
});
}, keepaliveIntervalMs);
};
const fireStop = () => {
clearKeepalive();
if (!stop || stopSent) {
return;
}
stopSent = true;
void stop().catch((err) => (params.onStopError ?? params.onStartError)(err));
};
return { onReplyStart, onIdle: fireStop, onCleanup: fireStop };
}

View File

@@ -669,6 +669,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
await typingCallbacks.onReplyStart();
await statusReactions.setThinking();
},
onIdle: typingCallbacks.onIdle,
onCleanup: typingCallbacks.onCleanup,
});
let dispatchResult: Awaited<ReturnType<typeof dispatchInboundMessage>> | null = null;

View File

@@ -238,6 +238,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: typingCallbacks.onReplyStart,
onIdle: typingCallbacks.onIdle,
onCleanup: typingCallbacks.onCleanup,
});
const { queuedFinal } = await dispatchInboundMessage({

View File

@@ -306,6 +306,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
},
onReplyStart: typingCallbacks.onReplyStart,
onIdle: typingCallbacks.onIdle,
onCleanup: typingCallbacks.onCleanup,
});
const draftStream = createSlackDraftStream({

View File

@@ -418,6 +418,18 @@ export const dispatchTelegramMessage = async ({
void statusReactionController.setThinking();
}
const typingCallbacks = createTypingCallbacks({
start: sendTyping,
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "telegram",
target: String(chatId),
error: err,
});
},
});
try {
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
@@ -528,17 +540,9 @@ export const dispatchTelegramMessage = async ({
deliveryState.markNonSilentFailure();
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
onReplyStart: createTypingCallbacks({
start: sendTyping,
onStartError: (err) => {
logTypingFailure({
log: logVerbose,
channel: "telegram",
target: String(chatId),
error: err,
});
},
}).onReplyStart,
onReplyStart: typingCallbacks.onReplyStart,
onIdle: typingCallbacks.onIdle,
onCleanup: typingCallbacks.onCleanup,
},
replyOptions: {
skillFilter,