Discord: implement stream preview mode (#22111)

* Discord: implement stream preview mode

* Changelog: note Discord stream preview mode

* Tests: type discord draft stream mocks

* Docs: document Discord stream preview
This commit is contained in:
Shadow
2026-02-20 12:37:15 -06:00
committed by GitHub
parent 5dd304d1c6
commit 09e6970386
10 changed files with 682 additions and 16 deletions

View File

@@ -1,5 +1,6 @@
import { ChannelType } from "@buape/carbon";
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js";
import { resolveChunkMode } from "../../auto-reply/chunk.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import { formatInboundEnvelope, resolveEnvelopeFormatOptions } from "../../auto-reply/envelope.js";
@@ -18,11 +19,16 @@ import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { convertMarkdownTables } from "../../markdown/tables.js";
import { buildAgentSessionKey } from "../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../routing/session-key.js";
import { buildUntrustedChannelMetadata } from "../../security/channel-metadata.js";
import { truncateUtf16Safe } from "../../utils.js";
import { chunkDiscordTextWithMode } from "../chunk.js";
import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js";
import { createDiscordDraftStream } from "../draft-stream.js";
import { reactMessageDiscord, removeReactionDiscord } from "../send.js";
import { editMessageDiscord } from "../send.messages.js";
import { normalizeDiscordSlug, resolveDiscordOwnerAllowFrom } from "./allow-list.js";
import { resolveTimestampMs } from "./format.js";
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js";
@@ -594,6 +600,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
channel: "discord",
accountId,
});
const chunkMode = resolveChunkMode(cfg, "discord", accountId);
const typingCallbacks = createTypingCallbacks({
start: () => sendTyping({ client, channelId: typingChannelId }),
@@ -607,10 +614,216 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
},
});
// --- Discord draft stream (edit-based preview streaming) ---
const discordStreamMode = discordConfig?.streamMode ?? "off";
const draftMaxChars = Math.min(textLimit, 2000);
const accountBlockStreamingEnabled =
typeof discordConfig?.blockStreaming === "boolean"
? discordConfig.blockStreaming
: cfg.agents?.defaults?.blockStreamingDefault === "on";
const canStreamDraft = discordStreamMode !== "off" && !accountBlockStreamingEnabled;
const draftReplyToMessageId = () => replyReference.use();
const deliverChannelId = deliverTarget.startsWith("channel:")
? deliverTarget.slice("channel:".length)
: messageChannelId;
const draftStream = canStreamDraft
? createDiscordDraftStream({
rest: client.rest,
channelId: deliverChannelId,
maxChars: draftMaxChars,
replyToMessageId: draftReplyToMessageId,
minInitialChars: 30,
throttleMs: 1200,
log: logVerbose,
warn: logVerbose,
})
: undefined;
const draftChunking =
draftStream && discordStreamMode === "block"
? resolveDiscordDraftStreamingChunking(cfg, accountId)
: undefined;
const shouldSplitPreviewMessages = discordStreamMode === "block";
const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined;
let lastPartialText = "";
let draftText = "";
let hasStreamedMessage = false;
let finalizedViaPreviewMessage = false;
const resolvePreviewFinalText = (text?: string) => {
if (typeof text !== "string") {
return undefined;
}
const formatted = convertMarkdownTables(text, tableMode);
const chunks = chunkDiscordTextWithMode(formatted, {
maxChars: draftMaxChars,
maxLines: discordConfig?.maxLinesPerMessage,
chunkMode,
});
if (!chunks.length && formatted) {
chunks.push(formatted);
}
if (chunks.length !== 1) {
return undefined;
}
const trimmed = chunks[0].trim();
if (!trimmed) {
return undefined;
}
const currentPreviewText = discordStreamMode === "block" ? draftText : lastPartialText;
if (
currentPreviewText &&
currentPreviewText.startsWith(trimmed) &&
trimmed.length < currentPreviewText.length
) {
return undefined;
}
return trimmed;
};
const updateDraftFromPartial = (text?: string) => {
if (!draftStream || !text) {
return;
}
if (text === lastPartialText) {
return;
}
hasStreamedMessage = true;
if (discordStreamMode === "partial") {
// Keep the longer preview to avoid visible punctuation flicker.
if (
lastPartialText &&
lastPartialText.startsWith(text) &&
text.length < lastPartialText.length
) {
return;
}
lastPartialText = text;
draftStream.update(text);
return;
}
let delta = text;
if (text.startsWith(lastPartialText)) {
delta = text.slice(lastPartialText.length);
} else {
// Streaming buffer reset (or non-monotonic stream). Start fresh.
draftChunker?.reset();
draftText = "";
}
lastPartialText = text;
if (!delta) {
return;
}
if (!draftChunker) {
draftText = text;
draftStream.update(draftText);
return;
}
draftChunker.append(delta);
draftChunker.drain({
force: false,
emit: (chunk) => {
draftText += chunk;
draftStream.update(draftText);
},
});
};
const flushDraft = async () => {
if (!draftStream) {
return;
}
if (draftChunker?.hasBuffered()) {
draftChunker.drain({
force: true,
emit: (chunk) => {
draftText += chunk;
},
});
draftChunker.reset();
if (draftText) {
draftStream.update(draftText);
}
}
await draftStream.flush();
};
// When draft streaming is active, suppress block streaming to avoid double-streaming.
const disableBlockStreamingForDraft = draftStream ? true : undefined;
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload) => {
deliver: async (payload: ReplyPayload, info) => {
const isFinal = info.kind === "final";
if (draftStream && isFinal) {
await flushDraft();
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const finalText = payload.text;
const previewFinalText = resolvePreviewFinalText(finalText);
const previewMessageId = draftStream.messageId();
// Try to finalize via preview edit (text-only, fits in 2000 chars, not an error)
const canFinalizeViaPreviewEdit =
!finalizedViaPreviewMessage &&
!hasMedia &&
typeof previewFinalText === "string" &&
typeof previewMessageId === "string" &&
!payload.isError;
if (canFinalizeViaPreviewEdit) {
await draftStream.stop();
try {
await editMessageDiscord(
deliverChannelId,
previewMessageId,
{ content: previewFinalText },
{ rest: client.rest },
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
return;
} catch (err) {
logVerbose(
`discord: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
}
// Check if stop() flushed a message we can edit
if (!finalizedViaPreviewMessage) {
await draftStream.stop();
const messageIdAfterStop = draftStream.messageId();
if (
typeof messageIdAfterStop === "string" &&
typeof previewFinalText === "string" &&
!hasMedia &&
!payload.isError
) {
try {
await editMessageDiscord(
deliverChannelId,
messageIdAfterStop,
{ content: previewFinalText },
{ rest: client.rest },
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
return;
} catch (err) {
logVerbose(
`discord: post-stop preview edit failed; falling back to standard send (${String(err)})`,
);
}
}
}
// Clear the preview and fall through to standard delivery
if (!finalizedViaPreviewMessage) {
await draftStream.clear();
}
}
const replyToId = replyReference.use();
await deliverDiscordReply({
replies: [payload],
@@ -623,7 +836,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
textLimit,
maxLinesPerMessage: discordConfig?.maxLinesPerMessage,
tableMode,
chunkMode: resolveChunkMode(cfg, "discord", accountId),
chunkMode,
});
replyReference.markSent();
},
@@ -647,9 +860,33 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
...replyOptions,
skillFilter: channelConfig?.skills,
disableBlockStreaming:
typeof discordConfig?.blockStreaming === "boolean"
disableBlockStreamingForDraft ??
(typeof discordConfig?.blockStreaming === "boolean"
? !discordConfig.blockStreaming
: undefined,
: undefined),
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onAssistantMessageStart: draftStream
? () => {
if (shouldSplitPreviewMessages && hasStreamedMessage) {
logVerbose("discord: calling forceNewMessage() for draft stream");
draftStream.forceNewMessage();
}
lastPartialText = "";
draftText = "";
draftChunker?.reset();
}
: undefined,
onReasoningEnd: draftStream
? () => {
if (shouldSplitPreviewMessages && hasStreamedMessage) {
logVerbose("discord: calling forceNewMessage() for draft stream");
draftStream.forceNewMessage();
}
lastPartialText = "";
draftText = "";
draftChunker?.reset();
}
: undefined,
onModelSelected,
onReasoningStream: async () => {
await statusReactions.setThinking();
@@ -663,6 +900,11 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
dispatchError = true;
throw err;
} finally {
// Must stop() first to flush debounced content before clear() wipes state
await draftStream?.stop();
if (!finalizedViaPreviewMessage) {
await draftStream?.clear();
}
markDispatchIdle();
if (statusReactionsEnabled) {
if (dispatchError) {