diff --git a/CHANGELOG.md b/CHANGELOG.md index 71d0343a22d..95ba337cc44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai - Config/Discord: require string IDs in Discord allowlists, keep onboarding inputs string-only, and add doctor repair for numeric entries. (#18220) Thanks @thewilloftheshadow. - Agents/Models: probe the primary model when its auth-profile cooldown is near expiry (with per-provider throttling), so runs recover from temporary rate limits without staying on fallback models until restart. (#17478) Thanks @PlayerGhost. - Agents/Tools: scope the `message` tool schema to the active channel so Telegram uses `buttons` and Discord uses `components`. (#18215) Thanks @obviyus. +- Discord: optimize reaction notification handling to skip unnecessary message fetches in `off`/`all`/`allowlist` modes, streamline reaction routing, and improve reaction emoji formatting. (#18248) Thanks @thewilloftheshadow and @victorGPT. - Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang. - Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk. - Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus. diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 3355ea4f9ee..1218c113e22 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -352,8 +352,10 @@ export async function runAgentTurnWithFallback(params: { // Must await to ensure typing indicator starts before tool summaries are emitted. if (evt.stream === "tool") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const name = typeof evt.data.name === "string" ? evt.data.name : undefined; if (phase === "start" || phase === "update") { await params.typingSignals.signalToolStart(); + await params.opts?.onToolStart?.({ name, phase }); } } // Track auto-compaction completion diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 50f97655236..839fac55977 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -39,6 +39,8 @@ export type GetReplyOptions = { onAssistantMessageStart?: () => Promise | void; onBlockReply?: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; onToolResult?: (payload: ReplyPayload) => Promise | void; + /** Called when a tool phase starts/updates, before summary payloads are emitted. */ + onToolStart?: (payload: { name?: string; phase?: string }) => Promise | void; /** Called when the actual model is selected (including after fallback). * Use this to get model/provider/thinkLevel for responsePrefix template interpolation. */ onModelSelected?: (ctx: ModelSelectedContext) => void; diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index 476405eb73b..97de8d4bf70 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -774,11 +774,22 @@ function makeReactionEvent(overrides?: { messageId?: string; emojiName?: string; botAsAuthor?: boolean; - guild?: { name?: string }; + messageAuthorId?: string; + messageFetch?: ReturnType; + guild?: { name?: string; id?: string }; }) { const userId = overrides?.userId ?? "user-1"; const messageId = overrides?.messageId ?? "msg-1"; const channelId = overrides?.channelId ?? "channel-1"; + const messageFetch = + overrides?.messageFetch ?? + vi.fn(async () => ({ + author: { + id: overrides?.messageAuthorId ?? (overrides?.botAsAuthor ? "bot-1" : "other-user"), + username: overrides?.botAsAuthor ? "bot" : "otheruser", + discriminator: "0", + }, + })); return { guild_id: overrides?.guildId, channel_id: channelId, @@ -792,23 +803,30 @@ function makeReactionEvent(overrides?: { discriminator: "0", }, message: { - fetch: vi.fn(async () => ({ - author: { - id: overrides?.botAsAuthor ? "bot-1" : "other-user", - username: overrides?.botAsAuthor ? "bot" : "otheruser", - discriminator: "0", - }, - })), + fetch: messageFetch, }, } as unknown as Parameters[0]; } -function makeReactionClient(channelType: ChannelType = ChannelType.DM) { +function makeReactionClient(options?: { + channelType?: ChannelType; + channelName?: string; + parentId?: string; + parentName?: string; +}) { + const channelType = options?.channelType ?? ChannelType.DM; + const channelName = + options?.channelName ?? (channelType === ChannelType.DM ? undefined : "test-channel"); + const parentId = options?.parentId; + const parentName = options?.parentName ?? "parent-channel"; + return { - fetchChannel: vi.fn(async () => ({ - type: channelType, - name: channelType === ChannelType.DM ? undefined : "test-channel", - })), + fetchChannel: vi.fn(async (channelId: string) => { + if (parentId && channelId === parentId) { + return { type: ChannelType.GuildText, name: parentName, parentId: undefined }; + } + return { type: channelType, name: channelName, parentId }; + }), } as unknown as Parameters[1]; } @@ -837,7 +855,7 @@ describe("discord DM reaction handling", () => { resolveAgentRouteMock.mockClear(); const data = makeReactionEvent({ botAsAuthor: true }); - const client = makeReactionClient(ChannelType.DM); + const client = makeReactionClient({ channelType: ChannelType.DM }); const listener = new DiscordReactionListener(makeReactionListenerParams()); await listener.handle(data, client); @@ -854,7 +872,7 @@ describe("discord DM reaction handling", () => { resolveAgentRouteMock.mockClear(); const data = makeReactionEvent({ botAsAuthor: true }); - const client = makeReactionClient(ChannelType.DM); + const client = makeReactionClient({ channelType: ChannelType.DM }); const guildEntries = makeEntries({ "guild-123": { slug: "guild-123" }, }); @@ -880,7 +898,7 @@ describe("discord DM reaction handling", () => { botAsAuthor: true, guild: { name: "Test Guild" }, }); - const client = makeReactionClient(ChannelType.GuildText); + const client = makeReactionClient({ channelType: ChannelType.GuildText }); const listener = new DiscordReactionListener(makeReactionListenerParams()); await listener.handle(data, client); @@ -895,7 +913,7 @@ describe("discord DM reaction handling", () => { resolveAgentRouteMock.mockClear(); const data = makeReactionEvent({ botAsAuthor: true }); - const client = makeReactionClient(ChannelType.DM); + const client = makeReactionClient({ channelType: ChannelType.DM }); const listener = new DiscordReactionListener(makeReactionListenerParams()); await listener.handle(data, client); @@ -911,7 +929,7 @@ describe("discord DM reaction handling", () => { resolveAgentRouteMock.mockClear(); const data = makeReactionEvent({ userId: "user-42", botAsAuthor: true }); - const client = makeReactionClient(ChannelType.DM); + const client = makeReactionClient({ channelType: ChannelType.DM }); const listener = new DiscordReactionListener(makeReactionListenerParams()); await listener.handle(data, client); @@ -926,7 +944,7 @@ describe("discord DM reaction handling", () => { resolveAgentRouteMock.mockClear(); const data = makeReactionEvent({ botAsAuthor: true }); - const client = makeReactionClient(ChannelType.GroupDM); + const client = makeReactionClient({ channelType: ChannelType.GroupDM }); const listener = new DiscordReactionListener(makeReactionListenerParams()); await listener.handle(data, client); @@ -936,3 +954,116 @@ describe("discord DM reaction handling", () => { expect(routeArgs.peer).toEqual({ kind: "group", id: "channel-1" }); }); }); + +describe("discord reaction notification modes", () => { + const guildId = "guild-900"; + const guild = fakeGuild(guildId, "Mode Guild"); + + it("skips message fetch when mode is off", async () => { + enqueueSystemEventSpy.mockClear(); + resolveAgentRouteMock.mockClear(); + + const messageFetch = vi.fn(async () => ({ + author: { id: "bot-1", username: "bot", discriminator: "0" }, + })); + const data = makeReactionEvent({ guildId, guild, messageFetch }); + const client = makeReactionClient({ channelType: ChannelType.GuildText }); + const guildEntries = makeEntries({ + [guildId]: { reactionNotifications: "off" }, + }); + const listener = new DiscordReactionListener(makeReactionListenerParams({ guildEntries })); + + await listener.handle(data, client); + + expect(messageFetch).not.toHaveBeenCalled(); + expect(enqueueSystemEventSpy).not.toHaveBeenCalled(); + }); + + it("skips message fetch when mode is all", async () => { + enqueueSystemEventSpy.mockClear(); + resolveAgentRouteMock.mockClear(); + + const messageFetch = vi.fn(async () => ({ + author: { id: "other-user", username: "other", discriminator: "0" }, + })); + const data = makeReactionEvent({ guildId, guild, messageFetch }); + const client = makeReactionClient({ channelType: ChannelType.GuildText }); + const guildEntries = makeEntries({ + [guildId]: { reactionNotifications: "all" }, + }); + const listener = new DiscordReactionListener(makeReactionListenerParams({ guildEntries })); + + await listener.handle(data, client); + + expect(messageFetch).not.toHaveBeenCalled(); + expect(enqueueSystemEventSpy).toHaveBeenCalledOnce(); + }); + + it("skips message fetch when mode is allowlist", async () => { + enqueueSystemEventSpy.mockClear(); + resolveAgentRouteMock.mockClear(); + + const messageFetch = vi.fn(async () => ({ + author: { id: "other-user", username: "other", discriminator: "0" }, + })); + const data = makeReactionEvent({ guildId, guild, userId: "123", messageFetch }); + const client = makeReactionClient({ channelType: ChannelType.GuildText }); + const guildEntries = makeEntries({ + [guildId]: { reactionNotifications: "allowlist", users: ["123"] }, + }); + const listener = new DiscordReactionListener(makeReactionListenerParams({ guildEntries })); + + await listener.handle(data, client); + + expect(messageFetch).not.toHaveBeenCalled(); + expect(enqueueSystemEventSpy).toHaveBeenCalledOnce(); + }); + + it("fetches message when mode is own", async () => { + enqueueSystemEventSpy.mockClear(); + resolveAgentRouteMock.mockClear(); + + const messageFetch = vi.fn(async () => ({ + author: { id: "bot-1", username: "bot", discriminator: "0" }, + })); + const data = makeReactionEvent({ guildId, guild, messageFetch }); + const client = makeReactionClient({ channelType: ChannelType.GuildText }); + const guildEntries = makeEntries({ + [guildId]: { reactionNotifications: "own" }, + }); + const listener = new DiscordReactionListener(makeReactionListenerParams({ guildEntries })); + + await listener.handle(data, client); + + expect(messageFetch).toHaveBeenCalledOnce(); + expect(enqueueSystemEventSpy).toHaveBeenCalledOnce(); + }); + + it("skips message fetch for thread channels in all mode", async () => { + enqueueSystemEventSpy.mockClear(); + resolveAgentRouteMock.mockClear(); + + const messageFetch = vi.fn(async () => ({ + author: { id: "other-user", username: "other", discriminator: "0" }, + })); + const data = makeReactionEvent({ + guildId, + guild, + channelId: "thread-1", + messageFetch, + }); + const client = makeReactionClient({ + channelType: ChannelType.PublicThread, + parentId: "parent-1", + }); + const guildEntries = makeEntries({ + [guildId]: { reactionNotifications: "all" }, + }); + const listener = new DiscordReactionListener(makeReactionListenerParams({ guildEntries })); + + await listener.handle(data, client); + + expect(messageFetch).not.toHaveBeenCalled(); + expect(enqueueSystemEventSpy).toHaveBeenCalledOnce(); + }); +}); diff --git a/src/discord/monitor/format.ts b/src/discord/monitor/format.ts index 2b47f002694..90868c55028 100644 --- a/src/discord/monitor/format.ts +++ b/src/discord/monitor/format.ts @@ -18,7 +18,12 @@ export function resolveDiscordSystemLocation(params: { export function formatDiscordReactionEmoji(emoji: { id?: string | null; name?: string | null }) { if (emoji.id && emoji.name) { - return `${emoji.name}:${emoji.id}`; + // Custom guild emoji in Discord-renderable form. + return `<:${emoji.name}:${emoji.id}>`; + } + if (emoji.id) { + // Keep id visible even when name is missing (instead of opaque "emoji"). + return `emoji:${emoji.id}`; } return emoji.name ?? "emoji"; } diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index fadca3c82ea..f3bcee77ff5 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -185,6 +185,12 @@ async function handleDiscordReactionEvent(params: { if (!user || user.bot) { return; } + + // Early exit: skip bot's own reactions before expensive network calls + if (botUserId && user.id === botUserId) { + return; + } + const isGuildMessage = Boolean(data.guild_id); const guildInfo = isGuildMessage ? resolveDiscordGuildEntry({ @@ -212,17 +218,157 @@ async function handleDiscordReactionEvent(params: { let parentId = "parentId" in channel ? (channel.parentId ?? undefined) : undefined; let parentName: string | undefined; let parentSlug = ""; - if (isThreadChannel) { - if (!parentId) { - const channelInfo = await resolveDiscordChannelInfo(client, data.channel_id); - parentId = channelInfo?.parentId; + const memberRoleIds = Array.isArray(data.rawMember?.roles) + ? data.rawMember.roles.map((roleId: string) => String(roleId)) + : []; + let reactionBase: { baseText: string; contextKey: string } | null = null; + const resolveReactionBase = () => { + if (reactionBase) { + return reactionBase; } + const emojiLabel = formatDiscordReactionEmoji(data.emoji); + const actorLabel = formatDiscordUserTag(user); + const guildSlug = + guildInfo?.slug || + (data.guild?.name + ? normalizeDiscordSlug(data.guild.name) + : (data.guild_id ?? (isGroupDm ? "group-dm" : "dm"))); + const channelLabel = channelSlug + ? `#${channelSlug}` + : channelName + ? `#${normalizeDiscordSlug(channelName)}` + : `#${data.channel_id}`; + const baseText = `Discord reaction ${action}: ${emojiLabel} by ${actorLabel} on ${guildSlug} ${channelLabel} msg ${data.message_id}`; + const contextKey = `discord:reaction:${action}:${data.message_id}:${user.id}:${emojiLabel}`; + reactionBase = { baseText, contextKey }; + return reactionBase; + }; + const emitReaction = (text: string, parentPeerId?: string) => { + const { contextKey } = resolveReactionBase(); + const route = resolveAgentRoute({ + cfg: params.cfg, + channel: "discord", + accountId: params.accountId, + guildId: data.guild_id ?? undefined, + memberRoleIds, + peer: { + kind: isDirectMessage ? "direct" : isGroupDm ? "group" : "channel", + id: isDirectMessage ? user.id : data.channel_id, + }, + parentPeer: parentPeerId ? { kind: "channel", id: parentPeerId } : undefined, + }); + enqueueSystemEvent(text, { + sessionKey: route.sessionKey, + contextKey, + }); + }; + + // Parallelize async operations for thread channels + if (isThreadChannel) { + const reactionMode = guildInfo?.reactionNotifications ?? "own"; + + // Early exit: skip fetching message if notifications are off + if (reactionMode === "off") { + return; + } + + const channelInfoPromise = parentId + ? Promise.resolve({ parentId }) + : resolveDiscordChannelInfo(client, data.channel_id); + + // Fast path: for "all" and "allowlist" modes, we don't need to fetch the message + if (reactionMode === "all" || reactionMode === "allowlist") { + const channelInfo = await channelInfoPromise; + parentId = channelInfo?.parentId; + + if (parentId) { + const parentInfo = await resolveDiscordChannelInfo(client, parentId); + parentName = parentInfo?.name; + parentSlug = parentName ? normalizeDiscordSlug(parentName) : ""; + } + + const channelConfig = resolveDiscordChannelConfigWithFallback({ + guildInfo, + channelId: data.channel_id, + channelName, + channelSlug, + parentId, + parentName, + parentSlug, + scope: "thread", + }); + if (channelConfig?.allowed === false) { + return; + } + + // For allowlist mode, check if user is in allowlist first + if (reactionMode === "allowlist") { + const shouldNotifyAllowlist = shouldEmitDiscordReactionNotification({ + mode: reactionMode, + botId: botUserId, + userId: user.id, + userName: user.username, + userTag: formatDiscordUserTag(user), + allowlist: guildInfo?.users, + }); + if (!shouldNotifyAllowlist) { + return; + } + } + + const { baseText } = resolveReactionBase(); + emitReaction(baseText, parentId); + return; + } + + // For "own" mode, we need to fetch the message to check the author + const messagePromise = data.message.fetch().catch(() => null); + + const [channelInfo, message] = await Promise.all([channelInfoPromise, messagePromise]); + parentId = channelInfo?.parentId; + if (parentId) { const parentInfo = await resolveDiscordChannelInfo(client, parentId); parentName = parentInfo?.name; parentSlug = parentName ? normalizeDiscordSlug(parentName) : ""; } + + const channelConfig = resolveDiscordChannelConfigWithFallback({ + guildInfo, + channelId: data.channel_id, + channelName, + channelSlug, + parentId, + parentName, + parentSlug, + scope: "thread", + }); + if (channelConfig?.allowed === false) { + return; + } + + const messageAuthorId = message?.author?.id ?? undefined; + const shouldNotify = shouldEmitDiscordReactionNotification({ + mode: reactionMode, + botId: botUserId, + messageAuthorId, + userId: user.id, + userName: user.username, + userTag: formatDiscordUserTag(user), + allowlist: guildInfo?.users, + }); + if (!shouldNotify) { + return; + } + + const { baseText } = resolveReactionBase(); + const authorLabel = message?.author ? formatDiscordUserTag(message.author) : undefined; + const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; + emitReaction(text, parentId); + return; } + + // Non-thread channel path const channelConfig = resolveDiscordChannelConfigWithFallback({ guildInfo, channelId: data.channel_id, @@ -231,17 +377,42 @@ async function handleDiscordReactionEvent(params: { parentId, parentName, parentSlug, - scope: isThreadChannel ? "thread" : "channel", + scope: "channel", }); if (channelConfig?.allowed === false) { return; } - if (botUserId && user.id === botUserId) { + const reactionMode = guildInfo?.reactionNotifications ?? "own"; + + // Early exit: skip fetching message if notifications are off + if (reactionMode === "off") { return; } - const reactionMode = guildInfo?.reactionNotifications ?? "own"; + // Fast path: for "all" and "allowlist" modes, we don't need to fetch the message + if (reactionMode === "all" || reactionMode === "allowlist") { + // For allowlist mode, check if user is in allowlist first + if (reactionMode === "allowlist") { + const shouldNotifyAllowlist = shouldEmitDiscordReactionNotification({ + mode: reactionMode, + botId: botUserId, + userId: user.id, + userName: user.username, + userTag: formatDiscordUserTag(user), + allowlist: guildInfo?.users, + }); + if (!shouldNotifyAllowlist) { + return; + } + } + + const { baseText } = resolveReactionBase(); + emitReaction(baseText, parentId); + return; + } + + // For "own" mode, we need to fetch the message to check the author const message = await data.message.fetch().catch(() => null); const messageAuthorId = message?.author?.id ?? undefined; const shouldNotify = shouldEmitDiscordReactionNotification({ @@ -257,40 +428,10 @@ async function handleDiscordReactionEvent(params: { return; } - const emojiLabel = formatDiscordReactionEmoji(data.emoji); - const actorLabel = formatDiscordUserTag(user); - const guildSlug = - guildInfo?.slug || - (data.guild?.name - ? normalizeDiscordSlug(data.guild.name) - : (data.guild_id ?? (isGroupDm ? "group-dm" : "dm"))); - const channelLabel = channelSlug - ? `#${channelSlug}` - : channelName - ? `#${normalizeDiscordSlug(channelName)}` - : `#${data.channel_id}`; + const { baseText } = resolveReactionBase(); const authorLabel = message?.author ? formatDiscordUserTag(message.author) : undefined; - const baseText = `Discord reaction ${action}: ${emojiLabel} by ${actorLabel} on ${guildSlug} ${channelLabel} msg ${data.message_id}`; const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; - const memberRoleIds = Array.isArray(data.rawMember?.roles) - ? data.rawMember.roles.map((roleId: string) => String(roleId)) - : []; - const route = resolveAgentRoute({ - cfg: params.cfg, - channel: "discord", - accountId: params.accountId, - guildId: data.guild_id ?? undefined, - memberRoleIds, - peer: { - kind: isDirectMessage ? "direct" : isGroupDm ? "group" : "channel", - id: isDirectMessage ? user.id : data.channel_id, - }, - parentPeer: parentId ? { kind: "channel", id: parentId } : undefined, - }); - enqueueSystemEvent(text, { - sessionKey: route.sessionKey, - contextKey: `discord:reaction:${action}:${data.message_id}:${user.id}:${emojiLabel}`, - }); + emitReaction(text, parentId); } catch (err) { params.logger.error(danger(`discord reaction handler failed: ${String(err)}`)); } diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index c6be2b43709..a58618920ce 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -1,19 +1,22 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { createBaseDiscordMessageContext } from "./message-handler.test-harness.js"; const reactMessageDiscord = vi.fn(async () => {}); const removeReactionDiscord = vi.fn(async () => {}); +const dispatchInboundMessage = vi.fn(async () => ({ + queuedFinal: false, + counts: { final: 0, tool: 0, block: 0 }, +})); vi.mock("../send.js", () => ({ reactMessageDiscord: (...args: unknown[]) => reactMessageDiscord(...args), removeReactionDiscord: (...args: unknown[]) => removeReactionDiscord(...args), })); -vi.mock("../../auto-reply/reply/dispatch-from-config.js", () => ({ - dispatchReplyFromConfig: vi.fn(async () => ({ - queuedFinal: false, - counts: { final: 0, tool: 0, block: 0 }, - })), +vi.mock("../../auto-reply/dispatch.js", () => ({ + dispatchInboundMessage: (...args: unknown[]) => dispatchInboundMessage(...args), })); vi.mock("../../auto-reply/reply/reply-dispatcher.js", () => ({ @@ -33,17 +36,88 @@ vi.mock("../../auto-reply/reply/reply-dispatcher.js", () => ({ const { processDiscordMessage } = await import("./message-handler.process.js"); +async function createBaseContext(overrides: Record = {}) { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-discord-")); + const storePath = path.join(dir, "sessions.json"); + return { + cfg: { messages: { ackReaction: "👀" }, session: { store: storePath } }, + discordConfig: {}, + accountId: "default", + token: "token", + runtime: { log: () => {}, error: () => {} }, + guildHistories: new Map(), + historyLimit: 0, + mediaMaxBytes: 1024, + textLimit: 4000, + replyToMode: "off", + ackReactionScope: "group-mentions", + groupPolicy: "open", + data: { guild: { id: "g1", name: "Guild" } }, + client: { rest: {} }, + message: { + id: "m1", + channelId: "c1", + timestamp: new Date().toISOString(), + attachments: [], + }, + messageChannelId: "c1", + author: { + id: "U1", + username: "alice", + discriminator: "0", + globalName: "Alice", + }, + channelInfo: { name: "general" }, + channelName: "general", + isGuildMessage: true, + isDirectMessage: false, + isGroupDm: false, + commandAuthorized: true, + baseText: "hi", + messageText: "hi", + wasMentioned: false, + shouldRequireMention: true, + canDetectMention: true, + effectiveWasMentioned: true, + shouldBypassMention: false, + threadChannel: null, + threadParentId: undefined, + threadParentName: undefined, + threadParentType: undefined, + threadName: undefined, + displayChannelSlug: "general", + guildInfo: null, + guildSlug: "guild", + channelConfig: null, + baseSessionKey: "agent:main:discord:guild:g1", + route: { + agentId: "main", + channel: "discord", + accountId: "default", + sessionKey: "agent:main:discord:guild:g1", + mainSessionKey: "agent:main:main", + }, + sender: { label: "user" }, + ...overrides, + }; +} + beforeEach(() => { + vi.useRealTimers(); reactMessageDiscord.mockClear(); removeReactionDiscord.mockClear(); + dispatchInboundMessage.mockReset(); + dispatchInboundMessage.mockResolvedValue({ + queuedFinal: false, + counts: { final: 0, tool: 0, block: 0 }, + }); }); describe("processDiscordMessage ack reactions", () => { it("skips ack reactions for group-mentions when mentions are not required", async () => { - const ctx = await createBaseDiscordMessageContext({ + const ctx = await createBaseContext({ shouldRequireMention: false, effectiveWasMentioned: false, - sender: { label: "user" }, }); // oxlint-disable-next-line typescript/no-explicit-any @@ -53,20 +127,19 @@ describe("processDiscordMessage ack reactions", () => { }); it("sends ack reactions for mention-gated guild messages when mentioned", async () => { - const ctx = await createBaseDiscordMessageContext({ + const ctx = await createBaseContext({ shouldRequireMention: true, effectiveWasMentioned: true, - sender: { label: "user" }, }); // oxlint-disable-next-line typescript/no-explicit-any await processDiscordMessage(ctx as any); - expect(reactMessageDiscord).toHaveBeenCalledWith("c1", "m1", "👀", { rest: {} }); + expect(reactMessageDiscord.mock.calls[0]).toEqual(["c1", "m1", "👀", { rest: {} }]); }); it("uses preflight-resolved messageChannelId when message.channelId is missing", async () => { - const ctx = await createBaseDiscordMessageContext({ + const ctx = await createBaseContext({ message: { id: "m1", timestamp: new Date().toISOString(), @@ -75,14 +148,66 @@ describe("processDiscordMessage ack reactions", () => { messageChannelId: "fallback-channel", shouldRequireMention: true, effectiveWasMentioned: true, - sender: { label: "user" }, }); // oxlint-disable-next-line typescript/no-explicit-any await processDiscordMessage(ctx as any); - expect(reactMessageDiscord).toHaveBeenCalledWith("fallback-channel", "m1", "👀", { - rest: {}, + expect(reactMessageDiscord.mock.calls[0]).toEqual([ + "fallback-channel", + "m1", + "👀", + { rest: {} }, + ]); + }); + + it("debounces intermediate phase reactions and jumps to done for short runs", async () => { + dispatchInboundMessage.mockImplementationOnce( + async (params: { + replyOptions?: { + onReasoningStream?: () => Promise | void; + onToolStart?: (payload: { name?: string }) => Promise | void; + }; + }) => { + await params.replyOptions?.onReasoningStream?.(); + await params.replyOptions?.onToolStart?.({ name: "exec" }); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 0 } }; + }, + ); + + const ctx = await createBaseContext(); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + const emojis = reactMessageDiscord.mock.calls.map((call) => call[2]); + expect(emojis).toContain("👀"); + expect(emojis).toContain("✅"); + expect(emojis).not.toContain("🧠"); + expect(emojis).not.toContain("💻"); + }); + + it("shows stall emojis for long no-progress runs", async () => { + vi.useFakeTimers(); + dispatchInboundMessage.mockImplementationOnce(async () => { + await new Promise((resolve) => { + setTimeout(resolve, 31_000); + }); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 0 } }; }); + + const ctx = await createBaseContext(); + // oxlint-disable-next-line typescript/no-explicit-any + const runPromise = processDiscordMessage(ctx as any); + + await vi.advanceTimersByTimeAsync(10_000); + expect(reactMessageDiscord.mock.calls.some((call) => call[2] === "⏳")).toBe(true); + + await vi.advanceTimersByTimeAsync(20_000); + expect(reactMessageDiscord.mock.calls.some((call) => call[2] === "⚠️")).toBe(true); + + await vi.advanceTimersByTimeAsync(1_000); + await runPromise; + expect(reactMessageDiscord.mock.calls.some((call) => call[2] === "✅")).toBe(true); }); }); diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 8e936068c28..13f2ccdaef8 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -11,10 +11,7 @@ import { } from "../../auto-reply/reply/history.js"; import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; -import { - removeAckReactionAfterReply, - shouldAckReaction as shouldAckReactionGate, -} from "../../channels/ack-reactions.js"; +import { shouldAckReaction as shouldAckReactionGate } from "../../channels/ack-reactions.js"; import { logTypingFailure, logAckFailure } from "../../channels/logging.js"; import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; import { recordInboundSession } from "../../channels/session.js"; @@ -39,6 +36,240 @@ import { deliverDiscordReply } from "./reply-delivery.js"; import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; import { sendTyping } from "./typing.js"; +const DISCORD_STATUS_THINKING_EMOJI = "🧠"; +const DISCORD_STATUS_TOOL_EMOJI = "🛠️"; +const DISCORD_STATUS_CODING_EMOJI = "💻"; +const DISCORD_STATUS_WEB_EMOJI = "🌐"; +const DISCORD_STATUS_DONE_EMOJI = "✅"; +const DISCORD_STATUS_ERROR_EMOJI = "❌"; +const DISCORD_STATUS_STALL_SOFT_EMOJI = "⏳"; +const DISCORD_STATUS_STALL_HARD_EMOJI = "⚠️"; +const DISCORD_STATUS_DONE_HOLD_MS = 1500; +const DISCORD_STATUS_ERROR_HOLD_MS = 2500; +const DISCORD_STATUS_DEBOUNCE_MS = 700; +const DISCORD_STATUS_STALL_SOFT_MS = 10_000; +const DISCORD_STATUS_STALL_HARD_MS = 30_000; + +const CODING_STATUS_TOOL_TOKENS = [ + "exec", + "process", + "read", + "write", + "edit", + "session_status", + "bash", +]; + +const WEB_STATUS_TOOL_TOKENS = ["web_search", "web-search", "web_fetch", "web-fetch", "browser"]; + +function resolveToolStatusEmoji(toolName?: string): string { + const normalized = toolName?.trim().toLowerCase() ?? ""; + if (!normalized) { + return DISCORD_STATUS_TOOL_EMOJI; + } + if (WEB_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) { + return DISCORD_STATUS_WEB_EMOJI; + } + if (CODING_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) { + return DISCORD_STATUS_CODING_EMOJI; + } + return DISCORD_STATUS_TOOL_EMOJI; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +function createDiscordStatusReactionController(params: { + enabled: boolean; + channelId: string; + messageId: string; + initialEmoji: string; + rest: unknown; +}) { + let activeEmoji: string | null = null; + let chain: Promise = Promise.resolve(); + let pendingEmoji: string | null = null; + let pendingTimer: ReturnType | null = null; + let finished = false; + let softStallTimer: ReturnType | null = null; + let hardStallTimer: ReturnType | null = null; + + const enqueue = (work: () => Promise) => { + chain = chain.then(work).catch((err) => { + logAckFailure({ + log: logVerbose, + channel: "discord", + target: `${params.channelId}/${params.messageId}`, + error: err, + }); + }); + return chain; + }; + + const clearStallTimers = () => { + if (softStallTimer) { + clearTimeout(softStallTimer); + softStallTimer = null; + } + if (hardStallTimer) { + clearTimeout(hardStallTimer); + hardStallTimer = null; + } + }; + + const clearPendingDebounce = () => { + if (pendingTimer) { + clearTimeout(pendingTimer); + pendingTimer = null; + } + pendingEmoji = null; + }; + + const applyEmoji = (emoji: string) => + enqueue(async () => { + if (!params.enabled || !emoji || activeEmoji === emoji) { + return; + } + const previousEmoji = activeEmoji; + await reactMessageDiscord(params.channelId, params.messageId, emoji, { + rest: params.rest as never, + }); + activeEmoji = emoji; + if (previousEmoji && previousEmoji !== emoji) { + await removeReactionDiscord(params.channelId, params.messageId, previousEmoji, { + rest: params.rest as never, + }); + } + }); + + const requestEmoji = (emoji: string, options?: { immediate?: boolean }) => { + if (!params.enabled || !emoji) { + return Promise.resolve(); + } + if (options?.immediate) { + clearPendingDebounce(); + return applyEmoji(emoji); + } + pendingEmoji = emoji; + if (pendingTimer) { + clearTimeout(pendingTimer); + } + pendingTimer = setTimeout(() => { + pendingTimer = null; + const emojiToApply = pendingEmoji; + pendingEmoji = null; + if (!emojiToApply || emojiToApply === activeEmoji) { + return; + } + void applyEmoji(emojiToApply); + }, DISCORD_STATUS_DEBOUNCE_MS); + return Promise.resolve(); + }; + + const scheduleStallTimers = () => { + if (!params.enabled || finished) { + return; + } + clearStallTimers(); + softStallTimer = setTimeout(() => { + if (finished) { + return; + } + void requestEmoji(DISCORD_STATUS_STALL_SOFT_EMOJI, { immediate: true }); + }, DISCORD_STATUS_STALL_SOFT_MS); + hardStallTimer = setTimeout(() => { + if (finished) { + return; + } + void requestEmoji(DISCORD_STATUS_STALL_HARD_EMOJI, { immediate: true }); + }, DISCORD_STATUS_STALL_HARD_MS); + }; + + const setPhase = (emoji: string) => { + if (!params.enabled || finished) { + return Promise.resolve(); + } + scheduleStallTimers(); + return requestEmoji(emoji); + }; + + const setTerminal = async (emoji: string) => { + if (!params.enabled) { + return; + } + finished = true; + clearStallTimers(); + await requestEmoji(emoji, { immediate: true }); + }; + + const clear = async () => { + if (!params.enabled) { + return; + } + finished = true; + clearStallTimers(); + clearPendingDebounce(); + await enqueue(async () => { + const cleanupCandidates = new Set([ + params.initialEmoji, + activeEmoji ?? "", + DISCORD_STATUS_THINKING_EMOJI, + DISCORD_STATUS_TOOL_EMOJI, + DISCORD_STATUS_CODING_EMOJI, + DISCORD_STATUS_WEB_EMOJI, + DISCORD_STATUS_DONE_EMOJI, + DISCORD_STATUS_ERROR_EMOJI, + DISCORD_STATUS_STALL_SOFT_EMOJI, + DISCORD_STATUS_STALL_HARD_EMOJI, + ]); + activeEmoji = null; + for (const emoji of cleanupCandidates) { + if (!emoji) { + continue; + } + try { + await removeReactionDiscord(params.channelId, params.messageId, emoji, { + rest: params.rest as never, + }); + } catch (err) { + logAckFailure({ + log: logVerbose, + channel: "discord", + target: `${params.channelId}/${params.messageId}`, + error: err, + }); + } + } + }); + }; + + const restoreInitial = async () => { + if (!params.enabled) { + return; + } + finished = true; + clearStallTimers(); + clearPendingDebounce(); + await requestEmoji(params.initialEmoji, { immediate: true }); + }; + + return { + setQueued: () => { + scheduleStallTimers(); + return requestEmoji(params.initialEmoji, { immediate: true }); + }, + setThinking: () => setPhase(DISCORD_STATUS_THINKING_EMOJI), + setTool: (toolName?: string) => setPhase(resolveToolStatusEmoji(toolName)), + setDone: () => setTerminal(DISCORD_STATUS_DONE_EMOJI), + setError: () => setTerminal(DISCORD_STATUS_ERROR_EMOJI), + clear, + restoreInitial, + }; +} + export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) { const { cfg, @@ -108,17 +339,17 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) shouldBypassMention, }), ); - const ackReactionPromise = shouldAckReaction() - ? reactMessageDiscord(messageChannelId, message.id, ackReaction, { - rest: client.rest, - }).then( - () => true, - (err) => { - logVerbose(`discord react failed for channel ${messageChannelId}: ${String(err)}`); - return false; - }, - ) - : null; + const statusReactionsEnabled = shouldAckReaction(); + const statusReactions = createDiscordStatusReactionController({ + enabled: statusReactionsEnabled, + channelId: messageChannelId, + messageId: message.id, + initialEmoji: ackReaction, + rest: client.rest, + }); + if (statusReactionsEnabled) { + void statusReactions.setQueued(); + } const fromLabel = isDirectMessage ? buildDirectLabel(author) @@ -359,6 +590,18 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) accountId, }); + const typingCallbacks = createTypingCallbacks({ + start: () => sendTyping({ client, channelId: typingChannelId }), + onStartError: (err) => { + logTypingFailure({ + log: logVerbose, + channel: "discord", + target: typingChannelId, + error: err, + }); + }, + }); + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), @@ -382,35 +625,58 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) onError: (err, info) => { runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: createTypingCallbacks({ - start: () => sendTyping({ client, channelId: typingChannelId }), - onStartError: (err) => { - logTypingFailure({ - log: logVerbose, - channel: "discord", - target: typingChannelId, - error: err, - }); - }, - }).onReplyStart, - }); - - const { queuedFinal, counts } = await dispatchInboundMessage({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: channelConfig?.skills, - disableBlockStreaming: - typeof discordConfig?.blockStreaming === "boolean" - ? !discordConfig.blockStreaming - : undefined, - onModelSelected, + onReplyStart: async () => { + await typingCallbacks.onReplyStart(); + await statusReactions.setThinking(); }, }); - markDispatchIdle(); - if (!queuedFinal) { + + let dispatchResult: Awaited> | null = null; + let dispatchError = false; + try { + dispatchResult = await dispatchInboundMessage({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: channelConfig?.skills, + disableBlockStreaming: + typeof discordConfig?.blockStreaming === "boolean" + ? !discordConfig.blockStreaming + : undefined, + onModelSelected, + onReasoningStream: async () => { + await statusReactions.setThinking(); + }, + onToolStart: async (payload) => { + await statusReactions.setTool(payload.name); + }, + }, + }); + } catch (err) { + dispatchError = true; + throw err; + } finally { + markDispatchIdle(); + if (statusReactionsEnabled) { + if (dispatchError) { + await statusReactions.setError(); + } else { + await statusReactions.setDone(); + } + if (removeAckAfterReply) { + void (async () => { + await sleep(dispatchError ? DISCORD_STATUS_ERROR_HOLD_MS : DISCORD_STATUS_DONE_HOLD_MS); + await statusReactions.clear(); + })(); + } else { + void statusReactions.restoreInitial(); + } + } + } + + if (!dispatchResult?.queuedFinal) { if (isGuildMessage) { clearHistoryEntriesIfEnabled({ historyMap: guildHistories, @@ -421,29 +687,11 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) return; } if (shouldLogVerbose()) { - const finalCount = counts.final; + const finalCount = dispatchResult.counts.final; logVerbose( `discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); } - removeAckReactionAfterReply({ - removeAfterReply: removeAckAfterReply, - ackReactionPromise, - ackReactionValue: ackReaction, - remove: async () => { - await removeReactionDiscord(messageChannelId, message.id, ackReaction, { - rest: client.rest, - }); - }, - onError: (err) => { - logAckFailure({ - log: logVerbose, - channel: "discord", - target: `${messageChannelId}/${message.id}`, - error: err, - }); - }, - }); if (isGuildMessage) { clearHistoryEntriesIfEnabled({ historyMap: guildHistories,