refactor(config): unify streaming config across channels

This commit is contained in:
Peter Steinberger
2026-02-21 19:53:23 +01:00
parent 747bb581b3
commit 2c14b0cf4c
26 changed files with 885 additions and 156 deletions

View File

@@ -68,6 +68,42 @@ describe("doctor config flow", () => {
});
});
it("preserves discord streaming intent while stripping unsupported keys on repair", async () => {
const result = await runDoctorConfigWithInput({
repair: true,
config: {
channels: {
discord: {
streaming: true,
lifecycle: {
enabled: true,
reactions: {
queued: "⏳",
thinking: "🧠",
tool: "🔧",
done: "✅",
error: "❌",
},
},
},
},
},
});
const cfg = result.cfg as {
channels: {
discord: {
streamMode?: string;
streaming?: string;
lifecycle?: unknown;
};
};
};
expect(cfg.channels.discord.streaming).toBe("partial");
expect(cfg.channels.discord.streamMode).toBeUndefined();
expect(cfg.channels.discord.lifecycle).toBeUndefined();
});
it("resolves Telegram @username allowFrom entries to numeric IDs on repair", async () => {
const fetchSpy = vi.fn(async (url: string) => {
const u = String(url);

View File

@@ -145,4 +145,81 @@ describe("normalizeLegacyConfigValues", () => {
"Moved channels.discord.accounts.work.dm.allowFrom → channels.discord.accounts.work.allowFrom.",
]);
});
it("migrates Discord streaming boolean alias to streaming enum", () => {
const res = normalizeLegacyConfigValues({
channels: {
discord: {
streaming: true,
accounts: {
work: {
streaming: false,
},
},
},
},
});
expect(res.config.channels?.discord?.streaming).toBe("partial");
expect(res.config.channels?.discord?.streamMode).toBeUndefined();
expect(res.config.channels?.discord?.accounts?.work?.streaming).toBe("off");
expect(res.config.channels?.discord?.accounts?.work?.streamMode).toBeUndefined();
expect(res.changes).toEqual([
"Normalized channels.discord.streaming boolean → enum (partial).",
"Normalized channels.discord.accounts.work.streaming boolean → enum (off).",
]);
});
it("migrates Discord legacy streamMode into streaming enum", () => {
const res = normalizeLegacyConfigValues({
channels: {
discord: {
streaming: false,
streamMode: "block",
},
},
});
expect(res.config.channels?.discord?.streaming).toBe("block");
expect(res.config.channels?.discord?.streamMode).toBeUndefined();
expect(res.changes).toEqual([
"Moved channels.discord.streamMode → channels.discord.streaming (block).",
"Normalized channels.discord.streaming boolean → enum (block).",
]);
});
it("migrates Telegram streamMode into streaming enum", () => {
const res = normalizeLegacyConfigValues({
channels: {
telegram: {
streamMode: "block",
},
},
});
expect(res.config.channels?.telegram?.streaming).toBe("block");
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
expect(res.changes).toEqual([
"Moved channels.telegram.streamMode → channels.telegram.streaming (block).",
]);
});
it("migrates Slack legacy streaming keys to unified config", () => {
const res = normalizeLegacyConfigValues({
channels: {
slack: {
streaming: false,
streamMode: "status_final",
},
},
});
expect(res.config.channels?.slack?.streaming).toBe("progress");
expect(res.config.channels?.slack?.nativeStreaming).toBe(false);
expect(res.config.channels?.slack?.streamMode).toBeUndefined();
expect(res.changes).toEqual([
"Moved channels.slack.streamMode → channels.slack.streaming (progress).",
"Moved channels.slack.streaming (boolean) → channels.slack.nativeStreaming (false).",
]);
});
});

View File

