From 89ce1460e15d6f2219cc5ffb866b8b91f19ae2a1 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 16 Feb 2026 16:56:41 -0500 Subject: [PATCH] feat(slack): add configurable stream modes --- src/config/schema.help.ts | 2 + src/config/schema.labels.ts | 1 + src/config/types.slack.ts | 3 + src/config/zod-schema.providers-core.ts | 1 + src/slack/monitor/message-handler/dispatch.ts | 58 ++++++++++++++ src/slack/stream-mode.test.ts | 78 +++++++++++++++++++ src/slack/stream-mode.ts | 53 +++++++++++++ 7 files changed, 196 insertions(+) create mode 100644 src/slack/stream-mode.test.ts create mode 100644 src/slack/stream-mode.ts diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index e46af66b93d..88e66fd6fff 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -329,6 +329,8 @@ export const FIELD_HELP: Record = { "channels.slack.commands.native": 'Override native commands for Slack (bool or "auto").', "channels.slack.commands.nativeSkills": 'Override native skill commands for Slack (bool or "auto").', + "channels.slack.streamMode": + "Live stream preview mode for Slack replies (replace | status_final | append).", "session.agentToAgent.maxPingPongTurns": "Max reply-back turns between requester and target (0–5).", "channels.telegram.customCommands": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index e7fc90854ca..e0c28ae0a81 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -279,6 +279,7 @@ export const FIELD_LABELS: Record = { "channels.slack.appToken": "Slack App Token", "channels.slack.userToken": "Slack User Token", "channels.slack.userTokenReadOnly": "Slack User Token Read Only", + "channels.slack.streamMode": "Slack Stream Mode", "channels.slack.thread.historyScope": "Slack Thread History Scope", "channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance", "channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit", diff --git a/src/config/types.slack.ts b/src/config/types.slack.ts index ead656cce29..ae5dee2e9f9 100644 --- a/src/config/types.slack.ts +++ b/src/config/types.slack.ts @@ -45,6 +45,7 @@ export type SlackChannelConfig = { }; export type SlackReactionNotificationMode = "off" | "own" | "all" | "allowlist"; +export type SlackStreamMode = "replace" | "status_final" | "append"; export type SlackActionConfig = { reactions?: boolean; @@ -124,6 +125,8 @@ export type SlackAccountConfig = { blockStreaming?: boolean; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; + /** Slack stream preview mode (replace|status_final|append). Default: replace. */ + streamMode?: SlackStreamMode; mediaMaxMb?: number; /** Reaction notification mode (off|own|all|allowlist). Default: own. */ reactionNotifications?: SlackReactionNotificationMode; diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 7e1dd801313..319c167b3c0 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -460,6 +460,7 @@ export const GoogleChatAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + streamMode: z.enum(["replace", "status_final", "append"]).optional().default("replace"), mediaMaxMb: z.number().positive().optional(), replyToMode: ReplyToModeSchema.optional(), actions: z diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 8022c4dbf3d..a2f515f3413 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -11,6 +11,11 @@ import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { removeSlackReaction } from "../../actions.js"; import { createSlackDraftStream } from "../../draft-stream.js"; +import { + applyAppendOnlyStreamUpdate, + buildStatusFinalPreviewText, + resolveSlackStreamMode, +} from "../../stream-mode.js"; import { resolveSlackThreadTargets } from "../../threading.js"; import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js"; @@ -112,6 +117,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const draftChannelId = draftStream?.channelId(); const finalText = payload.text; const canFinalizeViaPreviewEdit = + streamMode !== "status_final" && mediaCount === 0 && !payload.isError && typeof finalText === "string" && @@ -134,6 +140,21 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag `slack: preview final edit failed; falling back to standard send (${String(err)})`, ); } + } else if (streamMode === "status_final" && hasStreamedMessage) { + try { + const statusChannelId = draftStream?.channelId(); + const statusMessageId = draftStream?.messageId(); + if (statusChannelId && statusMessageId) { + await ctx.app.client.chat.update({ + token: ctx.botToken, + channel: statusChannelId, + ts: statusMessageId, + text: "Status: complete. Final answer posted below.", + }); + } + } catch (err) { + logVerbose(`slack: status_final completion update failed (${String(err)})`); + } } else if (mediaCount > 0) { await draftStream?.clear(); hasStreamedMessage = false; @@ -170,11 +191,42 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag warn: logVerbose, }); let hasStreamedMessage = false; + const streamMode = resolveSlackStreamMode(account.config.streamMode); + let appendRenderedText = ""; + let appendSourceText = ""; + let statusUpdateCount = 0; const updateDraftFromPartial = (text?: string) => { const trimmed = text?.trimEnd(); if (!trimmed) { return; } + + if (streamMode === "append") { + const next = applyAppendOnlyStreamUpdate({ + incoming: trimmed, + rendered: appendRenderedText, + source: appendSourceText, + }); + appendRenderedText = next.rendered; + appendSourceText = next.source; + if (!next.changed) { + return; + } + draftStream.update(next.rendered); + hasStreamedMessage = true; + return; + } + + if (streamMode === "status_final") { + statusUpdateCount += 1; + if (statusUpdateCount > 1 && statusUpdateCount % 4 !== 0) { + return; + } + draftStream.update(buildStatusFinalPreviewText(statusUpdateCount)); + hasStreamedMessage = true; + return; + } + draftStream.update(trimmed); hasStreamedMessage = true; }; @@ -199,12 +251,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (hasStreamedMessage) { draftStream.forceNewMessage(); hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; } }, onReasoningEnd: async () => { if (hasStreamedMessage) { draftStream.forceNewMessage(); hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; } }, }, diff --git a/src/slack/stream-mode.test.ts b/src/slack/stream-mode.test.ts new file mode 100644 index 00000000000..aa913420059 --- /dev/null +++ b/src/slack/stream-mode.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it } from "vitest"; +import { + applyAppendOnlyStreamUpdate, + buildStatusFinalPreviewText, + resolveSlackStreamMode, +} from "./stream-mode.js"; + +describe("resolveSlackStreamMode", () => { + it("defaults to replace", () => { + expect(resolveSlackStreamMode(undefined)).toBe("replace"); + expect(resolveSlackStreamMode("")).toBe("replace"); + expect(resolveSlackStreamMode("unknown")).toBe("replace"); + }); + + it("accepts valid modes", () => { + expect(resolveSlackStreamMode("replace")).toBe("replace"); + expect(resolveSlackStreamMode("status_final")).toBe("status_final"); + expect(resolveSlackStreamMode("append")).toBe("append"); + }); +}); + +describe("applyAppendOnlyStreamUpdate", () => { + it("starts with first incoming text", () => { + const next = applyAppendOnlyStreamUpdate({ + incoming: "hello", + rendered: "", + source: "", + }); + expect(next).toEqual({ rendered: "hello", source: "hello", changed: true }); + }); + + it("uses cumulative incoming text when it extends prior source", () => { + const next = applyAppendOnlyStreamUpdate({ + incoming: "hello world", + rendered: "hello", + source: "hello", + }); + expect(next).toEqual({ + rendered: "hello world", + source: "hello world", + changed: true, + }); + }); + + it("ignores regressive shorter incoming text", () => { + const next = applyAppendOnlyStreamUpdate({ + incoming: "hello", + rendered: "hello world", + source: "hello world", + }); + expect(next).toEqual({ + rendered: "hello world", + source: "hello world", + changed: false, + }); + }); + + it("appends non-prefix incoming chunks", () => { + const next = applyAppendOnlyStreamUpdate({ + incoming: "next chunk", + rendered: "hello world", + source: "hello world", + }); + expect(next).toEqual({ + rendered: "hello world\nnext chunk", + source: "next chunk", + changed: true, + }); + }); +}); + +describe("buildStatusFinalPreviewText", () => { + it("cycles status dots", () => { + expect(buildStatusFinalPreviewText(1)).toBe("Status: thinking.."); + expect(buildStatusFinalPreviewText(2)).toBe("Status: thinking..."); + expect(buildStatusFinalPreviewText(3)).toBe("Status: thinking."); + }); +}); diff --git a/src/slack/stream-mode.ts b/src/slack/stream-mode.ts new file mode 100644 index 00000000000..be523f04d33 --- /dev/null +++ b/src/slack/stream-mode.ts @@ -0,0 +1,53 @@ +export type SlackStreamMode = "replace" | "status_final" | "append"; + +const DEFAULT_STREAM_MODE: SlackStreamMode = "replace"; + +export function resolveSlackStreamMode(raw: unknown): SlackStreamMode { + if (typeof raw !== "string") { + return DEFAULT_STREAM_MODE; + } + const normalized = raw.trim().toLowerCase(); + if (normalized === "replace" || normalized === "status_final" || normalized === "append") { + return normalized; + } + return DEFAULT_STREAM_MODE; +} + +export function applyAppendOnlyStreamUpdate(params: { + incoming: string; + rendered: string; + source: string; +}): { rendered: string; source: string; changed: boolean } { + const incoming = params.incoming.trimEnd(); + if (!incoming) { + return { rendered: params.rendered, source: params.source, changed: false }; + } + if (!params.rendered) { + return { rendered: incoming, source: incoming, changed: true }; + } + if (incoming === params.source) { + return { rendered: params.rendered, source: params.source, changed: false }; + } + + // Typical model partials are cumulative prefixes. + if (incoming.startsWith(params.source) || incoming.startsWith(params.rendered)) { + return { rendered: incoming, source: incoming, changed: incoming !== params.rendered }; + } + + // Ignore regressive shorter variants of the same stream. + if (params.source.startsWith(incoming)) { + return { rendered: params.rendered, source: params.source, changed: false }; + } + + const separator = params.rendered.endsWith("\n") ? "" : "\n"; + return { + rendered: `${params.rendered}${separator}${incoming}`, + source: incoming, + changed: true, + }; +} + +export function buildStatusFinalPreviewText(updateCount: number): string { + const dots = ".".repeat((Math.max(1, updateCount) % 3) + 1); + return `Status: thinking${dots}`; +}