diff --git a/src/channels/draft-stream-loop.ts b/src/channels/draft-stream-loop.ts new file mode 100644 index 00000000000..ed492ff4012 --- /dev/null +++ b/src/channels/draft-stream-loop.ts @@ -0,0 +1,92 @@ +export type DraftStreamLoop = { + update: (text: string) => void; + flush: () => Promise; + stop: () => void; + resetPending: () => void; + waitForInFlight: () => Promise; +}; + +export function createDraftStreamLoop(params: { + throttleMs: number; + isStopped: () => boolean; + sendOrEditStreamMessage: (text: string) => Promise; +}): DraftStreamLoop { + let lastSentAt = 0; + let pendingText = ""; + let inFlightPromise: Promise | undefined; + let timer: ReturnType | undefined; + + const flush = async () => { + if (timer) { + clearTimeout(timer); + timer = undefined; + } + while (!params.isStopped()) { + if (inFlightPromise) { + await inFlightPromise; + continue; + } + const text = pendingText; + if (!text.trim()) { + pendingText = ""; + return; + } + pendingText = ""; + lastSentAt = Date.now(); + const current = params.sendOrEditStreamMessage(text).finally(() => { + if (inFlightPromise === current) { + inFlightPromise = undefined; + } + }); + inFlightPromise = current; + await current; + if (!pendingText) { + return; + } + } + }; + + const schedule = () => { + if (timer) { + return; + } + const delay = Math.max(0, params.throttleMs - (Date.now() - lastSentAt)); + timer = setTimeout(() => { + void flush(); + }, delay); + }; + + return { + update: (text: string) => { + if (params.isStopped()) { + return; + } + pendingText = text; + if (inFlightPromise) { + schedule(); + return; + } + if (!timer && Date.now() - lastSentAt >= params.throttleMs) { + void flush(); + return; + } + schedule(); + }, + flush, + stop: () => { + pendingText = ""; + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }, + resetPending: () => { + pendingText = ""; + }, + waitForInFlight: async () => { + if (inFlightPromise) { + await inFlightPromise; + } + }, + }; +} diff --git a/src/slack/draft-stream.ts b/src/slack/draft-stream.ts index 3e79a6e00b2..b482ebd5820 100644 --- a/src/slack/draft-stream.ts +++ b/src/slack/draft-stream.ts @@ -1,3 +1,4 @@ +import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; import { deleteSlackMessage, editSlackMessage } from "./actions.js"; import { sendMessageSlack } from "./send.js"; @@ -37,10 +38,6 @@ export function createSlackDraftStream(params: { let streamMessageId: string | undefined; let streamChannelId: string | undefined; let lastSentText = ""; - let lastSentAt = 0; - let pendingText = ""; - let inFlightPromise: Promise | undefined; - let timer: ReturnType | undefined; let stopped = false; const sendOrEditStreamMessage = async (text: string) => { @@ -60,7 +57,6 @@ export function createSlackDraftStream(params: { return; } lastSentText = trimmed; - lastSentAt = Date.now(); try { if (streamChannelId && streamMessageId) { await edit(streamChannelId, streamMessageId, trimmed, { @@ -89,77 +85,20 @@ export function createSlackDraftStream(params: { ); } }; - - const flush = async () => { - if (timer) { - clearTimeout(timer); - timer = undefined; - } - while (!stopped) { - if (inFlightPromise) { - await inFlightPromise; - continue; - } - const text = pendingText; - const trimmed = text.trim(); - if (!trimmed) { - pendingText = ""; - return; - } - pendingText = ""; - const current = sendOrEditStreamMessage(text).finally(() => { - if (inFlightPromise === current) { - inFlightPromise = undefined; - } - }); - inFlightPromise = current; - await current; - if (!pendingText) { - return; - } - } - }; - - const schedule = () => { - if (timer) { - return; - } - const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt)); - timer = setTimeout(() => { - void flush(); - }, delay); - }; - - const update = (text: string) => { - if (stopped) { - return; - } - pendingText = text; - if (inFlightPromise) { - schedule(); - return; - } - if (!timer && Date.now() - lastSentAt >= throttleMs) { - void flush(); - return; - } - schedule(); - }; + const loop = createDraftStreamLoop({ + throttleMs, + isStopped: () => stopped, + sendOrEditStreamMessage, + }); const stop = () => { stopped = true; - pendingText = ""; - if (timer) { - clearTimeout(timer); - timer = undefined; - } + loop.stop(); }; const clear = async () => { stop(); - if (inFlightPromise) { - await inFlightPromise; - } + await loop.waitForInFlight(); const channelId = streamChannelId; const messageId = streamMessageId; streamChannelId = undefined; @@ -184,14 +123,14 @@ export function createSlackDraftStream(params: { streamMessageId = undefined; streamChannelId = undefined; lastSentText = ""; - pendingText = ""; + loop.resetPending(); }; params.log?.(`slack stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { - update, - flush, + update: loop.update, + flush: loop.flush, clear, stop, forceNewMessage, diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 17b3cbec654..1682413eb10 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -1,4 +1,5 @@ import type { Bot } from "grammy"; +import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; @@ -38,10 +39,6 @@ export function createTelegramDraftStream(params: { let streamMessageId: number | undefined; let lastSentText = ""; - let lastSentAt = 0; - let pendingText = ""; - let inFlightPromise: Promise | undefined; - let timer: ReturnType | undefined; let stopped = false; const sendOrEditStreamMessage = async (text: string) => { @@ -65,7 +62,6 @@ export function createTelegramDraftStream(params: { return; } lastSentText = trimmed; - lastSentAt = Date.now(); try { if (typeof streamMessageId === "number") { await params.api.editMessageText(chatId, streamMessageId, trimmed); @@ -86,47 +82,15 @@ export function createTelegramDraftStream(params: { ); } }; - - const flush = async () => { - if (timer) { - clearTimeout(timer); - timer = undefined; - } - while (!stopped) { - if (inFlightPromise) { - await inFlightPromise; - continue; - } - const text = pendingText; - const trimmed = text.trim(); - if (!trimmed) { - pendingText = ""; - return; - } - pendingText = ""; - const current = sendOrEditStreamMessage(text).finally(() => { - if (inFlightPromise === current) { - inFlightPromise = undefined; - } - }); - inFlightPromise = current; - await current; - if (!pendingText) { - return; - } - } - }; + const loop = createDraftStreamLoop({ + throttleMs, + isStopped: () => stopped, + sendOrEditStreamMessage, + }); const clear = async () => { - if (timer) { - clearTimeout(timer); - timer = undefined; - } - pendingText = ""; - stopped = true; - if (inFlightPromise) { - await inFlightPromise; - } + stop(); + await loop.waitForInFlight(); const messageId = streamMessageId; streamMessageId = undefined; if (typeof messageId !== "number") { @@ -141,52 +105,22 @@ export function createTelegramDraftStream(params: { } }; - const schedule = () => { - if (timer) { - return; - } - const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt)); - timer = setTimeout(() => { - void flush(); - }, delay); - }; - - const update = (text: string) => { - if (stopped) { - return; - } - pendingText = text; - if (inFlightPromise) { - schedule(); - return; - } - if (!timer && Date.now() - lastSentAt >= throttleMs) { - void flush(); - return; - } - schedule(); - }; - const stop = () => { stopped = true; - pendingText = ""; - if (timer) { - clearTimeout(timer); - timer = undefined; - } + loop.stop(); }; const forceNewMessage = () => { streamMessageId = undefined; lastSentText = ""; - pendingText = ""; + loop.resetPending(); }; params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); return { - update, - flush, + update: loop.update, + flush: loop.flush, messageId: () => streamMessageId, clear, stop,