@@ -1,4 +1,11 @@
import type { OpenClawConfig } from "../config/config.js";
import {
resolveDiscordPreviewStreamMode,
resolveSlackNativeStreaming,
resolveSlackStreamingMode,
resolveTelegramPreviewStreamMode,
} from "../config/discord-preview-streaming.js";
export function normalizeLegacyConfigValues(cfg: OpenClawConfig): {
config: OpenClawConfig;
changes: string[];
@@ -90,20 +97,178 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): {
return { entry: updated, changed };
};
const normalizeProvider = (provider: "slack" | "discord") => {
const normalizeTelegramStreamingAliases = (params: {
entry: Record<string, unknown>;
pathPrefix: string;
}): { entry: Record<string, unknown>; changed: boolean } => {
let updated = params.entry;
const hadLegacyStreamMode = updated.streamMode !== undefined;
const beforeStreaming = updated.streaming;
const resolved = resolveTelegramPreviewStreamMode(updated);
const shouldNormalize =
hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
(typeof beforeStreaming === "string" && beforeStreaming !== resolved);
if (!shouldNormalize) {
return { entry: updated, changed: false };
}
let changed = false;
if (beforeStreaming !== resolved) {
updated = { ...updated, streaming: resolved };
changed = true;
}
if (hadLegacyStreamMode) {
const { streamMode: _ignored, ...rest } = updated;
updated = rest;
changed = true;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof beforeStreaming === "boolean") {
changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
} else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) {
changes.push(
`Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`,
);
}
return { entry: updated, changed };
};
const normalizeDiscordStreamingAliases = (params: {
entry: Record<string, unknown>;
pathPrefix: string;
}): { entry: Record<string, unknown>; changed: boolean } => {
let updated = params.entry;
const hadLegacyStreamMode = updated.streamMode !== undefined;
const beforeStreaming = updated.streaming;
const resolved = resolveDiscordPreviewStreamMode(updated);
const shouldNormalize =
hadLegacyStreamMode ||
typeof beforeStreaming === "boolean" ||
(typeof beforeStreaming === "string" && beforeStreaming !== resolved);
if (!shouldNormalize) {
return { entry: updated, changed: false };
}
let changed = false;
if (beforeStreaming !== resolved) {
updated = { ...updated, streaming: resolved };
changed = true;
}
if (hadLegacyStreamMode) {
const { streamMode: _ignored, ...rest } = updated;
updated = rest;
changed = true;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof beforeStreaming === "boolean") {
changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
} else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) {
changes.push(
`Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`,
);
}
return { entry: updated, changed };
};
const normalizeSlackStreamingAliases = (params: {
entry: Record<string, unknown>;
pathPrefix: string;
}): { entry: Record<string, unknown>; changed: boolean } => {
let updated = params.entry;
const hadLegacyStreamMode = updated.streamMode !== undefined;
const legacyStreaming = updated.streaming;
const beforeStreaming = updated.streaming;
const beforeNativeStreaming = updated.nativeStreaming;
const resolvedStreaming = resolveSlackStreamingMode(updated);
const resolvedNativeStreaming = resolveSlackNativeStreaming(updated);
const shouldNormalize =
hadLegacyStreamMode ||
typeof legacyStreaming === "boolean" ||
(typeof legacyStreaming === "string" && legacyStreaming !== resolvedStreaming);
if (!shouldNormalize) {
return { entry: updated, changed: false };
}
let changed = false;
if (beforeStreaming !== resolvedStreaming) {
updated = { ...updated, streaming: resolvedStreaming };
changed = true;
}
if (
typeof beforeNativeStreaming !== "boolean" ||
beforeNativeStreaming !== resolvedNativeStreaming
) {
updated = { ...updated, nativeStreaming: resolvedNativeStreaming };
changed = true;
}
if (hadLegacyStreamMode) {
const { streamMode: _ignored, ...rest } = updated;
updated = rest;
changed = true;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolvedStreaming}).`,
);
}
if (typeof legacyStreaming === "boolean") {
changes.push(
`Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.nativeStreaming (${resolvedNativeStreaming}).`,
);
} else if (typeof legacyStreaming === "string" && legacyStreaming !== resolvedStreaming) {
changes.push(
`Normalized ${params.pathPrefix}.streaming (${legacyStreaming}) → (${resolvedStreaming}).`,
);
}
return { entry: updated, changed };
};
const normalizeProvider = (provider: "telegram" | "slack" | "discord") => {
const channels = next.channels as Record<string, unknown> | undefined;
const rawEntry = channels?.[provider];
if (!isRecord(rawEntry)) {
return;
}
const base = normalizeDmAliases({
provider,
entry: rawEntry,
pathPrefix: `channels.${provider}`,
});
let updated = base.entry;
let changed = base.changed;
let updated = rawEntry;
let changed = false;
if (provider !== "telegram") {
const base = normalizeDmAliases({
provider,
entry: rawEntry,
pathPrefix: `channels.${provider}`,
});
updated = base.entry;
changed = base.changed;
}
if (provider === "telegram") {
const streaming = normalizeTelegramStreamingAliases({
entry: updated,
pathPrefix: `channels.${provider}`,
});
updated = streaming.entry;
changed = changed || streaming.changed;
} else if (provider === "discord") {
const streaming = normalizeDiscordStreamingAliases({
entry: updated,
pathPrefix: `channels.${provider}`,
});
updated = streaming.entry;
changed = changed || streaming.changed;
} else if (provider === "slack") {
const streaming = normalizeSlackStreamingAliases({
entry: updated,
pathPrefix: `channels.${provider}`,
});
updated = streaming.entry;
changed = changed || streaming.changed;
}
const rawAccounts = updated.accounts;
if (isRecord(rawAccounts)) {
@@ -113,13 +278,41 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): {
if (!isRecord(rawAccount)) {
continue;
}
const res = normalizeDmAliases({
provider,
entry: rawAccount,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
if (res.changed) {
accounts[accountId] = res.entry;
let accountEntry = rawAccount;
let accountChanged = false;
if (provider !== "telegram") {
const res = normalizeDmAliases({
provider,
entry: rawAccount,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
accountEntry = res.entry;
accountChanged = res.changed;
}
if (provider === "telegram") {
const streaming = normalizeTelegramStreamingAliases({
entry: accountEntry,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
accountEntry = streaming.entry;
accountChanged = accountChanged || streaming.changed;
} else if (provider === "discord") {
const streaming = normalizeDiscordStreamingAliases({
entry: accountEntry,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
accountEntry = streaming.entry;
accountChanged = accountChanged || streaming.changed;
} else if (provider === "slack") {
const streaming = normalizeSlackStreamingAliases({
entry: accountEntry,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
accountEntry = streaming.entry;
accountChanged = accountChanged || streaming.changed;
}
if (accountChanged) {
accounts[accountId] = accountEntry;
accountsChanged = true;
}
}
@@ -140,6 +333,7 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): {
}
};
normalizeProvider("telegram");
normalizeProvider("slack");
normalizeProvider("discord");

View File

@@ -378,27 +378,27 @@ describe("legacy config detection", () => {
expect(res.config.channels?.telegram?.groupPolicy).toBe("allowlist");
}
});
it("defaults telegram.streaming to false when telegram section exists", async () => {
it("defaults telegram.streaming to off when telegram section exists", async () => {
const res = validateConfigObject({ channels: { telegram: {} } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.telegram?.streaming).toBe(false);
expect(res.config.channels?.telegram?.streaming).toBe("off");
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
}
});
it("migrates legacy telegram.streamMode=off to streaming=false", async () => {
it("migrates legacy telegram.streamMode=off to streaming=off", async () => {
const res = validateConfigObject({ channels: { telegram: { streamMode: "off" } } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.telegram?.streaming).toBe(false);
expect(res.config.channels?.telegram?.streaming).toBe("off");
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
}
});
it("migrates legacy telegram.streamMode=block to streaming=true", async () => {
it("migrates legacy telegram.streamMode=block to streaming=block", async () => {
const res = validateConfigObject({ channels: { telegram: { streamMode: "block" } } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.telegram?.streaming).toBe(true);
expect(res.config.channels?.telegram?.streaming).toBe("block");
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
}
});
@@ -416,10 +416,113 @@ describe("legacy config detection", () => {
});
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.telegram?.accounts?.ops?.streaming).toBe(false);
expect(res.config.channels?.telegram?.accounts?.ops?.streaming).toBe("off");
expect(res.config.channels?.telegram?.accounts?.ops?.streamMode).toBeUndefined();
}
});
it("normalizes channels.discord.streaming booleans in legacy migration", async () => {
const res = migrateLegacyConfig({
channels: {
discord: {
streaming: true,
},
},
});
expect(res.changes).toContain(
"Normalized channels.discord.streaming boolean → enum (partial).",
);
expect(res.config?.channels?.discord?.streaming).toBe("partial");
expect(res.config?.channels?.discord?.streamMode).toBeUndefined();
});
it("migrates channels.discord.streamMode to channels.discord.streaming in legacy migration", async () => {
const res = migrateLegacyConfig({
channels: {
discord: {
streaming: false,
streamMode: "block",
},
},
});
expect(res.changes).toContain(
"Moved channels.discord.streamMode → channels.discord.streaming (block).",
);
expect(res.changes).toContain("Normalized channels.discord.streaming boolean → enum (block).");
expect(res.config?.channels?.discord?.streaming).toBe("block");
expect(res.config?.channels?.discord?.streamMode).toBeUndefined();
});
it("migrates discord.streaming=true to streaming=partial", async () => {
const res = validateConfigObject({ channels: { discord: { streaming: true } } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.discord?.streaming).toBe("partial");
expect(res.config.channels?.discord?.streamMode).toBeUndefined();
}
});
it("migrates discord.streaming=false to streaming=off", async () => {
const res = validateConfigObject({ channels: { discord: { streaming: false } } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.discord?.streaming).toBe("off");
expect(res.config.channels?.discord?.streamMode).toBeUndefined();
}
});
it("keeps explicit discord.streamMode and normalizes to streaming", async () => {
const res = validateConfigObject({
channels: { discord: { streamMode: "block", streaming: false } },
});
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.discord?.streaming).toBe("block");
expect(res.config.channels?.discord?.streamMode).toBeUndefined();
}
});
it("migrates discord.accounts.*.streaming alias to streaming enum", async () => {
const res = validateConfigObject({
channels: {
discord: {
accounts: {
work: {
streaming: true,
},
},
},
},
});
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.discord?.accounts?.work?.streaming).toBe("partial");
expect(res.config.channels?.discord?.accounts?.work?.streamMode).toBeUndefined();
}
});
it("migrates slack.streamMode values to slack.streaming enum", async () => {
const res = validateConfigObject({
channels: {
slack: {
streamMode: "status_final",
},
},
});
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.slack?.streaming).toBe("progress");
expect(res.config.channels?.slack?.streamMode).toBeUndefined();
expect(res.config.channels?.slack?.nativeStreaming).toBe(true);
}
});
it("migrates legacy slack.streaming boolean to nativeStreaming", async () => {
const res = validateConfigObject({
channels: {
slack: {
streaming: false,
},
},
});
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.slack?.streaming).toBe("partial");
expect(res.config.channels?.slack?.nativeStreaming).toBe(false);
}
});
it('rejects whatsapp.dmPolicy="open" without allowFrom "*"', async () => {
const res = validateConfigObject({
channels: {

View File

@@ -1,3 +1,9 @@
import {
resolveDiscordPreviewStreamMode,
resolveSlackNativeStreaming,
resolveSlackStreamingMode,
resolveTelegramPreviewStreamMode,
} from "./discord-preview-streaming.js";
import {
ensureRecord,
getRecord,
@@ -206,6 +212,115 @@ export const LEGACY_CONFIG_MIGRATIONS_PART_1: LegacyConfigMigration[] = [
raw.channels = channels;
},
},
{
id: "channels.streaming-keys->channels.streaming",
describe:
"Normalize legacy streaming keys to channels.<provider>.streaming (Telegram/Discord/Slack)",
apply: (raw, changes) => {
const channels = getRecord(raw.channels);
if (!channels) {
return;
}
const migrateProviderEntry = (params: {
provider: "telegram" | "discord" | "slack";
entry: Record<string, unknown>;
pathPrefix: string;
}) => {
const hasLegacyStreamMode = params.entry.streamMode !== undefined;
const legacyStreaming = params.entry.streaming;
const legacyNativeStreaming = params.entry.nativeStreaming;
if (params.provider === "telegram") {
if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") {
return;
}
const resolved = resolveTelegramPreviewStreamMode(params.entry);
params.entry.streaming = resolved;
if (hasLegacyStreamMode) {
delete params.entry.streamMode;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof legacyStreaming === "boolean") {
changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
}
return;
}
if (params.provider === "discord") {
if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") {
return;
}
const resolved = resolveDiscordPreviewStreamMode(params.entry);
params.entry.streaming = resolved;
if (hasLegacyStreamMode) {
delete params.entry.streamMode;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`,
);
}
if (typeof legacyStreaming === "boolean") {
changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`);
}
return;
}
if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") {
return;
}
const resolvedStreaming = resolveSlackStreamingMode(params.entry);
const resolvedNativeStreaming = resolveSlackNativeStreaming(params.entry);
params.entry.streaming = resolvedStreaming;
params.entry.nativeStreaming = resolvedNativeStreaming;
if (hasLegacyStreamMode) {
delete params.entry.streamMode;
changes.push(
`Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolvedStreaming}).`,
);
}
if (typeof legacyStreaming === "boolean") {
changes.push(
`Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.nativeStreaming (${resolvedNativeStreaming}).`,
);
} else if (typeof legacyNativeStreaming !== "boolean" && hasLegacyStreamMode) {
changes.push(`Set ${params.pathPrefix}.nativeStreaming → ${resolvedNativeStreaming}.`);
}
};
const migrateProvider = (provider: "telegram" | "discord" | "slack") => {
const providerEntry = getRecord(channels[provider]);
if (!providerEntry) {
return;
}
migrateProviderEntry({
provider,
entry: providerEntry,
pathPrefix: `channels.${provider}`,
});
const accounts = getRecord(providerEntry.accounts);
if (!accounts) {
return;
}
for (const [accountId, accountValue] of Object.entries(accounts)) {
const account = getRecord(accountValue);
if (!account) {
continue;
}
migrateProviderEntry({
provider,
entry: account,
pathPrefix: `channels.${provider}.accounts.${accountId}`,
});
}
};
migrateProvider("telegram");
migrateProvider("discord");
migrateProvider("slack");
},
},
{
id: "routing.allowFrom->channels.whatsapp.allowFrom",
describe: "Move routing.allowFrom to channels.whatsapp.allowFrom",

View File

@@ -379,8 +379,12 @@ export const FIELD_HELP: Record<string, string> = {
"channels.slack.commands.native": 'Override native commands for Slack (bool or "auto").',
"channels.slack.commands.nativeSkills":
'Override native skill commands for Slack (bool or "auto").',
"channels.slack.streaming":
'Unified Slack stream preview mode: "off" | "partial" | "block" | "progress". Legacy boolean/streamMode keys are auto-mapped.',
"channels.slack.nativeStreaming":
"Enable native Slack text streaming (chat.startStream/chat.appendStream/chat.stopStream) when channels.slack.streaming is partial (default: true).",
"channels.slack.streamMode":
"Live stream preview mode for Slack replies (replace | status_final | append).",
"Legacy Slack preview mode alias (replace | status_final | append); auto-migrated to channels.slack.streaming.",
"session.agentToAgent.maxPingPongTurns":
"Max reply-back turns between requester and target (05).",
"channels.telegram.customCommands":
@@ -403,13 +407,15 @@ export const FIELD_HELP: Record<string, string> = {
"channels.telegram.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
"channels.telegram.streaming":
"Enable Telegram live stream preview via message edits (default: false; legacy streamMode auto-maps here).",
'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Telegram. Legacy boolean/streamMode keys are auto-mapped.',
"channels.discord.streaming":
'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord. Legacy boolean/streamMode keys are auto-mapped.',
"channels.discord.streamMode":
"Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.",
"Legacy Discord preview mode alias (off | partial | block); auto-migrated to channels.discord.streaming.",
"channels.discord.draftChunk.minChars":
'Minimum chars before emitting a Discord stream preview update when channels.discord.streamMode="block" (default: 200).',
'Minimum chars before emitting a Discord stream preview update when channels.discord.streaming="block" (default: 200).',
"channels.discord.draftChunk.maxChars":
'Target max size for a Discord stream preview chunk when channels.discord.streamMode="block" (default: 800; clamped to channels.discord.textChunkLimit).',
'Target max size for a Discord stream preview chunk when channels.discord.streaming="block" (default: 800; clamped to channels.discord.textChunkLimit).',
"channels.discord.draftChunk.breakPreference":
"Preferred breakpoints for Discord draft chunks (paragraph | newline | sentence). Default: paragraph.",
"channels.telegram.retry.attempts":

View File

@@ -265,7 +265,7 @@ export const FIELD_LABELS: Record<string, string> = {
...IRC_FIELD_LABELS,
"channels.telegram.botToken": "Telegram Bot Token",
"channels.telegram.dmPolicy": "Telegram DM Policy",
"channels.telegram.streaming": "Telegram Streaming",
"channels.telegram.streaming": "Telegram Streaming Mode",
"channels.telegram.retry.attempts": "Telegram Retry Attempts",
"channels.telegram.retry.minDelayMs": "Telegram Retry Min Delay (ms)",
"channels.telegram.retry.maxDelayMs": "Telegram Retry Max Delay (ms)",
@@ -281,7 +281,8 @@ export const FIELD_LABELS: Record<string, string> = {
"channels.bluebubbles.dmPolicy": "BlueBubbles DM Policy",
"channels.discord.dmPolicy": "Discord DM Policy",
"channels.discord.dm.policy": "Discord DM Policy",
"channels.discord.streamMode": "Discord Stream Mode",
"channels.discord.streaming": "Discord Streaming Mode",
"channels.discord.streamMode": "Discord Stream Mode (Legacy)",
"channels.discord.draftChunk.minChars": "Discord Draft Chunk Min Chars",
"channels.discord.draftChunk.maxChars": "Discord Draft Chunk Max Chars",
"channels.discord.draftChunk.breakPreference": "Discord Draft Chunk Break Preference",
@@ -312,7 +313,9 @@ export const FIELD_LABELS: Record<string, string> = {
"channels.slack.appToken": "Slack App Token",
"channels.slack.userToken": "Slack User Token",
"channels.slack.userTokenReadOnly": "Slack User Token Read Only",
"channels.slack.streamMode": "Slack Stream Mode",
"channels.slack.streaming": "Slack Streaming Mode",
"channels.slack.nativeStreaming": "Slack Native Streaming",
"channels.slack.streamMode": "Slack Stream Mode (Legacy)",
"channels.slack.thread.historyScope": "Slack Thread History Scope",
"channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance",
"channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit",

View File

@@ -13,7 +13,7 @@ import type { DmConfig, ProviderCommandsConfig } from "./types.messages.js";
import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./types.tools.js";
import type { TtsConfig } from "./types.tts.js";
export type DiscordStreamMode = "partial" | "block" | "off";
export type DiscordStreamMode = "off" | "partial" | "block" | "progress";
export type DiscordDmConfig = {
/** If false, ignore all incoming Discord DMs. Default: true. */
@@ -198,14 +198,20 @@ export type DiscordAccountConfig = {
/** Disable block streaming for this account. */
blockStreaming?: boolean;
/**
* Live preview streaming mode (edit-based, like Telegram).
* - "partial": send a message and continuously edit it with new content as tokens arrive.
* - "block": stream previews in draft-sized chunks (like Telegram block mode).
* - "off": no preview streaming (default).
* When enabled, block streaming is automatically suppressed to avoid double-streaming.
* Live stream preview mode:
* - "off": disable preview updates
* - "partial": edit a single preview message
* - "block": stream in chunked preview updates
* - "progress": alias that maps to "partial" on Discord
*
* Legacy boolean values are still accepted and auto-migrated.
*/
streamMode?: DiscordStreamMode;
/** Chunking config for Discord stream previews in `streamMode: "block"`. */
streaming?: DiscordStreamMode | boolean;
/**
* @deprecated Legacy key; migrated automatically to `streaming`.
*/
streamMode?: "partial" | "block" | "off";
/** Chunking config for Discord stream previews in `streaming: "block"`. */
draftChunk?: BlockStreamingChunkConfig;
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;

View File

@@ -45,7 +45,8 @@ export type SlackChannelConfig = {
};
export type SlackReactionNotificationMode = "off" | "own" | "all" | "allowlist";
export type SlackStreamMode = "replace" | "status_final" | "append";
export type SlackStreamingMode = "off" | "partial" | "block" | "progress";
export type SlackLegacyStreamMode = "replace" | "status_final" | "append";
export type SlackActionConfig = {
reactions?: boolean;
@@ -126,14 +127,22 @@ export type SlackAccountConfig = {
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
/**
* Enable Slack native text streaming (Agents & AI Apps). Default: true.
* Stream preview mode:
* - "off": disable live preview streaming
* - "partial": replace preview text with the latest partial output (default)
* - "block": append chunked preview updates
* - "progress": show progress status, then send final text
*
* Set to `false` to disable native Slack text streaming and use normal reply
* delivery behavior only.
* Legacy boolean values are still accepted and auto-migrated.
*/
streaming?: boolean;
/** Slack stream preview mode (replace|status_final|append). Default: replace. */
streamMode?: SlackStreamMode;
streaming?: SlackStreamingMode | boolean;
/**
* Slack native text streaming toggle (`chat.startStream` / `chat.appendStream` / `chat.stopStream`).
* Used when `streaming` is `partial`. Default: true.
*/
nativeStreaming?: boolean;
/** @deprecated Legacy preview mode key; migrated automatically to `streaming`. */
streamMode?: SlackLegacyStreamMode;
mediaMaxMb?: number;
/** Reaction notification mode (off|own|all|allowlist). Default: own. */
reactionNotifications?: SlackReactionNotificationMode;

View File

@@ -28,6 +28,7 @@ export type TelegramNetworkConfig = {
};
export type TelegramInlineButtonsScope = "off" | "dm" | "group" | "all" | "allowlist";
export type TelegramStreamingMode = "off" | "partial" | "block" | "progress";
export type TelegramCapabilitiesConfig =
| string[]
@@ -95,15 +96,23 @@ export type TelegramAccountConfig = {
textChunkLimit?: number;
/** Chunking mode: "length" (default) splits by size; "newline" splits on every newline. */
chunkMode?: "length" | "newline";
/** Enable live stream preview via message edits (default: true). */
streaming?: boolean;
/**
* Stream preview mode:
* - "off": disable preview updates
* - "partial": edit a single preview message
* - "block": stream in larger chunked updates
* - "progress": alias that maps to "partial" on Telegram
*
* Legacy boolean values are still accepted and auto-migrated.
*/
streaming?: TelegramStreamingMode | boolean;
/** Disable block streaming for this account. */
blockStreaming?: boolean;
/** @deprecated Legacy chunking config from `streamMode: "block"`; ignored after migration. */
draftChunk?: BlockStreamingChunkConfig;
/** Merge streamed block replies before sending. */
blockStreamingCoalesce?: BlockStreamingCoalesceConfig;
/** @deprecated Legacy key; migrated automatically to `streaming` boolean. */
/** @deprecated Legacy key; migrated automatically to `streaming`. */
streamMode?: "off" | "partial" | "block";
mediaMaxMb?: number;
/** Telegram API client timeout in seconds (grammY ApiClientOptions). */

View File

@@ -1,6 +1,12 @@
import { z } from "zod";
import { isSafeScpRemoteHost } from "../infra/scp-host.js";
import { isValidInboundPathRootPattern } from "../media/inbound-path-policy.js";
import {
resolveDiscordPreviewStreamMode,
resolveSlackNativeStreaming,
resolveSlackStreamingMode,
resolveTelegramPreviewStreamMode,
} from "./discord-preview-streaming.js";
import {
normalizeTelegramCommandDescription,
normalizeTelegramCommandName,
@@ -99,25 +105,24 @@ const validateTelegramCustomCommands = (
}
};
function normalizeTelegramStreamingConfig(value: {
streaming?: boolean;
streamMode?: "off" | "partial" | "block";
function normalizeTelegramStreamingConfig(value: { streaming?: unknown; streamMode?: unknown }) {
value.streaming = resolveTelegramPreviewStreamMode(value);
delete value.streamMode;
}
function normalizeDiscordStreamingConfig(value: { streaming?: unknown; streamMode?: unknown }) {
value.streaming = resolveDiscordPreviewStreamMode(value);
delete value.streamMode;
}
function normalizeSlackStreamingConfig(value: {
streaming?: unknown;
nativeStreaming?: unknown;
streamMode?: unknown;
}) {
if (typeof value.streaming === "boolean") {
delete value.streamMode;
return;
}
if (value.streamMode === "off") {
value.streaming = false;
delete value.streamMode;
return;
}
if (value.streamMode === "partial" || value.streamMode === "block") {
value.streaming = true;
delete value.streamMode;
return;
}
value.streaming = false;
value.nativeStreaming = resolveSlackNativeStreaming(value);
value.streaming = resolveSlackStreamingMode(value);
delete value.streamMode;
}
export const TelegramAccountSchemaBase = z
@@ -143,7 +148,7 @@ export const TelegramAccountSchemaBase = z
dms: z.record(z.string(), DmConfigSchema.optional()).optional(),
textChunkLimit: z.number().int().positive().optional(),
chunkMode: z.enum(["length", "newline"]).optional(),
streaming: z.boolean().optional(),
streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(),
blockStreaming: z.boolean().optional(),
draftChunk: BlockStreamingChunkSchema.optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
@@ -332,7 +337,9 @@ export const DiscordAccountSchema = z
chunkMode: z.enum(["length", "newline"]).optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
streamMode: z.enum(["partial", "block", "off"]).optional().default("off"),
// Canonical streaming mode. Legacy aliases (`streamMode`, boolean `streaming`) are auto-mapped.
streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(),
streamMode: z.enum(["partial", "block", "off"]).optional(),
draftChunk: BlockStreamingChunkSchema.optional(),
maxLinesPerMessage: z.number().int().positive().optional(),
mediaMaxMb: z.number().positive().optional(),
@@ -422,6 +429,8 @@ export const DiscordAccountSchema = z
})
.strict()
.superRefine((value, ctx) => {
normalizeDiscordStreamingConfig(value);
const activityText = typeof value.activity === "string" ? value.activity.trim() : "";
const hasActivity = Boolean(activityText);
const hasActivityType = value.activityType !== undefined;
@@ -610,7 +619,9 @@ export const SlackAccountSchema = z
chunkMode: z.enum(["length", "newline"]).optional(),
blockStreaming: z.boolean().optional(),
blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(),
streaming: z.boolean().optional(),
streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(),
nativeStreaming: z.boolean().optional(),
streamMode: z.enum(["replace", "status_final", "append"]).optional(),
mediaMaxMb: z.number().positive().optional(),
reactionNotifications: z.enum(["off", "own", "all", "allowlist"]).optional(),
reactionAllowlist: z.array(z.union([z.string(), z.number()])).optional(),
@@ -652,6 +663,8 @@ export const SlackAccountSchema = z
})
.strict()
.superRefine((value, ctx) => {
normalizeSlackStreamingConfig(value);
const dmPolicy = value.dmPolicy ?? value.dm?.policy ?? "pairing";
const allowFrom = value.allowFrom ?? value.dm?.allowFrom;
const allowFromPath =

View File

@@ -381,6 +381,28 @@ describe("processDiscordMessage draft streaming", () => {
expect(deliverDiscordReply).not.toHaveBeenCalled();
});
it("accepts streaming=true alias for partial preview mode", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" });
return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } };
});
const ctx = await createBaseContext({
discordConfig: { streaming: true, maxLinesPerMessage: 5 },
});
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
expect(editMessageDiscord).toHaveBeenCalledWith(
"c1",
"preview-1",
{ content: "Hello\nWorld" },
{ rest: {} },
);
expect(deliverDiscordReply).not.toHaveBeenCalled();
});
it("falls back to standard send when final needs multiple chunks", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" });

View File

@@ -21,6 +21,7 @@ import {
type StatusReactionAdapter,
} from "../../channels/status-reactions.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveDiscordPreviewStreamMode } from "../../config/discord-preview-streaming.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
@@ -413,7 +414,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
});
// --- Discord draft stream (edit-based preview streaming) ---
const discordStreamMode = discordConfig?.streamMode ?? "off";
const discordStreamMode = resolveDiscordPreviewStreamMode(discordConfig);
const draftMaxChars = Math.min(textLimit, 2000);
const accountBlockStreamingEnabled =
typeof discordConfig?.blockStreaming === "boolean"

View File

@@ -2,13 +2,15 @@ import { describe, expect, it } from "vitest";
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
describe("slack native streaming defaults", () => {
it("is enabled when config is undefined", () => {
expect(isSlackStreamingEnabled(undefined)).toBe(true);
it("is enabled for partial mode when native streaming is on", () => {
expect(isSlackStreamingEnabled({ mode: "partial", nativeStreaming: true })).toBe(true);
});
it("can be disabled explicitly", () => {
expect(isSlackStreamingEnabled(false)).toBe(false);
expect(isSlackStreamingEnabled(true)).toBe(true);
it("is disabled outside partial mode or when native streaming is off", () => {
expect(isSlackStreamingEnabled({ mode: "partial", nativeStreaming: false })).toBe(false);
expect(isSlackStreamingEnabled({ mode: "block", nativeStreaming: true })).toBe(false);
expect(isSlackStreamingEnabled({ mode: "progress", nativeStreaming: true })).toBe(false);
expect(isSlackStreamingEnabled({ mode: "off", nativeStreaming: true })).toBe(false);
});
});

View File

@@ -14,7 +14,7 @@ import { createSlackDraftStream } from "../../draft-stream.js";
import {
applyAppendOnlyStreamUpdate,
buildStatusFinalPreviewText,
resolveSlackStreamMode,
resolveSlackStreamingConfig,
} from "../../stream-mode.js";
import type { SlackStreamSession } from "../../streaming.js";
import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js";
@@ -26,8 +26,14 @@ function hasMedia(payload: ReplyPayload): boolean {
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
}
export function isSlackStreamingEnabled(streaming: boolean | undefined): boolean {
return streaming !== false;
export function isSlackStreamingEnabled(params: {
mode: "off" | "partial" | "block" | "progress";
nativeStreaming: boolean;
}): boolean {
if (params.mode !== "partial") {
return false;
}
return params.nativeStreaming;
}
export function resolveSlackStreamingThreadHint(params: {
@@ -146,7 +152,16 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
accountId: route.accountId,
});
const streamingEnabled = isSlackStreamingEnabled(account.config.streaming);
const slackStreaming = resolveSlackStreamingConfig({
streaming: account.config.streaming,
streamMode: account.config.streamMode,
nativeStreaming: account.config.nativeStreaming,
});
const previewStreamingEnabled = slackStreaming.mode !== "off";
const streamingEnabled = isSlackStreamingEnabled({
mode: slackStreaming.mode,
nativeStreaming: slackStreaming.nativeStreaming,
});
const streamThreadHint = resolveSlackStreamingThreadHint({
replyToMode: ctx.replyToMode,
incomingThreadTs,
@@ -233,6 +248,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
const draftChannelId = draftStream?.channelId();
const finalText = payload.text;
const canFinalizeViaPreviewEdit =
previewStreamingEnabled &&
streamMode !== "status_final" &&
mediaCount === 0 &&
!payload.isError &&
@@ -256,7 +272,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
`slack: preview final edit failed; falling back to standard send (${String(err)})`,
);
}
} else if (streamMode === "status_final" && hasStreamedMessage) {
} else if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) {
try {
const statusChannelId = draftStream?.channelId();
const statusMessageId = draftStream?.messageId();
@@ -307,7 +323,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
warn: logVerbose,
});
let hasStreamedMessage = false;
const streamMode = resolveSlackStreamMode(account.config.streamMode);
const streamMode = slackStreaming.draftMode;
let appendRenderedText = "";
let appendSourceText = "";
let statusUpdateCount = 0;
@@ -363,31 +379,37 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
onModelSelected,
onPartialReply: useStreaming
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: useStreaming
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
: !previewStreamingEnabled
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
onReasoningEnd: useStreaming
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
: !previewStreamingEnabled
? undefined
: async () => {
if (hasStreamedMessage) {
draftStream.forceNewMessage();
hasStreamedMessage = false;
appendRenderedText = "";
appendSourceText = "";
statusUpdateCount = 0;
}
},
},
});
await draftStream.flush();

View File

@@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest";
import {
applyAppendOnlyStreamUpdate,
buildStatusFinalPreviewText,
resolveSlackStreamingConfig,
resolveSlackStreamMode,
} from "./stream-mode.js";
@@ -19,6 +20,48 @@ describe("resolveSlackStreamMode", () => {
});
});
describe("resolveSlackStreamingConfig", () => {
it("defaults to partial mode with native streaming enabled", () => {
expect(resolveSlackStreamingConfig({})).toEqual({
mode: "partial",
nativeStreaming: true,
draftMode: "replace",
});
});
it("maps legacy streamMode values to unified streaming modes", () => {
expect(resolveSlackStreamingConfig({ streamMode: "append" })).toMatchObject({
mode: "block",
draftMode: "append",
});
expect(resolveSlackStreamingConfig({ streamMode: "status_final" })).toMatchObject({
mode: "progress",
draftMode: "status_final",
});
});
it("moves legacy streaming boolean to native streaming toggle", () => {
expect(resolveSlackStreamingConfig({ streaming: false })).toEqual({
mode: "partial",
nativeStreaming: false,
draftMode: "replace",
});
});
it("accepts unified enum values directly", () => {
expect(resolveSlackStreamingConfig({ streaming: "off" })).toEqual({
mode: "off",
nativeStreaming: true,
draftMode: "replace",
});
expect(resolveSlackStreamingConfig({ streaming: "progress" })).toEqual({
mode: "progress",
nativeStreaming: true,
draftMode: "status_final",
});
});
});
describe("applyAppendOnlyStreamUpdate", () => {
it("starts with first incoming text", () => {
const next = applyAppendOnlyStreamUpdate({

View File

@@ -1,5 +1,13 @@
export type SlackStreamMode = "replace" | "status_final" | "append";
import {
mapStreamingModeToSlackLegacyDraftStreamMode,
resolveSlackNativeStreaming,
resolveSlackStreamingMode,
type SlackLegacyDraftStreamMode,
type StreamingMode,
} from "../config/discord-preview-streaming.js";
export type SlackStreamMode = SlackLegacyDraftStreamMode;
export type SlackStreamingMode = StreamingMode;
const DEFAULT_STREAM_MODE: SlackStreamMode = "replace";
export function resolveSlackStreamMode(raw: unknown): SlackStreamMode {
@@ -13,6 +21,20 @@ export function resolveSlackStreamMode(raw: unknown): SlackStreamMode {
return DEFAULT_STREAM_MODE;
}
export function resolveSlackStreamingConfig(params: {
streaming?: unknown;
streamMode?: unknown;
nativeStreaming?: unknown;
}): { mode: SlackStreamingMode; nativeStreaming: boolean; draftMode: SlackStreamMode } {
const mode = resolveSlackStreamingMode(params);
const nativeStreaming = resolveSlackNativeStreaming(params);
return {
mode,
nativeStreaming,
draftMode: mapStreamingModeToSlackLegacyDraftStreamMode(mode),
};
}
export function applyAppendOnlyStreamUpdate(params: {
incoming: string;
rendered: string;

View File

@@ -15,6 +15,10 @@ describe("resolveTelegramStreamMode", () => {
it("maps legacy streamMode values", () => {
expect(resolveTelegramStreamMode({ streamMode: "off" })).toBe("off");
expect(resolveTelegramStreamMode({ streamMode: "partial" })).toBe("partial");
expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("partial");
expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("block");
});
it("maps unified progress mode to partial on Telegram", () => {
expect(resolveTelegramStreamMode({ streaming: "progress" })).toBe("partial");
});
});

View File

@@ -1,5 +1,6 @@
import type { Chat, Message, MessageOrigin, User } from "@grammyjs/types";
import { formatLocationText, type NormalizedLocation } from "../../channels/location.js";
import { resolveTelegramPreviewStreamMode } from "../../config/discord-preview-streaming.js";
import type { TelegramGroupConfig, TelegramTopicConfig } from "../../config/types.js";
import { readChannelAllowFromStore } from "../../pairing/pairing-store.js";
import {
@@ -154,20 +155,10 @@ export function buildTypingThreadParams(messageThreadId?: number) {
}
export function resolveTelegramStreamMode(telegramCfg?: {
streaming?: boolean;
streamMode?: TelegramStreamMode;
streaming?: unknown;
streamMode?: unknown;
}): TelegramStreamMode {
if (typeof telegramCfg?.streaming === "boolean") {
return telegramCfg.streaming ? "partial" : "off";
}
const raw = telegramCfg?.streamMode?.trim().toLowerCase();
if (raw === "off") {
return "off";
}
if (raw === "partial" || raw === "block") {
return "partial";
}
return "off";
return resolveTelegramPreviewStreamMode(telegramCfg);
}
export function buildTelegramGroupPeerId(chatId: number | string, messageThreadId?: number) {