refactor(channels): share draft stream loop across slack and telegram

This commit is contained in:
Peter Steinberger
2026-02-16 23:47:35 +00:00
parent f6111622e6
commit 3451159174
3 changed files with 115 additions and 150 deletions

View File

@@ -0,0 +1,92 @@
export type DraftStreamLoop = {
update: (text: string) => void;
flush: () => Promise<void>;
stop: () => void;
resetPending: () => void;
waitForInFlight: () => Promise<void>;
};
export function createDraftStreamLoop(params: {
throttleMs: number;
isStopped: () => boolean;
sendOrEditStreamMessage: (text: string) => Promise<void>;
}): DraftStreamLoop {
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
const flush = async () => {
if (timer) {
clearTimeout(timer);
timer = undefined;
}
while (!params.isStopped()) {
if (inFlightPromise) {
await inFlightPromise;
continue;
}
const text = pendingText;
if (!text.trim()) {
pendingText = "";
return;
}
pendingText = "";
lastSentAt = Date.now();
const current = params.sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
if (!pendingText) {
return;
}
}
};
const schedule = () => {
if (timer) {
return;
}
const delay = Math.max(0, params.throttleMs - (Date.now() - lastSentAt));
timer = setTimeout(() => {
void flush();
}, delay);
};
return {
update: (text: string) => {
if (params.isStopped()) {
return;
}
pendingText = text;
if (inFlightPromise) {
schedule();
return;
}
if (!timer && Date.now() - lastSentAt >= params.throttleMs) {
void flush();
return;
}
schedule();
},
flush,
stop: () => {
pendingText = "";
if (timer) {
clearTimeout(timer);
timer = undefined;
}
},
resetPending: () => {
pendingText = "";
},
waitForInFlight: async () => {
if (inFlightPromise) {
await inFlightPromise;
}
},
};
}

View File

