feat(slack): add configurable stream modes

This commit is contained in:
Colin
2026-02-16 16:56:41 -05:00
committed by Peter Steinberger
parent 087edec93f
commit 89ce1460e1
7 changed files with 196 additions and 0 deletions

View File

@@ -329,6 +329,8 @@ export const FIELD_HELP: Record<string, string> = {
"channels.slack.commands.native": 'Override native commands for Slack (bool or "auto").',
"channels.slack.commands.nativeSkills":
'Override native skill commands for Slack (bool or "auto").',
"channels.slack.streamMode":
"Live stream preview mode for Slack replies (replace | status_final | append).",
"session.agentToAgent.maxPingPongTurns":
"Max reply-back turns between requester and target (05).",
"channels.telegram.customCommands":

View File

@@ -279,6 +279,7 @@ export const FIELD_LABELS: Record<string, string> = {
"channels.slack.appToken": "Slack App Token",
"channels.slack.userToken": "Slack User Token",
"channels.slack.userTokenReadOnly": "Slack User Token Read Only",
"channels.slack.streamMode": "Slack Stream Mode",
"channels.slack.thread.historyScope": "Slack Thread History Scope",
"channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance",
"channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit",

View File

@@ -45,6 +45,7 @@ export type SlackChannelConfig = {
};
export type SlackReactionNotificationMode = "off" | "own" | "all" | "allowlist";
export type SlackStreamMode = "replace" | "status_final" | "append";
export type SlackActionConfig = {
reactions?: boolean;
@@ -124,6 +125,8 @@ export type SlackAccountConfig = {
blockStreaming?: boolean;
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
/** Slack stream preview mode (replace|status_final|append). Default: replace. */
streamMode?: SlackStreamMode;
mediaMaxMb?: number;
/** Reaction notification mode (off|own|all|allowlist). Default: own. */
reactionNotifications?: SlackReactionNotificationMode;

View File

@@ -460,6 +460,7 @@ export const GoogleChatAccountSchema = z
chunkMode: z.enum(["length", "newline"]).optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
streamMode: z.enum(["replace", "status_final", "append"]).optional().default("replace"),
mediaMaxMb: z.number().positive().optional(),
replyToMode: ReplyToModeSchema.optional(),
actions: z

View File

@@ -11,6 +11,11 @@ import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { removeSlackReaction } from "../../actions.js";
import { createSlackDraftStream } from "../../draft-stream.js";
import {
applyAppendOnlyStreamUpdate,
buildStatusFinalPreviewText,
resolveSlackStreamMode,
} from "../../stream-mode.js";
import { resolveSlackThreadTargets } from "../../threading.js";
import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js";
@@ -112,6 +117,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const draftChannelId = draftStream?.channelId();
const finalText = payload.text;
const canFinalizeViaPreviewEdit =
streamMode !== "status_final" &&
mediaCount === 0 &&
!payload.isError &&
typeof finalText === "string" &&
@@ -134,6 +140,21 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
`slack: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
} else if (streamMode === "status_final" && hasStreamedMessage) {
try {
const statusChannelId = draftStream?.channelId();
const statusMessageId = draftStream?.messageId();
if (statusChannelId && statusMessageId) {
await ctx.app.client.chat.update({
token: ctx.botToken,
channel: statusChannelId,
ts: statusMessageId,
text: "Status: complete. Final answer posted below.",
});
}
} catch (err) {
logVerbose(`slack: status_final completion update failed (${String(err)})`);
}
} else if (mediaCount > 0) {
await draftStream?.clear();
hasStreamedMessage = false;
@@ -170,11 +191,42 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
warn: logVerbose,
});
let hasStreamedMessage = false;
const streamMode = resolveSlackStreamMode(account.config.streamMode);
let appendRenderedText = "";
let appendSourceText = "";
let statusUpdateCount = 0;
const updateDraftFromPartial = (text?: string) => {
const trimmed = text?.trimEnd();
if (!trimmed) {
return;
}
if (streamMode === "append") {
const next = applyAppendOnlyStreamUpdate({
incoming: trimmed,
rendered: appendRenderedText,
source: appendSourceText,
});
appendRenderedText = next.rendered;
appendSourceText = next.source;
if (!next.changed) {
return;
}
draftStream.update(next.rendered);
hasStreamedMessage = true;
return;
}
if (streamMode === "status_final") {
statusUpdateCount += 1;
if (statusUpdateCount > 1 && statusUpdateCount % 4 !== 0) {
return;
}
draftStream.update(buildStatusFinalPreviewText(statusUpdateCount));
hasStreamedMessage = true;
return;
}
draftStream.update(trimmed);
hasStreamedMessage = true;
};
@@ -199,12 +251,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
onReasoningEnd: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
},

View File

@@ -0,0 +1,78 @@
import { describe, expect, it } from "vitest";
import {
applyAppendOnlyStreamUpdate,
buildStatusFinalPreviewText,
resolveSlackStreamMode,
} from "./stream-mode.js";
describe("resolveSlackStreamMode", () => {
it("defaults to replace", () => {
expect(resolveSlackStreamMode(undefined)).toBe("replace");
expect(resolveSlackStreamMode("")).toBe("replace");
expect(resolveSlackStreamMode("unknown")).toBe("replace");
});
it("accepts valid modes", () => {
expect(resolveSlackStreamMode("replace")).toBe("replace");
expect(resolveSlackStreamMode("status_final")).toBe("status_final");
expect(resolveSlackStreamMode("append")).toBe("append");
});
});
describe("applyAppendOnlyStreamUpdate", () => {
it("starts with first incoming text", () => {
const next = applyAppendOnlyStreamUpdate({
incoming: "hello",
rendered: "",
source: "",
});
expect(next).toEqual({ rendered: "hello", source: "hello", changed: true });
});
it("uses cumulative incoming text when it extends prior source", () => {
const next = applyAppendOnlyStreamUpdate({
incoming: "hello world",
rendered: "hello",
source: "hello",
});
expect(next).toEqual({
rendered: "hello world",
source: "hello world",
changed: true,
});
});
it("ignores regressive shorter incoming text", () => {
const next = applyAppendOnlyStreamUpdate({
incoming: "hello",
rendered: "hello world",
source: "hello world",
});
expect(next).toEqual({
rendered: "hello world",
source: "hello world",
changed: false,
});
});
it("appends non-prefix incoming chunks", () => {
const next = applyAppendOnlyStreamUpdate({
incoming: "next chunk",
rendered: "hello world",
source: "hello world",
});
expect(next).toEqual({
rendered: "hello world\nnext chunk",
source: "next chunk",
changed: true,
});
});
});
describe("buildStatusFinalPreviewText", () => {
it("cycles status dots", () => {
expect(buildStatusFinalPreviewText(1)).toBe("Status: thinking..");
expect(buildStatusFinalPreviewText(2)).toBe("Status: thinking...");
expect(buildStatusFinalPreviewText(3)).toBe("Status: thinking.");
});
});

53
src/slack/stream-mode.ts Normal file
View File

@@ -0,0 +1,53 @@
export type SlackStreamMode = "replace" | "status_final" | "append";
const DEFAULT_STREAM_MODE: SlackStreamMode = "replace";
export function resolveSlackStreamMode(raw: unknown): SlackStreamMode {
if (typeof raw !== "string") {
return DEFAULT_STREAM_MODE;
}
const normalized = raw.trim().toLowerCase();
if (normalized === "replace" || normalized === "status_final" || normalized === "append") {
return normalized;
}
return DEFAULT_STREAM_MODE;
}
export function applyAppendOnlyStreamUpdate(params: {
incoming: string;
rendered: string;
source: string;
}): { rendered: string; source: string; changed: boolean } {
const incoming = params.incoming.trimEnd();
if (!incoming) {
return { rendered: params.rendered, source: params.source, changed: false };
}
if (!params.rendered) {
return { rendered: incoming, source: incoming, changed: true };
}
if (incoming === params.source) {
return { rendered: params.rendered, source: params.source, changed: false };
}
// Typical model partials are cumulative prefixes.
if (incoming.startsWith(params.source) || incoming.startsWith(params.rendered)) {
return { rendered: incoming, source: incoming, changed: incoming !== params.rendered };
}
// Ignore regressive shorter variants of the same stream.
if (params.source.startsWith(incoming)) {
return { rendered: params.rendered, source: params.source, changed: false };
}
const separator = params.rendered.endsWith("\n") ? "" : "\n";
return {
rendered: `${params.rendered}${separator}${incoming}`,
source: incoming,
changed: true,
};
}
export function buildStatusFinalPreviewText(updateCount: number): string {
const dots = ".".repeat((Math.max(1, updateCount) % 3) + 1);
return `Status: thinking${dots}`;
}