feat(slack): add native text streaming support

Adds support for Slack's Agents & AI Apps text streaming APIs
(chat.startStream, chat.appendStream, chat.stopStream) to deliver
LLM responses as a single updating message instead of separate
messages per block.

Changes:
- New src/slack/streaming.ts with stream lifecycle helpers using
  the SDK's ChatStreamer (client.chatStream())
- New 'streaming' config option on SlackAccountConfig
- Updated dispatch.ts to route block replies through the stream
  when enabled, with graceful fallback to normal delivery
- Docs in docs/channels/slack.md covering setup and requirements

The streaming integration works by intercepting the deliver callback
in the reply dispatcher. When streaming is enabled and a thread
context exists, the first text delivery starts a stream, subsequent
deliveries append to it, and the stream is finalized after dispatch
completes. Media payloads and error cases fall back to normal
message delivery.

Refs:
- https://docs.slack.dev/ai/developing-ai-apps#streaming
- https://docs.slack.dev/reference/methods/chat.startStream
- https://docs.slack.dev/reference/methods/chat.appendStream
- https://docs.slack.dev/reference/methods/chat.stopStream
This commit is contained in:
nathandenherder
2026-02-05 18:14:13 -05:00
parent 82419eaad6
commit 6945fbf100
5 changed files with 340 additions and 9 deletions

View File

