mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 11:01:24 +00:00
feat(discord): faster reaction status state machine (watchdog + debounce) (#18248)
* fix(discord): avoid unnecessary message fetches in reaction notifications * style(discord): format reaction listener for CI * feat(discord): add reaction status machine and fix tool/final wiring * fix(discord): harden reaction status transitions and cleanup * revert(discord): restore status-machine flow from 0a5a72204 * fix(auto-reply): restore lifecycle callback forwarding for channels * chore(ci): add daily upstream sync workflow for custom branch * fix(discord): non-blocking reactions and robust cleanup * chore: remove unrelated workflow from Discord-only PR * Discord: streamline reaction handling * Docs: add Discord reaction changelog --------- Co-authored-by: Shadow <hi@shadowing.dev>
This commit is contained in:
@@ -11,10 +11,7 @@ import {
|
||||
} from "../../auto-reply/reply/history.js";
|
||||
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
|
||||
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
|
||||
import {
|
||||
removeAckReactionAfterReply,
|
||||
shouldAckReaction as shouldAckReactionGate,
|
||||
} from "../../channels/ack-reactions.js";
|
||||
import { shouldAckReaction as shouldAckReactionGate } from "../../channels/ack-reactions.js";
|
||||
import { logTypingFailure, logAckFailure } from "../../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
|
||||
import { recordInboundSession } from "../../channels/session.js";
|
||||
@@ -39,6 +36,240 @@ import { deliverDiscordReply } from "./reply-delivery.js";
|
||||
import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js";
|
||||
import { sendTyping } from "./typing.js";
|
||||
|
||||
const DISCORD_STATUS_THINKING_EMOJI = "🧠";
|
||||
const DISCORD_STATUS_TOOL_EMOJI = "🛠️";
|
||||
const DISCORD_STATUS_CODING_EMOJI = "💻";
|
||||
const DISCORD_STATUS_WEB_EMOJI = "🌐";
|
||||
const DISCORD_STATUS_DONE_EMOJI = "✅";
|
||||
const DISCORD_STATUS_ERROR_EMOJI = "❌";
|
||||
const DISCORD_STATUS_STALL_SOFT_EMOJI = "⏳";
|
||||
const DISCORD_STATUS_STALL_HARD_EMOJI = "⚠️";
|
||||
const DISCORD_STATUS_DONE_HOLD_MS = 1500;
|
||||
const DISCORD_STATUS_ERROR_HOLD_MS = 2500;
|
||||
const DISCORD_STATUS_DEBOUNCE_MS = 700;
|
||||
const DISCORD_STATUS_STALL_SOFT_MS = 10_000;
|
||||
const DISCORD_STATUS_STALL_HARD_MS = 30_000;
|
||||
|
||||
const CODING_STATUS_TOOL_TOKENS = [
|
||||
"exec",
|
||||
"process",
|
||||
"read",
|
||||
"write",
|
||||
"edit",
|
||||
"session_status",
|
||||
"bash",
|
||||
];
|
||||
|
||||
const WEB_STATUS_TOOL_TOKENS = ["web_search", "web-search", "web_fetch", "web-fetch", "browser"];
|
||||
|
||||
function resolveToolStatusEmoji(toolName?: string): string {
|
||||
const normalized = toolName?.trim().toLowerCase() ?? "";
|
||||
if (!normalized) {
|
||||
return DISCORD_STATUS_TOOL_EMOJI;
|
||||
}
|
||||
if (WEB_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) {
|
||||
return DISCORD_STATUS_WEB_EMOJI;
|
||||
}
|
||||
if (CODING_STATUS_TOOL_TOKENS.some((token) => normalized.includes(token))) {
|
||||
return DISCORD_STATUS_CODING_EMOJI;
|
||||
}
|
||||
return DISCORD_STATUS_TOOL_EMOJI;
|
||||
}
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
function createDiscordStatusReactionController(params: {
|
||||
enabled: boolean;
|
||||
channelId: string;
|
||||
messageId: string;
|
||||
initialEmoji: string;
|
||||
rest: unknown;
|
||||
}) {
|
||||
let activeEmoji: string | null = null;
|
||||
let chain: Promise<void> = Promise.resolve();
|
||||
let pendingEmoji: string | null = null;
|
||||
let pendingTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let finished = false;
|
||||
let softStallTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
let hardStallTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
const enqueue = (work: () => Promise<void>) => {
|
||||
chain = chain.then(work).catch((err) => {
|
||||
logAckFailure({
|
||||
log: logVerbose,
|
||||
channel: "discord",
|
||||
target: `${params.channelId}/${params.messageId}`,
|
||||
error: err,
|
||||
});
|
||||
});
|
||||
return chain;
|
||||
};
|
||||
|
||||
const clearStallTimers = () => {
|
||||
if (softStallTimer) {
|
||||
clearTimeout(softStallTimer);
|
||||
softStallTimer = null;
|
||||
}
|
||||
if (hardStallTimer) {
|
||||
clearTimeout(hardStallTimer);
|
||||
hardStallTimer = null;
|
||||
}
|
||||
};
|
||||
|
||||
const clearPendingDebounce = () => {
|
||||
if (pendingTimer) {
|
||||
clearTimeout(pendingTimer);
|
||||
pendingTimer = null;
|
||||
}
|
||||
pendingEmoji = null;
|
||||
};
|
||||
|
||||
const applyEmoji = (emoji: string) =>
|
||||
enqueue(async () => {
|
||||
if (!params.enabled || !emoji || activeEmoji === emoji) {
|
||||
return;
|
||||
}
|
||||
const previousEmoji = activeEmoji;
|
||||
await reactMessageDiscord(params.channelId, params.messageId, emoji, {
|
||||
rest: params.rest as never,
|
||||
});
|
||||
activeEmoji = emoji;
|
||||
if (previousEmoji && previousEmoji !== emoji) {
|
||||
await removeReactionDiscord(params.channelId, params.messageId, previousEmoji, {
|
||||
rest: params.rest as never,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const requestEmoji = (emoji: string, options?: { immediate?: boolean }) => {
|
||||
if (!params.enabled || !emoji) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
if (options?.immediate) {
|
||||
clearPendingDebounce();
|
||||
return applyEmoji(emoji);
|
||||
}
|
||||
pendingEmoji = emoji;
|
||||
if (pendingTimer) {
|
||||
clearTimeout(pendingTimer);
|
||||
}
|
||||
pendingTimer = setTimeout(() => {
|
||||
pendingTimer = null;
|
||||
const emojiToApply = pendingEmoji;
|
||||
pendingEmoji = null;
|
||||
if (!emojiToApply || emojiToApply === activeEmoji) {
|
||||
return;
|
||||
}
|
||||
void applyEmoji(emojiToApply);
|
||||
}, DISCORD_STATUS_DEBOUNCE_MS);
|
||||
return Promise.resolve();
|
||||
};
|
||||
|
||||
const scheduleStallTimers = () => {
|
||||
if (!params.enabled || finished) {
|
||||
return;
|
||||
}
|
||||
clearStallTimers();
|
||||
softStallTimer = setTimeout(() => {
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
void requestEmoji(DISCORD_STATUS_STALL_SOFT_EMOJI, { immediate: true });
|
||||
}, DISCORD_STATUS_STALL_SOFT_MS);
|
||||
hardStallTimer = setTimeout(() => {
|
||||
if (finished) {
|
||||
return;
|
||||
}
|
||||
void requestEmoji(DISCORD_STATUS_STALL_HARD_EMOJI, { immediate: true });
|
||||
}, DISCORD_STATUS_STALL_HARD_MS);
|
||||
};
|
||||
|
||||
const setPhase = (emoji: string) => {
|
||||
if (!params.enabled || finished) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
scheduleStallTimers();
|
||||
return requestEmoji(emoji);
|
||||
};
|
||||
|
||||
const setTerminal = async (emoji: string) => {
|
||||
if (!params.enabled) {
|
||||
return;
|
||||
}
|
||||
finished = true;
|
||||
clearStallTimers();
|
||||
await requestEmoji(emoji, { immediate: true });
|
||||
};
|
||||
|
||||
const clear = async () => {
|
||||
if (!params.enabled) {
|
||||
return;
|
||||
}
|
||||
finished = true;
|
||||
clearStallTimers();
|
||||
clearPendingDebounce();
|
||||
await enqueue(async () => {
|
||||
const cleanupCandidates = new Set<string>([
|
||||
params.initialEmoji,
|
||||
activeEmoji ?? "",
|
||||
DISCORD_STATUS_THINKING_EMOJI,
|
||||
DISCORD_STATUS_TOOL_EMOJI,
|
||||
DISCORD_STATUS_CODING_EMOJI,
|
||||
DISCORD_STATUS_WEB_EMOJI,
|
||||
DISCORD_STATUS_DONE_EMOJI,
|
||||
DISCORD_STATUS_ERROR_EMOJI,
|
||||
DISCORD_STATUS_STALL_SOFT_EMOJI,
|
||||
DISCORD_STATUS_STALL_HARD_EMOJI,
|
||||
]);
|
||||
activeEmoji = null;
|
||||
for (const emoji of cleanupCandidates) {
|
||||
if (!emoji) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await removeReactionDiscord(params.channelId, params.messageId, emoji, {
|
||||
rest: params.rest as never,
|
||||
});
|
||||
} catch (err) {
|
||||
logAckFailure({
|
||||
log: logVerbose,
|
||||
channel: "discord",
|
||||
target: `${params.channelId}/${params.messageId}`,
|
||||
error: err,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const restoreInitial = async () => {
|
||||
if (!params.enabled) {
|
||||
return;
|
||||
}
|
||||
finished = true;
|
||||
clearStallTimers();
|
||||
clearPendingDebounce();
|
||||
await requestEmoji(params.initialEmoji, { immediate: true });
|
||||
};
|
||||
|
||||
return {
|
||||
setQueued: () => {
|
||||
scheduleStallTimers();
|
||||
return requestEmoji(params.initialEmoji, { immediate: true });
|
||||
},
|
||||
setThinking: () => setPhase(DISCORD_STATUS_THINKING_EMOJI),
|
||||
setTool: (toolName?: string) => setPhase(resolveToolStatusEmoji(toolName)),
|
||||
setDone: () => setTerminal(DISCORD_STATUS_DONE_EMOJI),
|
||||
setError: () => setTerminal(DISCORD_STATUS_ERROR_EMOJI),
|
||||
clear,
|
||||
restoreInitial,
|
||||
};
|
||||
}
|
||||
|
||||
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
|
||||
const {
|
||||
cfg,
|
||||
@@ -108,17 +339,17 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
shouldBypassMention,
|
||||
}),
|
||||
);
|
||||
const ackReactionPromise = shouldAckReaction()
|
||||
? reactMessageDiscord(messageChannelId, message.id, ackReaction, {
|
||||
rest: client.rest,
|
||||
}).then(
|
||||
() => true,
|
||||
(err) => {
|
||||
logVerbose(`discord react failed for channel ${messageChannelId}: ${String(err)}`);
|
||||
return false;
|
||||
},
|
||||
)
|
||||
: null;
|
||||
const statusReactionsEnabled = shouldAckReaction();
|
||||
const statusReactions = createDiscordStatusReactionController({
|
||||
enabled: statusReactionsEnabled,
|
||||
channelId: messageChannelId,
|
||||
messageId: message.id,
|
||||
initialEmoji: ackReaction,
|
||||
rest: client.rest,
|
||||
});
|
||||
if (statusReactionsEnabled) {
|
||||
void statusReactions.setQueued();
|
||||
}
|
||||
|
||||
const fromLabel = isDirectMessage
|
||||
? buildDirectLabel(author)
|
||||
@@ -359,6 +590,18 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
accountId,
|
||||
});
|
||||
|
||||
const typingCallbacks = createTypingCallbacks({
|
||||
start: () => sendTyping({ client, channelId: typingChannelId }),
|
||||
onStartError: (err) => {
|
||||
logTypingFailure({
|
||||
log: logVerbose,
|
||||
channel: "discord",
|
||||
target: typingChannelId,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
|
||||
...prefixOptions,
|
||||
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
|
||||
@@ -382,35 +625,58 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
onError: (err, info) => {
|
||||
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
onReplyStart: createTypingCallbacks({
|
||||
start: () => sendTyping({ client, channelId: typingChannelId }),
|
||||
onStartError: (err) => {
|
||||
logTypingFailure({
|
||||
log: logVerbose,
|
||||
channel: "discord",
|
||||
target: typingChannelId,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
}).onReplyStart,
|
||||
});
|
||||
|
||||
const { queuedFinal, counts } = await dispatchInboundMessage({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
skillFilter: channelConfig?.skills,
|
||||
disableBlockStreaming:
|
||||
typeof discordConfig?.blockStreaming === "boolean"
|
||||
? !discordConfig.blockStreaming
|
||||
: undefined,
|
||||
onModelSelected,
|
||||
onReplyStart: async () => {
|
||||
await typingCallbacks.onReplyStart();
|
||||
await statusReactions.setThinking();
|
||||
},
|
||||
});
|
||||
markDispatchIdle();
|
||||
if (!queuedFinal) {
|
||||
|
||||
let dispatchResult: Awaited<ReturnType<typeof dispatchInboundMessage>> | null = null;
|
||||
let dispatchError = false;
|
||||
try {
|
||||
dispatchResult = await dispatchInboundMessage({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyOptions: {
|
||||
...replyOptions,
|
||||
skillFilter: channelConfig?.skills,
|
||||
disableBlockStreaming:
|
||||
typeof discordConfig?.blockStreaming === "boolean"
|
||||
? !discordConfig.blockStreaming
|
||||
: undefined,
|
||||
onModelSelected,
|
||||
onReasoningStream: async () => {
|
||||
await statusReactions.setThinking();
|
||||
},
|
||||
onToolStart: async (payload) => {
|
||||
await statusReactions.setTool(payload.name);
|
||||
},
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
dispatchError = true;
|
||||
throw err;
|
||||
} finally {
|
||||
markDispatchIdle();
|
||||
if (statusReactionsEnabled) {
|
||||
if (dispatchError) {
|
||||
await statusReactions.setError();
|
||||
} else {
|
||||
await statusReactions.setDone();
|
||||
}
|
||||
if (removeAckAfterReply) {
|
||||
void (async () => {
|
||||
await sleep(dispatchError ? DISCORD_STATUS_ERROR_HOLD_MS : DISCORD_STATUS_DONE_HOLD_MS);
|
||||
await statusReactions.clear();
|
||||
})();
|
||||
} else {
|
||||
void statusReactions.restoreInitial();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!dispatchResult?.queuedFinal) {
|
||||
if (isGuildMessage) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: guildHistories,
|
||||
@@ -421,29 +687,11 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
return;
|
||||
}
|
||||
if (shouldLogVerbose()) {
|
||||
const finalCount = counts.final;
|
||||
const finalCount = dispatchResult.counts.final;
|
||||
logVerbose(
|
||||
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
|
||||
);
|
||||
}
|
||||
removeAckReactionAfterReply({
|
||||
removeAfterReply: removeAckAfterReply,
|
||||
ackReactionPromise,
|
||||
ackReactionValue: ackReaction,
|
||||
remove: async () => {
|
||||
await removeReactionDiscord(messageChannelId, message.id, ackReaction, {
|
||||
rest: client.rest,
|
||||
});
|
||||
},
|
||||
onError: (err) => {
|
||||
logAckFailure({
|
||||
log: logVerbose,
|
||||
channel: "discord",
|
||||
target: `${messageChannelId}/${message.id}`,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
});
|
||||
if (isGuildMessage) {
|
||||
clearHistoryEntriesIfEnabled({
|
||||
historyMap: guildHistories,
|
||||
|
||||
Reference in New Issue
Block a user