mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-07 16:31:23 +00:00
* fix(hooks): include guildId and channelName in message_received metadata The message_received hook (both plugin and internal) already exposes sender identity fields (senderId, senderName, senderUsername, senderE164) but omits the guild/channel context. Plugins that track per-channel activity receive NULL values for channel identification. Add guildId (ctx.GroupSpace) and channelName (ctx.GroupChannel) to the metadata block in both the plugin hook and internal hook dispatch paths. These properties are already populated by channel providers (e.g. Discord sets GroupSpace to the guild ID and GroupChannel to #channel-name) and used elsewhere in the codebase (channels/conversation-label.ts). * test: cover guild/channel hook metadata propagation (#26115) (thanks @davidrudduck) --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
516 lines
18 KiB
TypeScript
516 lines
18 KiB
TypeScript
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
|
import type { OpenClawConfig } from "../../config/config.js";
|
|
import { loadSessionStore, resolveStorePath } from "../../config/sessions.js";
|
|
import { logVerbose } from "../../globals.js";
|
|
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
|
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
|
|
import {
|
|
logMessageProcessed,
|
|
logMessageQueued,
|
|
logSessionStateChange,
|
|
} from "../../logging/diagnostic.js";
|
|
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
|
import { maybeApplyTtsToPayload, normalizeTtsAutoMode, resolveTtsConfig } from "../../tts/tts.js";
|
|
import { getReplyFromConfig } from "../reply.js";
|
|
import type { FinalizedMsgContext } from "../templating.js";
|
|
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
|
import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js";
|
|
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
|
|
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
|
import { shouldSuppressReasoningPayload } from "./reply-payloads.js";
|
|
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
|
|
|
// Matches a body that consists solely of the "<media:audio>" placeholder, optionally
// followed by whitespace and one parenthesized suffix (case-insensitive).
const AUDIO_PLACEHOLDER_RE = /^<media:audio>(\s*\([^)]*\))?$/i;
|
|
// Matches a body that starts with "[Audio" as a whole word (case-insensitive).
const AUDIO_HEADER_RE = /^\[Audio\b/i;
|
|
|
|
/**
 * Reduces a media-type string to its bare, lower-cased type: everything from the
 * first ";" onward is dropped and surrounding whitespace is trimmed.
 */
const normalizeMediaType = (value: string): string => {
  const paramStart = value.indexOf(";");
  const bareType = paramStart === -1 ? value : value.slice(0, paramStart);
  return bareType.trim().toLowerCase();
};
|
|
|
|
/**
 * Decides whether an inbound message context looks like an audio message.
 *
 * The context counts as audio when either:
 * - `ctx.MediaType` or any entry of `ctx.MediaTypes` normalizes to "audio" or
 *   starts with "audio/", or
 * - the first string-valued body field (checked in the order `BodyForCommands`,
 *   `CommandBody`, `RawBody`, `Body`) is, once trimmed, an audio placeholder
 *   (AUDIO_PLACEHOLDER_RE) or starts with an audio header (AUDIO_HEADER_RE).
 *
 * @param ctx Finalized inbound message context.
 * @returns `true` when the message is detected as audio, otherwise `false`.
 */
const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => {
  const typeCandidates: unknown[] = [
    ctx.MediaType,
    ...(Array.isArray(ctx.MediaTypes) ? ctx.MediaTypes : []),
  ];
  // Narrow with a real type guard instead of an `as string[]` assertion so a stray
  // non-string entry is ignored rather than crashing inside normalizeMediaType.
  const types = typeCandidates
    .filter((entry): entry is string => typeof entry === "string" && entry.length > 0)
    .map((entry) => normalizeMediaType(entry));
  if (types.some((type) => type === "audio" || type.startsWith("audio/"))) {
    return true;
  }

  // The first body field that is a string wins, even if it is empty.
  const bodyCandidates: unknown[] = [ctx.BodyForCommands, ctx.CommandBody, ctx.RawBody, ctx.Body];
  const body =
    bodyCandidates.find((candidate): candidate is string => typeof candidate === "string") ?? "";
  const trimmed = body.trim();
  if (!trimmed) {
    return false;
  }
  return AUDIO_PLACEHOLDER_RE.test(trimmed) || AUDIO_HEADER_RE.test(trimmed);
};
|
|
|
|
/**
 * Reads the stored `ttsAuto` setting for the session this message belongs to.
 *
 * When `ctx.CommandSource` is "native", the trimmed `ctx.CommandTargetSessionKey`
 * is preferred over `ctx.SessionKey`. The session store is looked up first by the
 * lower-cased key and then by the key as given.
 *
 * @param ctx Finalized inbound message context.
 * @param cfg Loaded configuration, used to resolve the agent id and store path.
 * @returns The value produced by `normalizeTtsAutoMode` for the stored entry, or
 *   `undefined` when there is no usable session key or loading the store throws.
 */
const resolveSessionTtsAuto = (
  ctx: FinalizedMsgContext,
  cfg: OpenClawConfig,
): string | undefined => {
  const nativeTargetKey =
    ctx.CommandSource === "native" ? ctx.CommandTargetSessionKey?.trim() : undefined;
  const effectiveKey = (nativeTargetKey ?? ctx.SessionKey)?.trim();
  if (!effectiveKey) {
    return undefined;
  }

  const owningAgentId = resolveSessionAgentId({ sessionKey: effectiveKey, config: cfg });
  const sessionStorePath = resolveStorePath(cfg.session?.store, { agentId: owningAgentId });
  try {
    const sessions = loadSessionStore(sessionStorePath);
    const record = sessions[effectiveKey.toLowerCase()] ?? sessions[effectiveKey];
    return normalizeTtsAutoMode(record?.ttsAuto);
  } catch {
    // Store access is best-effort: any failure is treated as "no session-level setting".
    return undefined;
  }
};
|
|
|
|
/** Outcome returned by `dispatchReplyFromConfig`. */
export type DispatchFromConfigResult = {
  /**
   * Number of queued payloads per dispatch kind as reported by the dispatcher;
   * `final` additionally includes final payloads routed via `routeReply`.
   */
  counts: Record<ReplyDispatchKind, number>;
  /** Whether at least one final payload was accepted by the dispatcher or routed successfully. */
  queuedFinal: boolean;
};
|
|
|
|
/**
 * Dispatches the reply for one inbound message.
 *
 * Steps, in order:
 * 1. Messages flagged by `shouldSkipDuplicateInbound` are skipped.
 * 2. The `message_received` plugin hook and the internal "message"/"received" hook
 *    are fired without being awaited; their failures are only logged.
 * 3. `tryFastAbortFromMessage` is attempted; when it handles the message, the abort
 *    text is sent as the only final reply and the function returns.
 * 4. Replies are resolved through `params.replyResolver` (default
 *    `getReplyFromConfig`). Tool results and block replies are delivered as they
 *    arrive, final replies afterwards; each payload first passes through
 *    `maybeApplyTtsToPayload`.
 * 5. When the TTS mode is "final", no final reply was returned, and text blocks were
 *    streamed, an audio-only final payload is synthesized from the accumulated text.
 *
 * Payloads go to `dispatcher` unless the originating channel is routable, has a
 * target, and differs from the current surface; in that case they are sent with
 * `routeReply` instead.
 *
 * @param params.ctx Finalized inbound message context.
 * @param params.cfg Loaded configuration.
 * @param params.dispatcher Dispatcher used for same-surface delivery.
 * @param params.replyOptions Extra options forwarded to the reply resolver.
 * @param params.replyResolver Optional replacement for `getReplyFromConfig`.
 * @returns Whether a final payload was queued/routed, plus per-kind counts.
 * @throws Re-throws any error raised once processing has started, after recording
 *   the error outcome and marking the session idle.
 */
export async function dispatchReplyFromConfig(params: {
  ctx: FinalizedMsgContext;
  cfg: OpenClawConfig;
  dispatcher: ReplyDispatcher;
  replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
  replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
  const { ctx, cfg, dispatcher } = params;
  const diagnosticsEnabled = isDiagnosticsEnabled(cfg);
  const channel = String(ctx.Surface ?? ctx.Provider ?? "unknown").toLowerCase();
  const chatId = ctx.To ?? ctx.From;
  const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const sessionKey = ctx.SessionKey;
  const startTime = diagnosticsEnabled ? Date.now() : 0;
  const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);

  // Emits the "message processed" diagnostic; a no-op when diagnostics are disabled.
  const recordProcessed = (
    outcome: "completed" | "skipped" | "error",
    opts?: {
      reason?: string;
      error?: string;
    },
  ) => {
    if (!diagnosticsEnabled) {
      return;
    }
    logMessageProcessed({
      channel,
      chatId,
      messageId,
      sessionKey,
      durationMs: Date.now() - startTime,
      outcome,
      reason: opts?.reason,
      error: opts?.error,
    });
  };

  // Logs the queued message and the transition of the session to "processing".
  const markProcessing = () => {
    if (!canTrackSession || !sessionKey) {
      return;
    }
    logMessageQueued({ sessionKey, channel, source: "dispatch" });
    logSessionStateChange({
      sessionKey,
      state: "processing",
      reason: "message_start",
    });
  };

  // Logs the transition of the session back to "idle".
  const markIdle = (reason: string) => {
    if (!canTrackSession || !sessionKey) {
      return;
    }
    logSessionStateChange({
      sessionKey,
      state: "idle",
      reason,
    });
  };

  if (shouldSkipDuplicateInbound(ctx)) {
    recordProcessed("skipped", { reason: "duplicate" });
    return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
  }

  const inboundAudio = isInboundAudioContext(ctx);
  const sessionTtsAuto = resolveSessionTtsAuto(ctx, cfg);
  const hookRunner = getGlobalHookRunner();

  // Extract message context for hooks (plugin and internal)
  const timestamp =
    typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
  const messageIdForHook =
    ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
  const content =
    typeof ctx.BodyForCommands === "string"
      ? ctx.BodyForCommands
      : typeof ctx.RawBody === "string"
        ? ctx.RawBody
        : typeof ctx.Body === "string"
          ? ctx.Body
          : "";
  const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
  const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined;

  // Trigger plugin hooks (fire-and-forget)
  if (hookRunner?.hasHooks("message_received")) {
    void hookRunner
      .runMessageReceived(
        {
          from: ctx.From ?? "",
          content,
          timestamp,
          metadata: {
            to: ctx.To,
            provider: ctx.Provider,
            surface: ctx.Surface,
            threadId: ctx.MessageThreadId,
            originatingChannel: ctx.OriginatingChannel,
            originatingTo: ctx.OriginatingTo,
            messageId: messageIdForHook,
            senderId: ctx.SenderId,
            senderName: ctx.SenderName,
            senderUsername: ctx.SenderUsername,
            senderE164: ctx.SenderE164,
            guildId: ctx.GroupSpace,
            channelName: ctx.GroupChannel,
          },
        },
        {
          channelId,
          accountId: ctx.AccountId,
          conversationId,
        },
      )
      .catch((err) => {
        logVerbose(`dispatch-from-config: message_received plugin hook failed: ${String(err)}`);
      });
  }

  // Bridge to internal hooks (HOOK.md discovery system) - refs #8807
  if (sessionKey) {
    void triggerInternalHook(
      createInternalHookEvent("message", "received", sessionKey, {
        from: ctx.From ?? "",
        content,
        timestamp,
        channelId,
        accountId: ctx.AccountId,
        conversationId,
        messageId: messageIdForHook,
        metadata: {
          to: ctx.To,
          provider: ctx.Provider,
          surface: ctx.Surface,
          threadId: ctx.MessageThreadId,
          senderId: ctx.SenderId,
          senderName: ctx.SenderName,
          senderUsername: ctx.SenderUsername,
          senderE164: ctx.SenderE164,
          guildId: ctx.GroupSpace,
          channelName: ctx.GroupChannel,
        },
      }),
    ).catch((err) => {
      logVerbose(`dispatch-from-config: message_received internal hook failed: ${String(err)}`);
    });
  }

  // Check if we should route replies to originating channel instead of dispatcher.
  // Only route when the originating channel is DIFFERENT from the current surface.
  // This handles cross-provider routing (e.g., message from Telegram being processed
  // by a shared session that's currently on Slack) while preserving normal dispatcher
  // flow when the provider handles its own messages.
  //
  // Debug: `pnpm test src/auto-reply/reply/dispatch-from-config.test.ts`
  const originatingChannel = ctx.OriginatingChannel;
  const originatingTo = ctx.OriginatingTo;
  const currentSurface = (ctx.Surface ?? ctx.Provider)?.toLowerCase();
  // Coerced so the flag is a real boolean rather than whatever `&&` last evaluated.
  const shouldRouteToOriginating = Boolean(
    isRoutableChannel(originatingChannel) && originatingTo && originatingChannel !== currentSurface,
  );
  const ttsChannel = shouldRouteToOriginating ? originatingChannel : currentSurface;

  /**
   * Helper to send a payload via route-reply (async).
   * Only used when actually routing to a different provider.
   * Note: Only called when shouldRouteToOriginating is true, so
   * originatingChannel and originatingTo are guaranteed to be defined.
   */
  const sendPayloadAsync = async (
    payload: ReplyPayload,
    abortSignal?: AbortSignal,
    mirror?: boolean,
  ): Promise<void> => {
    // TypeScript doesn't narrow these from the shouldRouteToOriginating check,
    // but they're guaranteed non-null when this function is called.
    if (!originatingChannel || !originatingTo) {
      return;
    }
    if (abortSignal?.aborted) {
      return;
    }
    const result = await routeReply({
      payload,
      channel: originatingChannel,
      to: originatingTo,
      sessionKey: ctx.SessionKey,
      accountId: ctx.AccountId,
      threadId: ctx.MessageThreadId,
      cfg,
      abortSignal,
      mirror,
    });
    if (!result.ok) {
      logVerbose(`dispatch-from-config: route-reply failed: ${result.error ?? "unknown error"}`);
    }
  };

  /**
   * Routes one final-kind payload to the originating channel and reports success.
   * Shared by the abort, final, and tts-only paths, which differ only in the label
   * used in the failure log line. Callers check shouldRouteToOriginating first; the
   * guard below exists for type narrowing.
   */
  const routeFinalToOriginating = async (
    payload: ReplyPayload,
    label: "abort" | "final" | "tts-only",
  ): Promise<boolean> => {
    if (!originatingChannel || !originatingTo) {
      return false;
    }
    const result = await routeReply({
      payload,
      channel: originatingChannel,
      to: originatingTo,
      sessionKey: ctx.SessionKey,
      accountId: ctx.AccountId,
      threadId: ctx.MessageThreadId,
      cfg,
    });
    if (!result.ok) {
      logVerbose(
        `dispatch-from-config: route-reply (${label}) failed: ${result.error ?? "unknown error"}`,
      );
    }
    return result.ok;
  };

  markProcessing();

  try {
    const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
    if (fastAbort.handled) {
      const payload = {
        text: formatAbortReplyText(fastAbort.stoppedSubagents),
      } satisfies ReplyPayload;
      let queuedFinal = false;
      let routedFinalCount = 0;
      if (shouldRouteToOriginating && originatingChannel && originatingTo) {
        queuedFinal = await routeFinalToOriginating(payload, "abort");
        if (queuedFinal) {
          routedFinalCount += 1;
        }
      } else {
        queuedFinal = dispatcher.sendFinalReply(payload);
      }
      // Copy before adjusting so the object handed back by the dispatcher is never mutated.
      const counts = { ...dispatcher.getQueuedCounts() };
      counts.final += routedFinalCount;
      recordProcessed("completed", { reason: "fast_abort" });
      markIdle("message_completed");
      return { queuedFinal, counts };
    }

    // Track accumulated block text for TTS generation after streaming completes.
    // When block streaming succeeds, there's no final reply, so we need to generate
    // TTS audio separately from the accumulated block content.
    let accumulatedBlockText = "";
    let blockCount = 0;

    const shouldSendToolSummaries = ctx.ChatType !== "group" && ctx.CommandSource !== "native";

    // Returns the payload to deliver for a tool result, or null when it must be dropped.
    const resolveToolDeliveryPayload = (payload: ReplyPayload): ReplyPayload | null => {
      if (shouldSendToolSummaries) {
        return payload;
      }
      // Group/native flows intentionally suppress tool summary text, but media-only
      // tool results (for example TTS audio) must still be delivered.
      const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
      if (!hasMedia) {
        return null;
      }
      return { ...payload, text: undefined };
    };

    const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
      ctx,
      {
        ...params.replyOptions,
        onToolResult: (payload: ReplyPayload) => {
          const run = async () => {
            const ttsPayload = await maybeApplyTtsToPayload({
              payload,
              cfg,
              channel: ttsChannel,
              kind: "tool",
              inboundAudio,
              ttsAuto: sessionTtsAuto,
            });
            const deliveryPayload = resolveToolDeliveryPayload(ttsPayload);
            if (!deliveryPayload) {
              return;
            }
            if (shouldRouteToOriginating) {
              await sendPayloadAsync(deliveryPayload, undefined, false);
            } else {
              dispatcher.sendToolResult(deliveryPayload);
            }
          };
          return run();
        },
        onBlockReply: (payload: ReplyPayload, context) => {
          const run = async () => {
            // Suppress reasoning payloads — channels using this generic dispatch
            // path (WhatsApp, web, etc.) do not have a dedicated reasoning lane.
            // Telegram has its own dispatch path that handles reasoning splitting.
            if (shouldSuppressReasoningPayload(payload)) {
              return;
            }
            // Accumulate block text for TTS generation after streaming
            if (payload.text) {
              if (accumulatedBlockText.length > 0) {
                accumulatedBlockText += "\n";
              }
              accumulatedBlockText += payload.text;
              blockCount++;
            }
            const ttsPayload = await maybeApplyTtsToPayload({
              payload,
              cfg,
              channel: ttsChannel,
              kind: "block",
              inboundAudio,
              ttsAuto: sessionTtsAuto,
            });
            if (shouldRouteToOriginating) {
              await sendPayloadAsync(ttsPayload, context?.abortSignal, false);
            } else {
              dispatcher.sendBlockReply(ttsPayload);
            }
          };
          return run();
        },
      },
      cfg,
    );

    const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : [];

    let queuedFinal = false;
    let routedFinalCount = 0;
    for (const reply of replies) {
      // Suppress reasoning payloads from channel delivery — channels using this
      // generic dispatch path do not have a dedicated reasoning lane.
      if (shouldSuppressReasoningPayload(reply)) {
        continue;
      }
      const ttsReply = await maybeApplyTtsToPayload({
        payload: reply,
        cfg,
        channel: ttsChannel,
        kind: "final",
        inboundAudio,
        ttsAuto: sessionTtsAuto,
      });
      if (shouldRouteToOriginating && originatingChannel && originatingTo) {
        // Route final reply to originating channel.
        const delivered = await routeFinalToOriginating(ttsReply, "final");
        queuedFinal = delivered || queuedFinal;
        if (delivered) {
          routedFinalCount += 1;
        }
      } else {
        queuedFinal = dispatcher.sendFinalReply(ttsReply) || queuedFinal;
      }
    }

    const ttsMode = resolveTtsConfig(cfg).mode ?? "final";
    // Generate TTS-only reply after block streaming completes (when there's no final reply).
    // This handles the case where block streaming succeeds and drops final payloads,
    // but we still want TTS audio to be generated from the accumulated block content.
    if (
      ttsMode === "final" &&
      replies.length === 0 &&
      blockCount > 0 &&
      accumulatedBlockText.trim()
    ) {
      try {
        const ttsSyntheticReply = await maybeApplyTtsToPayload({
          payload: { text: accumulatedBlockText },
          cfg,
          channel: ttsChannel,
          kind: "final",
          inboundAudio,
          ttsAuto: sessionTtsAuto,
        });
        // Only send if TTS was actually applied (mediaUrl exists)
        if (ttsSyntheticReply.mediaUrl) {
          // Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
          const ttsOnlyPayload: ReplyPayload = {
            mediaUrl: ttsSyntheticReply.mediaUrl,
            audioAsVoice: ttsSyntheticReply.audioAsVoice,
          };
          if (shouldRouteToOriginating && originatingChannel && originatingTo) {
            const delivered = await routeFinalToOriginating(ttsOnlyPayload, "tts-only");
            queuedFinal = delivered || queuedFinal;
            if (delivered) {
              routedFinalCount += 1;
            }
          } else {
            const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
            queuedFinal = didQueue || queuedFinal;
          }
        }
      } catch (err) {
        // A failure while producing or sending the synthetic audio must not fail the dispatch.
        logVerbose(
          `dispatch-from-config: accumulated block TTS failed: ${err instanceof Error ? err.message : String(err)}`,
        );
      }
    }

    // Copy before adjusting so the object handed back by the dispatcher is never mutated.
    const counts = { ...dispatcher.getQueuedCounts() };
    counts.final += routedFinalCount;
    recordProcessed("completed");
    markIdle("message_completed");
    return { queuedFinal, counts };
  } catch (err) {
    recordProcessed("error", { error: String(err) });
    markIdle("message_error");
    throw err;
  }
}
|