@@ -1,3 +1,5 @@
import type { ReplyPayload } from "../../../auto-reply/types.js";
import type { SlackStreamSession } from "../../streaming.js";
import type { PreparedSlackMessage } from "./types.js";
import { resolveHumanDelayConfig } from "../../../agents/identity.js";
import { dispatchInboundMessage } from "../../../auto-reply/dispatch.js";
@@ -10,9 +12,39 @@ import { createTypingCallbacks } from "../../../channels/typing.js";
import { resolveStorePath, updateLastRoute } from "../../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { removeSlackReaction } from "../../actions.js";
import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js";
import { resolveSlackThreadTargets } from "../../threading.js";
import { createSlackReplyDeliveryPlan, deliverReplies } from "../replies.js";
/**
* Check whether a reply payload contains media (images, files, etc.)
* that cannot be delivered through the streaming API.
*/
function hasMedia(payload: ReplyPayload): boolean {
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
}
/**
* Determine if Slack native text streaming should be used for this message.
*
* Streaming requires:
* 1. The `streaming` config option enabled on the account
* 2. A thread timestamp (streaming only works within threads)
*/
function shouldUseStreaming(params: {
streamingEnabled: boolean;
threadTs: string | undefined;
}): boolean {
if (!params.streamingEnabled) {
return false;
}
if (!params.threadTs) {
logVerbose("slack-stream: streaming disabled — no thread_ts available");
return false;
}
return true;
}
export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessage) {
const { ctx, account, message, route } = prepared;
const cfg = ctx.cfg;
@@ -102,11 +134,30 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
accountId: route.accountId,
});
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => {
const replyThreadTs = replyPlan.nextThreadTs();
// -----------------------------------------------------------------------
// Slack native text streaming state
// -----------------------------------------------------------------------
const streamingEnabled = account.config.streaming === true;
const replyThreadTs = replyPlan.nextThreadTs();
const useStreaming = shouldUseStreaming({
streamingEnabled,
threadTs: replyThreadTs ?? incomingThreadTs ?? statusThreadTs,
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
/**
* Deliver a payload via Slack native text streaming when possible.
* Falls back to normal delivery for media payloads, errors, or if the
* streaming API call itself fails.
*/
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
const effectiveThreadTs = replyPlan.nextThreadTs();
// Fall back to normal delivery for media, errors, or if streaming already failed
if (streamFailed || hasMedia(payload) || !payload.text?.trim()) {
await deliverReplies({
replies: [payload],
target: prepared.replyTarget,
@@ -114,9 +165,92 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs,
replyThreadTs: effectiveThreadTs,
});
replyPlan.markSent();
return;
}
const text = payload.text.trim();
try {
if (!streamSession) {
// Determine the thread_ts for the stream (required by Slack API)
const streamThreadTs = effectiveThreadTs ?? incomingThreadTs ?? statusThreadTs;
if (!streamThreadTs) {
// No thread context — can't stream, fall back
logVerbose(
"slack-stream: no thread_ts for stream start, falling back to normal delivery",
);
streamFailed = true;
await deliverReplies({
replies: [payload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs: effectiveThreadTs,
});
replyPlan.markSent();
return;
}
// Start a new stream
streamSession = await startSlackStream({
client: ctx.app.client,
channel: message.channel,
threadTs: streamThreadTs,
text,
});
replyPlan.markSent();
} else {
// Append to existing stream
await appendSlackStream({
session: streamSession,
text: "\n" + text,
});
}
} catch (err) {
runtime.error?.(
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
);
streamFailed = true;
// Fall back to normal delivery for this payload
await deliverReplies({
replies: [payload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs: effectiveThreadTs,
});
replyPlan.markSent();
}
};
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => {
if (useStreaming) {
await deliverWithStreaming(payload);
} else {
const effectiveThreadTs = replyPlan.nextThreadTs();
await deliverReplies({
replies: [payload],
target: prepared.replyTarget,
token: ctx.botToken,
accountId: account.accountId,
runtime,
textLimit: ctx.textLimit,
replyThreadTs: effectiveThreadTs,
});
replyPlan.markSent();
}
},
onError: (err, info) => {
runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`));
@@ -135,14 +269,29 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
skillFilter: prepared.channelConfig?.skills,
hasRepliedRef,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
// When native streaming is active, keep block streaming enabled so we
// get incremental block callbacks that we route through the stream.
useStreaming
? false
: typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
onModelSelected,
},
});
markDispatchIdle();
// -----------------------------------------------------------------------
// Finalize the stream if one was started
// -----------------------------------------------------------------------
if (streamSession && !streamSession.stopped) {
try {
await stopSlackStream({ session: streamSession });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
}
}
const anyReplyDelivered = queuedFinal || (counts.block ?? 0) > 0 || (counts.final ?? 0) > 0;
if (!anyReplyDelivered) {

136
src/slack/streaming.ts Normal file
View File

@@ -0,0 +1,136 @@
/**
* Slack native text streaming helpers.
*
* Uses the Slack SDK's `ChatStreamer` (via `client.chatStream()`) to stream
* text responses word-by-word in a single updating message, matching Slack's
* "Agents & AI Apps" streaming UX.
*
* @see https://docs.slack.dev/ai/developing-ai-apps#streaming
* @see https://docs.slack.dev/reference/methods/chat.startStream
* @see https://docs.slack.dev/reference/methods/chat.appendStream
* @see https://docs.slack.dev/reference/methods/chat.stopStream
*/
import type { ChatStreamer, WebClient } from "@slack/web-api";
import { logVerbose } from "../globals.js";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
export type SlackStreamSession = {
/** The SDK ChatStreamer instance managing this stream. */
streamer: ChatStreamer;
/** Channel this stream lives in. */
channel: string;
/** Thread timestamp (required for streaming). */
threadTs: string;
/** True once stop() has been called. */
stopped: boolean;
};
export type StartSlackStreamParams = {
client: WebClient;
channel: string;
threadTs: string;
/** Optional initial markdown text to include in the stream start. */
text?: string;
};
export type AppendSlackStreamParams = {
session: SlackStreamSession;
text: string;
};
export type StopSlackStreamParams = {
session: SlackStreamSession;
/** Optional final markdown text to append before stopping. */
text?: string;
};
// ---------------------------------------------------------------------------
// Stream lifecycle
// ---------------------------------------------------------------------------
/**
* Start a new Slack text stream.
*
* Returns a {@link SlackStreamSession} that should be passed to
* {@link appendSlackStream} and {@link stopSlackStream}.
*
* The first chunk of text can optionally be included via `text`.
*/
export async function startSlackStream(
params: StartSlackStreamParams,
): Promise<SlackStreamSession> {
const { client, channel, threadTs, text } = params;
logVerbose(`slack-stream: starting stream in ${channel} thread=${threadTs}`);
const streamer = client.chatStream({
channel,
thread_ts: threadTs,
});
const session: SlackStreamSession = {
streamer,
channel,
threadTs,
stopped: false,
};
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
if (text) {
await streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
}
return session;
}
/**
* Append markdown text to an active Slack stream.
*/
export async function appendSlackStream(params: AppendSlackStreamParams): Promise<void> {
const { session, text } = params;
if (session.stopped) {
logVerbose("slack-stream: attempted to append to a stopped stream, ignoring");
return;
}
if (!text) {
return;
}
await session.streamer.append({ markdown_text: text });
logVerbose(`slack-stream: appended ${text.length} chars`);
}
/**
* Stop (finalize) a Slack stream.
*
* After calling this the stream message becomes a normal Slack message.
* Optionally include final text to append before stopping.
*/
export async function stopSlackStream(params: StopSlackStreamParams): Promise<void> {
const { session, text } = params;
if (session.stopped) {
logVerbose("slack-stream: stream already stopped, ignoring duplicate stop");
return;
}
session.stopped = true;
logVerbose(
`slack-stream: stopping stream in ${session.channel} thread=${session.threadTs}${
text ? ` (final text: ${text.length} chars)` : ""
}`,
);
await session.streamer.stop(text ? { markdown_text: text } : undefined);
logVerbose("slack-stream: stream stopped");
}