@@ -1,3 +1,4 @@
import { createDraftStreamLoop } from "../channels/draft-stream-loop.js";
import { deleteSlackMessage, editSlackMessage } from "./actions.js"; import { deleteSlackMessage, editSlackMessage } from "./actions.js";
import { sendMessageSlack } from "./send.js"; import { sendMessageSlack } from "./send.js";
@@ -37,10 +38,6 @@ export function createSlackDraftStream(params: {
let streamMessageId: string | undefined; let streamMessageId: string | undefined;
let streamChannelId: string | undefined; let streamChannelId: string | undefined;
let lastSentText = ""; let lastSentText = "";
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
let stopped = false; let stopped = false;
const sendOrEditStreamMessage = async (text: string) => { const sendOrEditStreamMessage = async (text: string) => {
@@ -60,7 +57,6 @@ export function createSlackDraftStream(params: {
return; return;
} }
lastSentText = trimmed; lastSentText = trimmed;
lastSentAt = Date.now();
try { try {
if (streamChannelId && streamMessageId) { if (streamChannelId && streamMessageId) {
await edit(streamChannelId, streamMessageId, trimmed, { await edit(streamChannelId, streamMessageId, trimmed, {
@@ -89,77 +85,20 @@ export function createSlackDraftStream(params: {
); );
} }
}; };
const loop = createDraftStreamLoop({
const flush = async () => { throttleMs,
if (timer) { isStopped: () => stopped,
clearTimeout(timer); sendOrEditStreamMessage,
timer = undefined; });
}
while (!stopped) {
if (inFlightPromise) {
await inFlightPromise;
continue;
}
const text = pendingText;
const trimmed = text.trim();
if (!trimmed) {
pendingText = "";
return;
}
pendingText = "";
const current = sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
if (!pendingText) {
return;
}
}
};
const schedule = () => {
if (timer) {
return;
}
const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt));
timer = setTimeout(() => {
void flush();
}, delay);
};
const update = (text: string) => {
if (stopped) {
return;
}
pendingText = text;
if (inFlightPromise) {
schedule();
return;
}
if (!timer && Date.now() - lastSentAt >= throttleMs) {
void flush();
return;
}
schedule();
};
const stop = () => { const stop = () => {
stopped = true; stopped = true;
pendingText = ""; loop.stop();
if (timer) {
clearTimeout(timer);
timer = undefined;
}
}; };
const clear = async () => { const clear = async () => {
stop(); stop();
if (inFlightPromise) { await loop.waitForInFlight();
await inFlightPromise;
}
const channelId = streamChannelId; const channelId = streamChannelId;
const messageId = streamMessageId; const messageId = streamMessageId;
streamChannelId = undefined; streamChannelId = undefined;
@@ -184,14 +123,14 @@ export function createSlackDraftStream(params: {
streamMessageId = undefined; streamMessageId = undefined;
streamChannelId = undefined; streamChannelId = undefined;
lastSentText = ""; lastSentText = "";
pendingText = ""; loop.resetPending();
}; };
params.log?.(`slack stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); params.log?.(`slack stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return { return {
update, update: loop.update,
flush, flush: loop.flush,
clear, clear,
stop, stop,
forceNewMessage, forceNewMessage,

View File

@@ -1,4 +1,5 @@
import type { Bot } from "grammy"; import type { Bot } from "grammy";
import { createDraftStreamLoop } from "../channels/draft-stream-loop.js";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
const TELEGRAM_STREAM_MAX_CHARS = 4096; const TELEGRAM_STREAM_MAX_CHARS = 4096;
@@ -38,10 +39,6 @@ export function createTelegramDraftStream(params: {
let streamMessageId: number | undefined; let streamMessageId: number | undefined;
let lastSentText = ""; let lastSentText = "";
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
let stopped = false; let stopped = false;
const sendOrEditStreamMessage = async (text: string) => { const sendOrEditStreamMessage = async (text: string) => {
@@ -65,7 +62,6 @@ export function createTelegramDraftStream(params: {
return; return;
} }
lastSentText = trimmed; lastSentText = trimmed;
lastSentAt = Date.now();
try { try {
if (typeof streamMessageId === "number") { if (typeof streamMessageId === "number") {
await params.api.editMessageText(chatId, streamMessageId, trimmed); await params.api.editMessageText(chatId, streamMessageId, trimmed);
@@ -86,47 +82,15 @@ export function createTelegramDraftStream(params: {
); );
} }
}; };
const loop = createDraftStreamLoop({
const flush = async () => { throttleMs,
if (timer) { isStopped: () => stopped,
clearTimeout(timer); sendOrEditStreamMessage,
timer = undefined; });
}
while (!stopped) {
if (inFlightPromise) {
await inFlightPromise;
continue;
}
const text = pendingText;
const trimmed = text.trim();
if (!trimmed) {
pendingText = "";
return;
}
pendingText = "";
const current = sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
if (!pendingText) {
return;
}
}
};
const clear = async () => { const clear = async () => {
if (timer) { stop();
clearTimeout(timer); await loop.waitForInFlight();
timer = undefined;
}
pendingText = "";
stopped = true;
if (inFlightPromise) {
await inFlightPromise;
}
const messageId = streamMessageId; const messageId = streamMessageId;
streamMessageId = undefined; streamMessageId = undefined;
if (typeof messageId !== "number") { if (typeof messageId !== "number") {
@@ -141,52 +105,22 @@ export function createTelegramDraftStream(params: {
} }
}; };
const schedule = () => {
if (timer) {
return;
}
const delay = Math.max(0, throttleMs - (Date.now() - lastSentAt));
timer = setTimeout(() => {
void flush();
}, delay);
};
const update = (text: string) => {
if (stopped) {
return;
}
pendingText = text;
if (inFlightPromise) {
schedule();
return;
}
if (!timer && Date.now() - lastSentAt >= throttleMs) {
void flush();
return;
}
schedule();
};
const stop = () => { const stop = () => {
stopped = true; stopped = true;
pendingText = ""; loop.stop();
if (timer) {
clearTimeout(timer);
timer = undefined;
}
}; };
const forceNewMessage = () => { const forceNewMessage = () => {
streamMessageId = undefined; streamMessageId = undefined;
lastSentText = ""; lastSentText = "";
pendingText = ""; loop.resetPending();
}; };
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return { return {
update, update: loop.update,
flush, flush: loop.flush,
messageId: () => streamMessageId, messageId: () => streamMessageId,
clear, clear,
stop, stop,