From caae34cbafab3823b505f6dc15f9778820f3aed4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 22:51:22 +0000 Subject: [PATCH] refactor: unify message hook mapping and async dispatch --- src/auto-reply/reply/dispatch-from-config.ts | 93 +--- src/auto-reply/reply/get-reply.ts | 78 +-- .../reply/message-preprocess-hooks.test.ts | 93 ++++ .../reply/message-preprocess-hooks.ts | 50 ++ src/hooks/fire-and-forget.test.ts | 18 + src/hooks/fire-and-forget.ts | 11 + src/hooks/message-hook-mappers.test.ts | 154 ++++++ src/hooks/message-hook-mappers.ts | 279 +++++++++++ src/hooks/message-hooks.test.ts | 467 +++++++----------- src/infra/outbound/deliver.ts | 72 +-- 10 files changed, 865 insertions(+), 450 deletions(-) create mode 100644 src/auto-reply/reply/message-preprocess-hooks.test.ts create mode 100644 src/auto-reply/reply/message-preprocess-hooks.ts create mode 100644 src/hooks/fire-and-forget.test.ts create mode 100644 src/hooks/fire-and-forget.ts create mode 100644 src/hooks/message-hook-mappers.test.ts create mode 100644 src/hooks/message-hook-mappers.ts diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 2e3265b44e5..c727871ca4e 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -2,7 +2,14 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { OpenClawConfig } from "../../config/config.js"; import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js"; import { logVerbose } from "../../globals.js"; +import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; +import { + deriveInboundMessageHookContext, + toInternalMessageReceivedContext, + toPluginMessageContext, + toPluginMessageReceivedEvent, +} from "../../hooks/message-hook-mappers.js"; import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; import { logMessageProcessed, @@ -167,81 +174,31 @@ export async function dispatchReplyFromConfig(params: { typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; const messageIdForHook = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; - const content = - typeof ctx.BodyForCommands === "string" - ? ctx.BodyForCommands - : typeof ctx.RawBody === "string" - ? ctx.RawBody - : typeof ctx.Body === "string" - ? ctx.Body - : ""; - const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); - const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; - const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel); - const groupId = isGroup ? conversationId : undefined; + const hookContext = deriveInboundMessageHookContext(ctx, { messageId: messageIdForHook }); + const { isGroup, groupId } = hookContext; // Trigger plugin hooks (fire-and-forget) if (hookRunner?.hasHooks("message_received")) { - void hookRunner - .runMessageReceived( - { - from: ctx.From ?? "", - content, - timestamp, - metadata: { - to: ctx.To, - provider: ctx.Provider, - surface: ctx.Surface, - threadId: ctx.MessageThreadId, - originatingChannel: ctx.OriginatingChannel, - originatingTo: ctx.OriginatingTo, - messageId: messageIdForHook, - senderId: ctx.SenderId, - senderName: ctx.SenderName, - senderUsername: ctx.SenderUsername, - senderE164: ctx.SenderE164, - guildId: ctx.GroupSpace, - channelName: ctx.GroupChannel, - }, - }, - { - channelId, - accountId: ctx.AccountId, - conversationId, - }, - ) - .catch((err) => { - logVerbose(`dispatch-from-config: message_received plugin hook failed: ${String(err)}`); - }); + fireAndForgetHook( + hookRunner.runMessageReceived( + toPluginMessageReceivedEvent(hookContext), + toPluginMessageContext(hookContext), + ), + "dispatch-from-config: message_received plugin hook failed", + ); } // Bridge to internal hooks (HOOK.md discovery system) - refs #8807 if (sessionKey) { - void triggerInternalHook( - createInternalHookEvent("message", "received", sessionKey, { - from: ctx.From ?? "", - content, - timestamp, - channelId, - accountId: ctx.AccountId, - conversationId, - messageId: messageIdForHook, - metadata: { - to: ctx.To, - provider: ctx.Provider, - surface: ctx.Surface, - threadId: ctx.MessageThreadId, - senderId: ctx.SenderId, - senderName: ctx.SenderName, - senderUsername: ctx.SenderUsername, - senderE164: ctx.SenderE164, - guildId: ctx.GroupSpace, - channelName: ctx.GroupChannel, - }, - }), - ).catch((err) => { - logVerbose(`dispatch-from-config: message_received internal hook failed: ${String(err)}`); - }); + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent("message", "received", sessionKey, { + ...toInternalMessageReceivedContext(hookContext), + timestamp, + }), + ), + "dispatch-from-config: message_received internal hook failed", + ); } // Check if we should route replies to originating channel instead of dispatcher. diff --git a/src/auto-reply/reply/get-reply.ts b/src/auto-reply/reply/get-reply.ts index f9f2dc8a90e..911cddf46ef 100644 --- a/src/auto-reply/reply/get-reply.ts +++ b/src/auto-reply/reply/get-reply.ts @@ -9,8 +9,6 @@ import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; import { DEFAULT_AGENT_WORKSPACE_DIR, ensureAgentWorkspace } from "../../agents/workspace.js"; import { resolveChannelModelOverride } from "../../channels/model-overrides.js"; import { type OpenClawConfig, loadConfig } from "../../config/config.js"; -import { logVerbose } from "../../globals.js"; -import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { applyLinkUnderstanding } from "../../link-understanding/apply.js"; import { applyMediaUnderstanding } from "../../media-understanding/apply.js"; import { defaultRuntime } from "../../runtime.js"; @@ -24,6 +22,7 @@ import { resolveReplyDirectives } from "./get-reply-directives.js"; import { handleInlineActions } from "./get-reply-inline-actions.js"; import { runPreparedReply } from "./get-reply-run.js"; import { finalizeInboundContext } from "./inbound-context.js"; +import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js"; import { applyResetModelOverride } from "./session-reset-model.js"; import { initSessionState } from "./session.js"; import { stageSandboxMedia } from "./stage-sandbox-media.js"; @@ -137,76 +136,11 @@ export async function getReplyFromConfig( cfg, }); } - - const channelId = ( - finalized.OriginatingChannel ?? - finalized.Surface ?? - finalized.Provider ?? - "" - ).toLowerCase(); - const hookSessionKey = finalized.SessionKey?.trim(); - const conversationId = finalized.OriginatingTo ?? finalized.To ?? finalized.From ?? undefined; - const isGroupConversation = Boolean(finalized.GroupSubject || finalized.GroupChannel); - const groupId = isGroupConversation ? conversationId : undefined; - - // Trigger message:transcribed hook after media understanding completes - // Only fire if transcription actually occurred (skip in fast test mode or non-audio) - if (!isFastTestEnv && hookSessionKey && finalized.Transcript) { - void triggerInternalHook( - createInternalHookEvent("message", "transcribed", hookSessionKey, { - from: finalized.From, - to: finalized.To, - body: finalized.Body, - bodyForAgent: finalized.BodyForAgent, - transcript: finalized.Transcript, - timestamp: finalized.Timestamp, - channelId, - conversationId, - messageId: finalized.MessageSid, - senderId: finalized.SenderId, - senderName: finalized.SenderName, - senderUsername: finalized.SenderUsername, - provider: finalized.Provider, - surface: finalized.Surface, - mediaPath: finalized.MediaPath, - mediaType: finalized.MediaType, - cfg, - }), - ).catch((err) => { - logVerbose(`get-reply: message:transcribed internal hook failed: ${String(err)}`); - }); - } - - // Trigger message:preprocessed hook after all media + link understanding. - // Fires for every message, giving hooks access to the fully enriched body - // (transcripts, image descriptions, link summaries) before the agent sees it. - if (!isFastTestEnv && hookSessionKey) { - void triggerInternalHook( - createInternalHookEvent("message", "preprocessed", hookSessionKey, { - from: finalized.From, - to: finalized.To, - body: finalized.Body, - bodyForAgent: finalized.BodyForAgent, - transcript: finalized.Transcript, - timestamp: finalized.Timestamp, - channelId, - conversationId, - messageId: finalized.MessageSid, - senderId: finalized.SenderId, - senderName: finalized.SenderName, - senderUsername: finalized.SenderUsername, - provider: finalized.Provider, - surface: finalized.Surface, - mediaPath: finalized.MediaPath, - mediaType: finalized.MediaType, - isGroup: isGroupConversation, - groupId, - cfg, - }), - ).catch((err) => { - logVerbose(`get-reply: message:preprocessed internal hook failed: ${String(err)}`); - }); - } + emitPreAgentMessageHooks({ + ctx: finalized, + cfg, + isFastTestEnv, + }); const commandAuthorized = finalized.CommandAuthorized; resolveCommandAuthorization({ diff --git a/src/auto-reply/reply/message-preprocess-hooks.test.ts b/src/auto-reply/reply/message-preprocess-hooks.test.ts new file mode 100644 index 00000000000..be220723fb4 --- /dev/null +++ b/src/auto-reply/reply/message-preprocess-hooks.test.ts @@ -0,0 +1,93 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../config/config.js"; +import { clearInternalHooks, registerInternalHook } from "../../hooks/internal-hooks.js"; +import type { FinalizedMsgContext } from "../templating.js"; +import { emitPreAgentMessageHooks } from "./message-preprocess-hooks.js"; + +function makeCtx(overrides: Partial = {}): FinalizedMsgContext { + return { + SessionKey: "agent:main:telegram:chat-1", + From: "telegram:user:1", + To: "telegram:chat-1", + Body: "", + BodyForAgent: "[Audio] Transcript: hello", + BodyForCommands: "", + Transcript: "hello", + Provider: "telegram", + Surface: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:chat-1", + Timestamp: 1710000000, + MessageSid: "msg-1", + GroupChannel: "ops", + ...overrides, + } as FinalizedMsgContext; +} + +describe("emitPreAgentMessageHooks", () => { + beforeEach(() => { + clearInternalHooks(); + }); + + it("emits transcribed and preprocessed events when transcript exists", async () => { + const actions: string[] = []; + registerInternalHook("message", (event) => { + actions.push(event.action); + }); + + emitPreAgentMessageHooks({ + ctx: makeCtx(), + cfg: {} as OpenClawConfig, + isFastTestEnv: false, + }); + await Promise.resolve(); + await Promise.resolve(); + + expect(actions).toEqual(["transcribed", "preprocessed"]); + }); + + it("emits only preprocessed when transcript is missing", async () => { + const actions: string[] = []; + registerInternalHook("message", (event) => { + actions.push(event.action); + }); + + emitPreAgentMessageHooks({ + ctx: makeCtx({ Transcript: undefined }), + cfg: {} as OpenClawConfig, + isFastTestEnv: false, + }); + await Promise.resolve(); + await Promise.resolve(); + + expect(actions).toEqual(["preprocessed"]); + }); + + it("skips hook emission in fast-test mode", async () => { + const handler = vi.fn(); + registerInternalHook("message", handler); + + emitPreAgentMessageHooks({ + ctx: makeCtx(), + cfg: {} as OpenClawConfig, + isFastTestEnv: true, + }); + await Promise.resolve(); + + expect(handler).not.toHaveBeenCalled(); + }); + + it("skips hook emission without session key", async () => { + const handler = vi.fn(); + registerInternalHook("message", handler); + + emitPreAgentMessageHooks({ + ctx: makeCtx({ SessionKey: " " }), + cfg: {} as OpenClawConfig, + isFastTestEnv: false, + }); + await Promise.resolve(); + + expect(handler).not.toHaveBeenCalled(); + }); +}); diff --git a/src/auto-reply/reply/message-preprocess-hooks.ts b/src/auto-reply/reply/message-preprocess-hooks.ts new file mode 100644 index 00000000000..f4c19675941 --- /dev/null +++ b/src/auto-reply/reply/message-preprocess-hooks.ts @@ -0,0 +1,50 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; +import { + deriveInboundMessageHookContext, + toInternalMessagePreprocessedContext, + toInternalMessageTranscribedContext, +} from "../../hooks/message-hook-mappers.js"; +import type { FinalizedMsgContext } from "../templating.js"; + +export function emitPreAgentMessageHooks(params: { + ctx: FinalizedMsgContext; + cfg: OpenClawConfig; + isFastTestEnv: boolean; +}): void { + if (params.isFastTestEnv) { + return; + } + const sessionKey = params.ctx.SessionKey?.trim(); + if (!sessionKey) { + return; + } + + const canonical = deriveInboundMessageHookContext(params.ctx); + if (canonical.transcript) { + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent( + "message", + "transcribed", + sessionKey, + toInternalMessageTranscribedContext(canonical, params.cfg), + ), + ), + "get-reply: message:transcribed internal hook failed", + ); + } + + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent( + "message", + "preprocessed", + sessionKey, + toInternalMessagePreprocessedContext(canonical, params.cfg), + ), + ), + "get-reply: message:preprocessed internal hook failed", + ); +} diff --git a/src/hooks/fire-and-forget.test.ts b/src/hooks/fire-and-forget.test.ts new file mode 100644 index 00000000000..74710495fc8 --- /dev/null +++ b/src/hooks/fire-and-forget.test.ts @@ -0,0 +1,18 @@ +import { describe, expect, it, vi } from "vitest"; +import { fireAndForgetHook } from "./fire-and-forget.js"; + +describe("fireAndForgetHook", () => { + it("logs rejection errors", async () => { + const logger = vi.fn(); + fireAndForgetHook(Promise.reject(new Error("boom")), "hook failed", logger); + await Promise.resolve(); + expect(logger).toHaveBeenCalledWith("hook failed: Error: boom"); + }); + + it("does not log for resolved tasks", async () => { + const logger = vi.fn(); + fireAndForgetHook(Promise.resolve("ok"), "hook failed", logger); + await Promise.resolve(); + expect(logger).not.toHaveBeenCalled(); + }); +}); diff --git a/src/hooks/fire-and-forget.ts b/src/hooks/fire-and-forget.ts new file mode 100644 index 00000000000..a1f0136097b --- /dev/null +++ b/src/hooks/fire-and-forget.ts @@ -0,0 +1,11 @@ +import { logVerbose } from "../globals.js"; + +export function fireAndForgetHook( + task: Promise, + label: string, + logger: (message: string) => void = logVerbose, +): void { + void task.catch((err) => { + logger(`${label}: ${String(err)}`); + }); +} diff --git a/src/hooks/message-hook-mappers.test.ts b/src/hooks/message-hook-mappers.test.ts new file mode 100644 index 00000000000..c365f463ade --- /dev/null +++ b/src/hooks/message-hook-mappers.test.ts @@ -0,0 +1,154 @@ +import { describe, expect, it } from "vitest"; +import type { FinalizedMsgContext } from "../auto-reply/templating.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { + buildCanonicalSentMessageHookContext, + deriveInboundMessageHookContext, + toInternalMessagePreprocessedContext, + toInternalMessageReceivedContext, + toInternalMessageSentContext, + toInternalMessageTranscribedContext, + toPluginMessageContext, + toPluginMessageReceivedEvent, + toPluginMessageSentEvent, +} from "./message-hook-mappers.js"; + +function makeInboundCtx(overrides: Partial = {}): FinalizedMsgContext { + return { + From: "telegram:user:123", + To: "telegram:chat:456", + Body: "body", + BodyForAgent: "body-for-agent", + BodyForCommands: "commands-body", + RawBody: "raw-body", + Transcript: "hello transcript", + Timestamp: 1710000000, + Provider: "telegram", + Surface: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:chat:456", + AccountId: "acc-1", + MessageSid: "msg-1", + SenderId: "sender-1", + SenderName: "User One", + SenderUsername: "userone", + SenderE164: "+15551234567", + MessageThreadId: 42, + MediaPath: "/tmp/audio.ogg", + MediaType: "audio/ogg", + GroupSubject: "ops", + GroupChannel: "ops-room", + GroupSpace: "guild-1", + ...overrides, + } as FinalizedMsgContext; +} + +describe("message hook mappers", () => { + it("derives canonical inbound context with body precedence and group metadata", () => { + const canonical = deriveInboundMessageHookContext(makeInboundCtx()); + + expect(canonical.content).toBe("commands-body"); + expect(canonical.channelId).toBe("telegram"); + expect(canonical.conversationId).toBe("telegram:chat:456"); + expect(canonical.messageId).toBe("msg-1"); + expect(canonical.isGroup).toBe(true); + expect(canonical.groupId).toBe("telegram:chat:456"); + expect(canonical.guildId).toBe("guild-1"); + }); + + it("supports explicit content/messageId overrides", () => { + const canonical = deriveInboundMessageHookContext(makeInboundCtx(), { + content: "override-content", + messageId: "override-msg", + }); + + expect(canonical.content).toBe("override-content"); + expect(canonical.messageId).toBe("override-msg"); + }); + + it("maps canonical inbound context to plugin/internal received payloads", () => { + const canonical = deriveInboundMessageHookContext(makeInboundCtx()); + + expect(toPluginMessageContext(canonical)).toEqual({ + channelId: "telegram", + accountId: "acc-1", + conversationId: "telegram:chat:456", + }); + expect(toPluginMessageReceivedEvent(canonical)).toEqual({ + from: "telegram:user:123", + content: "commands-body", + timestamp: 1710000000, + metadata: expect.objectContaining({ + messageId: "msg-1", + senderName: "User One", + threadId: 42, + }), + }); + expect(toInternalMessageReceivedContext(canonical)).toEqual({ + from: "telegram:user:123", + content: "commands-body", + timestamp: 1710000000, + channelId: "telegram", + accountId: "acc-1", + conversationId: "telegram:chat:456", + messageId: "msg-1", + metadata: expect.objectContaining({ + senderUsername: "userone", + senderE164: "+15551234567", + }), + }); + }); + + it("maps transcribed and preprocessed internal payloads", () => { + const cfg = {} as OpenClawConfig; + const canonical = deriveInboundMessageHookContext(makeInboundCtx({ Transcript: undefined })); + + const transcribed = toInternalMessageTranscribedContext(canonical, cfg); + expect(transcribed.transcript).toBe(""); + expect(transcribed.cfg).toBe(cfg); + + const preprocessed = toInternalMessagePreprocessedContext(canonical, cfg); + expect(preprocessed.transcript).toBeUndefined(); + expect(preprocessed.isGroup).toBe(true); + expect(preprocessed.groupId).toBe("telegram:chat:456"); + expect(preprocessed.cfg).toBe(cfg); + }); + + it("maps sent context consistently for plugin/internal hooks", () => { + const canonical = buildCanonicalSentMessageHookContext({ + to: "telegram:chat:456", + content: "reply", + success: false, + error: "network error", + channelId: "telegram", + accountId: "acc-1", + messageId: "out-1", + isGroup: true, + groupId: "telegram:chat:456", + }); + + expect(toPluginMessageContext(canonical)).toEqual({ + channelId: "telegram", + accountId: "acc-1", + conversationId: "telegram:chat:456", + }); + expect(toPluginMessageSentEvent(canonical)).toEqual({ + to: "telegram:chat:456", + content: "reply", + success: false, + error: "network error", + }); + expect(toInternalMessageSentContext(canonical)).toEqual({ + to: "telegram:chat:456", + content: "reply", + success: false, + error: "network error", + channelId: "telegram", + accountId: "acc-1", + conversationId: "telegram:chat:456", + messageId: "out-1", + isGroup: true, + groupId: "telegram:chat:456", + }); + }); +}); diff --git a/src/hooks/message-hook-mappers.ts b/src/hooks/message-hook-mappers.ts new file mode 100644 index 00000000000..be51245a545 --- /dev/null +++ b/src/hooks/message-hook-mappers.ts @@ -0,0 +1,279 @@ +import type { FinalizedMsgContext } from "../auto-reply/templating.js"; +import type { OpenClawConfig } from "../config/config.js"; +import type { + PluginHookMessageContext, + PluginHookMessageReceivedEvent, + PluginHookMessageSentEvent, +} from "../plugins/types.js"; +import type { + MessagePreprocessedHookContext, + MessageReceivedHookContext, + MessageSentHookContext, + MessageTranscribedHookContext, +} from "./internal-hooks.js"; + +export type CanonicalInboundMessageHookContext = { + from: string; + to?: string; + content: string; + body?: string; + bodyForAgent?: string; + transcript?: string; + timestamp?: number; + channelId: string; + accountId?: string; + conversationId?: string; + messageId?: string; + senderId?: string; + senderName?: string; + senderUsername?: string; + senderE164?: string; + provider?: string; + surface?: string; + threadId?: string | number; + mediaPath?: string; + mediaType?: string; + originatingChannel?: string; + originatingTo?: string; + guildId?: string; + channelName?: string; + isGroup: boolean; + groupId?: string; +}; + +export type CanonicalSentMessageHookContext = { + to: string; + content: string; + success: boolean; + error?: string; + channelId: string; + accountId?: string; + conversationId?: string; + messageId?: string; + isGroup?: boolean; + groupId?: string; +}; + +export function deriveInboundMessageHookContext( + ctx: FinalizedMsgContext, + overrides?: { + content?: string; + messageId?: string; + }, +): CanonicalInboundMessageHookContext { + const content = + overrides?.content ?? + (typeof ctx.BodyForCommands === "string" + ? ctx.BodyForCommands + : typeof ctx.RawBody === "string" + ? ctx.RawBody + : typeof ctx.Body === "string" + ? ctx.Body + : ""); + const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); + const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; + const isGroup = Boolean(ctx.GroupSubject || ctx.GroupChannel); + return { + from: ctx.From ?? "", + to: ctx.To, + content, + body: ctx.Body, + bodyForAgent: ctx.BodyForAgent, + transcript: ctx.Transcript, + timestamp: + typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) + ? ctx.Timestamp + : undefined, + channelId, + accountId: ctx.AccountId, + conversationId, + messageId: + overrides?.messageId ?? + ctx.MessageSidFull ?? + ctx.MessageSid ?? + ctx.MessageSidFirst ?? + ctx.MessageSidLast, + senderId: ctx.SenderId, + senderName: ctx.SenderName, + senderUsername: ctx.SenderUsername, + senderE164: ctx.SenderE164, + provider: ctx.Provider, + surface: ctx.Surface, + threadId: ctx.MessageThreadId, + mediaPath: ctx.MediaPath, + mediaType: ctx.MediaType, + originatingChannel: ctx.OriginatingChannel, + originatingTo: ctx.OriginatingTo, + guildId: ctx.GroupSpace, + channelName: ctx.GroupChannel, + isGroup, + groupId: isGroup ? conversationId : undefined, + }; +} + +export function buildCanonicalSentMessageHookContext(params: { + to: string; + content: string; + success: boolean; + error?: string; + channelId: string; + accountId?: string; + conversationId?: string; + messageId?: string; + isGroup?: boolean; + groupId?: string; +}): CanonicalSentMessageHookContext { + return { + to: params.to, + content: params.content, + success: params.success, + error: params.error, + channelId: params.channelId, + accountId: params.accountId, + conversationId: params.conversationId ?? params.to, + messageId: params.messageId, + isGroup: params.isGroup, + groupId: params.groupId, + }; +} + +export function toPluginMessageContext( + canonical: CanonicalInboundMessageHookContext | CanonicalSentMessageHookContext, +): PluginHookMessageContext { + return { + channelId: canonical.channelId, + accountId: canonical.accountId, + conversationId: canonical.conversationId, + }; +} + +export function toPluginMessageReceivedEvent( + canonical: CanonicalInboundMessageHookContext, +): PluginHookMessageReceivedEvent { + return { + from: canonical.from, + content: canonical.content, + timestamp: canonical.timestamp, + metadata: { + to: canonical.to, + provider: canonical.provider, + surface: canonical.surface, + threadId: canonical.threadId, + originatingChannel: canonical.originatingChannel, + originatingTo: canonical.originatingTo, + messageId: canonical.messageId, + senderId: canonical.senderId, + senderName: canonical.senderName, + senderUsername: canonical.senderUsername, + senderE164: canonical.senderE164, + guildId: canonical.guildId, + channelName: canonical.channelName, + }, + }; +} + +export function toPluginMessageSentEvent( + canonical: CanonicalSentMessageHookContext, +): PluginHookMessageSentEvent { + return { + to: canonical.to, + content: canonical.content, + success: canonical.success, + ...(canonical.error ? { error: canonical.error } : {}), + }; +} + +export function toInternalMessageReceivedContext( + canonical: CanonicalInboundMessageHookContext, +): MessageReceivedHookContext { + return { + from: canonical.from, + content: canonical.content, + timestamp: canonical.timestamp, + channelId: canonical.channelId, + accountId: canonical.accountId, + conversationId: canonical.conversationId, + messageId: canonical.messageId, + metadata: { + to: canonical.to, + provider: canonical.provider, + surface: canonical.surface, + threadId: canonical.threadId, + senderId: canonical.senderId, + senderName: canonical.senderName, + senderUsername: canonical.senderUsername, + senderE164: canonical.senderE164, + guildId: canonical.guildId, + channelName: canonical.channelName, + }, + }; +} + +export function toInternalMessageTranscribedContext( + canonical: CanonicalInboundMessageHookContext, + cfg: OpenClawConfig, +): MessageTranscribedHookContext & { cfg: OpenClawConfig } { + return { + from: canonical.from, + to: canonical.to, + body: canonical.body, + bodyForAgent: canonical.bodyForAgent, + transcript: canonical.transcript ?? "", + timestamp: canonical.timestamp, + channelId: canonical.channelId, + conversationId: canonical.conversationId, + messageId: canonical.messageId, + senderId: canonical.senderId, + senderName: canonical.senderName, + senderUsername: canonical.senderUsername, + provider: canonical.provider, + surface: canonical.surface, + mediaPath: canonical.mediaPath, + mediaType: canonical.mediaType, + cfg, + }; +} + +export function toInternalMessagePreprocessedContext( + canonical: CanonicalInboundMessageHookContext, + cfg: OpenClawConfig, +): MessagePreprocessedHookContext & { cfg: OpenClawConfig } { + return { + from: canonical.from, + to: canonical.to, + body: canonical.body, + bodyForAgent: canonical.bodyForAgent, + transcript: canonical.transcript, + timestamp: canonical.timestamp, + channelId: canonical.channelId, + conversationId: canonical.conversationId, + messageId: canonical.messageId, + senderId: canonical.senderId, + senderName: canonical.senderName, + senderUsername: canonical.senderUsername, + provider: canonical.provider, + surface: canonical.surface, + mediaPath: canonical.mediaPath, + mediaType: canonical.mediaType, + isGroup: canonical.isGroup, + groupId: canonical.groupId, + cfg, + }; +} + +export function toInternalMessageSentContext( + canonical: CanonicalSentMessageHookContext, +): MessageSentHookContext { + return { + to: canonical.to, + content: canonical.content, + success: canonical.success, + ...(canonical.error ? { error: canonical.error } : {}), + channelId: canonical.channelId, + accountId: canonical.accountId, + conversationId: canonical.conversationId, + messageId: canonical.messageId, + ...(canonical.isGroup != null ? { isGroup: canonical.isGroup } : {}), + ...(canonical.groupId ? { groupId: canonical.groupId } : {}), + }; +} diff --git a/src/hooks/message-hooks.test.ts b/src/hooks/message-hooks.test.ts index 9232e45c52e..29a7d7da6a4 100644 --- a/src/hooks/message-hooks.test.ts +++ b/src/hooks/message-hooks.test.ts @@ -7,6 +7,105 @@ import { type InternalHookEvent, } from "./internal-hooks.js"; +type ActionCase = { + label: string; + key: string; + action: "received" | "transcribed" | "preprocessed" | "sent"; + context: Record; + assertContext: (context: Record) => void; +}; + +const actionCases: ActionCase[] = [ + { + label: "message:received", + key: "message:received", + action: "received", + context: { + from: "signal:+15551234567", + to: "bot:+15559876543", + content: "Test message", + channelId: "signal", + conversationId: "conv-abc", + messageId: "msg-xyz", + senderId: "sender-1", + senderName: "Test User", + senderUsername: "testuser", + senderE164: "+15551234567", + provider: "signal", + surface: "signal", + threadId: "thread-1", + originatingChannel: "signal", + originatingTo: "bot:+15559876543", + timestamp: 1707600000, + }, + assertContext: (context) => { + expect(context.content).toBe("Test message"); + expect(context.channelId).toBe("signal"); + expect(context.senderE164).toBe("+15551234567"); + expect(context.threadId).toBe("thread-1"); + }, + }, + { + label: "message:transcribed", + key: "message:transcribed", + action: "transcribed", + context: { + body: "🎤 [Audio]", + bodyForAgent: "[Audio] Transcript: Hello from voice", + transcript: "Hello from voice", + channelId: "telegram", + mediaType: "audio/ogg", + }, + assertContext: (context) => { + expect(context.body).toBe("🎤 [Audio]"); + expect(context.bodyForAgent).toContain("Transcript:"); + expect(context.transcript).toBe("Hello from voice"); + expect(context.mediaType).toBe("audio/ogg"); + }, + }, + { + label: "message:preprocessed", + key: "message:preprocessed", + action: "preprocessed", + context: { + body: "🎤 [Audio]", + bodyForAgent: "[Audio] Transcript: Check https://example.com\n[Link summary: Example site]", + transcript: "Check https://example.com", + channelId: "telegram", + mediaType: "audio/ogg", + isGroup: false, + }, + assertContext: (context) => { + expect(context.transcript).toBe("Check https://example.com"); + expect(String(context.bodyForAgent)).toContain("Link summary"); + expect(String(context.bodyForAgent)).toContain("Transcript:"); + }, + }, + { + label: "message:sent", + key: "message:sent", + action: "sent", + context: { + from: "bot:456", + to: "user:123", + content: "Reply text", + channelId: "discord", + conversationId: "channel:C123", + provider: "discord", + surface: "discord", + threadId: "thread-abc", + originatingChannel: "discord", + originatingTo: "channel:C123", + }, + assertContext: (context) => { + expect(context.content).toBe("Reply text"); + expect(context.channelId).toBe("discord"); + expect(context.conversationId).toBe("channel:C123"); + expect(context.threadId).toBe("thread-abc"); + }, + }, +]; + describe("message hooks", () => { beforeEach(() => { clearInternalHooks(); @@ -16,284 +115,102 @@ describe("message hooks", () => { clearInternalHooks(); }); - describe("message:received", () => { - it("should trigger handler registered for message:received", async () => { - const handler = vi.fn(); - registerInternalHook("message:received", handler); + describe("action handlers", () => { + for (const testCase of actionCases) { + it(`triggers handler for ${testCase.label}`, async () => { + const handler = vi.fn(); + registerInternalHook(testCase.key, handler); - const event = createInternalHookEvent("message", "received", "session-1", { - from: "user:123", - to: "bot:456", - content: "Hello world", - channelId: "telegram", - senderId: "123", - senderName: "Eric", - senderUsername: "eric_lytle", + await triggerInternalHook( + createInternalHookEvent("message", testCase.action, "session-1", testCase.context), + ); + + expect(handler).toHaveBeenCalledOnce(); + const event = handler.mock.calls[0][0] as InternalHookEvent; + expect(event.type).toBe("message"); + expect(event.action).toBe(testCase.action); + testCase.assertContext(event.context); }); - await triggerInternalHook(event); + } - expect(handler).toHaveBeenCalledOnce(); - expect(handler.mock.calls[0][0].type).toBe("message"); - expect(handler.mock.calls[0][0].action).toBe("received"); - expect(handler.mock.calls[0][0].context.content).toBe("Hello world"); - expect(handler.mock.calls[0][0].context.channelId).toBe("telegram"); - expect(handler.mock.calls[0][0].context.senderName).toBe("Eric"); - }); - - it("should include sender and message metadata in context", async () => { - const handler = vi.fn(); - registerInternalHook("message:received", handler); - - const event = createInternalHookEvent("message", "received", "session-1", { - from: "signal:+15551234567", - to: "bot:+15559876543", - content: "Test message", - channelId: "signal", - conversationId: "conv-abc", - messageId: "msg-xyz", - senderId: "sender-1", - senderName: "Test User", - senderUsername: "testuser", - senderE164: "+15551234567", - provider: "signal", - surface: "signal", - threadId: "thread-1", - originatingChannel: "signal", - originatingTo: "bot:+15559876543", - timestamp: 1707600000, - }); - await triggerInternalHook(event); - - const ctx = handler.mock.calls[0][0].context; - expect(ctx.messageId).toBe("msg-xyz"); - expect(ctx.senderId).toBe("sender-1"); - expect(ctx.senderE164).toBe("+15551234567"); - expect(ctx.threadId).toBe("thread-1"); - expect(ctx.timestamp).toBe(1707600000); - }); - }); - - describe("message:transcribed", () => { - it("should trigger handler registered for message:transcribed", async () => { - const handler = vi.fn(); - registerInternalHook("message:transcribed", handler); - - const event = createInternalHookEvent("message", "transcribed", "session-1", { - from: "user:123", - to: "bot:456", - transcript: "This is what the user said", - body: "🎤 Audio message", - channelId: "telegram", - mediaPath: "/tmp/audio.ogg", - mediaType: "audio/ogg", - }); - await triggerInternalHook(event); - - expect(handler).toHaveBeenCalledOnce(); - expect(handler.mock.calls[0][0].action).toBe("transcribed"); - expect(handler.mock.calls[0][0].context.transcript).toBe("This is what the user said"); - expect(handler.mock.calls[0][0].context.mediaType).toBe("audio/ogg"); - }); - - it("should include both raw body and transcript in context", async () => { - const handler = vi.fn(); - registerInternalHook("message:transcribed", handler); - - const event = createInternalHookEvent("message", "transcribed", "session-1", { - body: "🎤 [Audio]", - bodyForAgent: "[Audio] Transcript: Hello from voice", - transcript: "Hello from voice", - channelId: "telegram", - }); - await triggerInternalHook(event); - - const ctx = handler.mock.calls[0][0].context; - expect(ctx.body).toBe("🎤 [Audio]"); - expect(ctx.bodyForAgent).toBe("[Audio] Transcript: Hello from voice"); - expect(ctx.transcript).toBe("Hello from voice"); - }); - }); - - describe("message:preprocessed", () => { - it("should trigger handler registered for message:preprocessed", async () => { - const handler = vi.fn(); - registerInternalHook("message:preprocessed", handler); - - const event = createInternalHookEvent("message", "preprocessed", "session-1", { - from: "user:123", - to: "bot:456", - body: "Check out this link", - bodyForAgent: "Check out this link\n[Link summary: Article about testing]", - channelId: "telegram", - senderId: "123", - senderName: "Eric", - isGroup: false, - }); - await triggerInternalHook(event); - - expect(handler).toHaveBeenCalledOnce(); - expect(handler.mock.calls[0][0].action).toBe("preprocessed"); - expect(handler.mock.calls[0][0].context.bodyForAgent).toContain("Link summary"); - }); - - it("should include both transcript and link summary for enriched audio messages", async () => { - const handler = vi.fn(); - registerInternalHook("message:preprocessed", handler); - - const event = createInternalHookEvent("message", "preprocessed", "session-1", { - body: "🎤 [Audio]", - bodyForAgent: "[Audio] Transcript: Check https://example.com\n[Link summary: Example site]", - transcript: "Check https://example.com", - channelId: "telegram", - mediaType: "audio/ogg", - isGroup: false, - }); - await triggerInternalHook(event); - - const ctx = handler.mock.calls[0][0].context; - expect(ctx.transcript).toBe("Check https://example.com"); - expect(ctx.bodyForAgent).toContain("Link summary"); - expect(ctx.bodyForAgent).toContain("Transcript:"); - }); - - it("should fire for plain text messages without media", async () => { - const handler = vi.fn(); - registerInternalHook("message:preprocessed", handler); - - const event = createInternalHookEvent("message", "preprocessed", "session-1", { - body: "Just a text message", - bodyForAgent: "Just a text message", - channelId: "signal", - isGroup: false, - }); - await triggerInternalHook(event); - - expect(handler).toHaveBeenCalledOnce(); - const ctx = handler.mock.calls[0][0].context; - expect(ctx.transcript).toBeUndefined(); - expect(ctx.mediaType).toBeUndefined(); - expect(ctx.body).toBe("Just a text message"); - }); - }); - - describe("message:sent", () => { - it("should trigger handler registered for message:sent", async () => { - const handler = vi.fn(); - registerInternalHook("message:sent", handler); - - const event = createInternalHookEvent("message", "sent", "session-1", { - from: "bot:456", - to: "user:123", - content: "Here is my reply", - channelId: "telegram", - provider: "telegram", - }); - await triggerInternalHook(event); - - expect(handler).toHaveBeenCalledOnce(); - expect(handler.mock.calls[0][0].action).toBe("sent"); - expect(handler.mock.calls[0][0].context.content).toBe("Here is my reply"); - }); - - it("should include channel and routing context", async () => { - const handler = vi.fn(); - registerInternalHook("message:sent", handler); - - const event = createInternalHookEvent("message", "sent", "session-1", { - from: "bot:456", - to: "user:123", - content: "Reply text", - channelId: "discord", - conversationId: "channel:C123", - provider: "discord", - surface: "discord", - threadId: "thread-abc", - originatingChannel: "discord", - originatingTo: "channel:C123", - }); - await triggerInternalHook(event); - - const ctx = handler.mock.calls[0][0].context; - expect(ctx.channelId).toBe("discord"); - expect(ctx.conversationId).toBe("channel:C123"); - expect(ctx.threadId).toBe("thread-abc"); - }); - }); - - describe("general message handler", () => { - it("should receive all message event types (received, transcribed, preprocessed, sent)", async () => { - const events: InternalHookEvent[] = []; - registerInternalHook("message", (event) => { - events.push(event); - }); - - await triggerInternalHook( - createInternalHookEvent("message", "received", "s1", { content: "hi" }), - ); - await triggerInternalHook( - createInternalHookEvent("message", "transcribed", "s1", { transcript: "hello" }), - ); - await triggerInternalHook( - createInternalHookEvent("message", "preprocessed", "s1", { - body: "hello", - bodyForAgent: "hello", - }), - ); - await triggerInternalHook( - createInternalHookEvent("message", "sent", "s1", { content: "reply" }), - ); - - expect(events).toHaveLength(4); - expect(events[0].action).toBe("received"); - expect(events[1].action).toBe("transcribed"); - expect(events[2].action).toBe("preprocessed"); - expect(events[3].action).toBe("sent"); - }); - - it("should trigger both general and specific handlers for same event", async () => { - const generalHandler = vi.fn(); - const specificHandler = vi.fn(); - - registerInternalHook("message", generalHandler); - registerInternalHook("message:received", specificHandler); - - const event = createInternalHookEvent("message", "received", "s1", { content: "test" }); - await triggerInternalHook(event); - - expect(generalHandler).toHaveBeenCalledOnce(); - expect(specificHandler).toHaveBeenCalledOnce(); - }); - - it("should not trigger message:sent handler for message:received events", async () => { + it("does not trigger action-specific handlers for other actions", async () => { const sentHandler = vi.fn(); registerInternalHook("message:sent", sentHandler); await triggerInternalHook( - createInternalHookEvent("message", "received", "s1", { content: "hi" }), + createInternalHookEvent("message", "received", "session-1", { content: "hello" }), ); expect(sentHandler).not.toHaveBeenCalled(); }); }); + describe("general handler", () => { + it("receives full message lifecycle in order", async () => { + const events: InternalHookEvent[] = []; + registerInternalHook("message", (event) => { + events.push(event); + }); + + const lifecycleFixtures: Array<{ + action: "received" | "transcribed" | "preprocessed" | "sent"; + context: Record; + }> = [ + { action: "received", context: { content: "hi" } }, + { action: "transcribed", context: { transcript: "hello" } }, + { action: "preprocessed", context: { body: "hello", bodyForAgent: "hello" } }, + { action: "sent", context: { content: "reply" } }, + ]; + + for (const fixture of lifecycleFixtures) { + await triggerInternalHook( + createInternalHookEvent("message", fixture.action, "s1", fixture.context), + ); + } + + expect(events.map((event) => event.action)).toEqual([ + "received", + "transcribed", + "preprocessed", + "sent", + ]); + }); + + it("triggers both general and specific handlers", async () => { + const generalHandler = vi.fn(); + const specificHandler = vi.fn(); + registerInternalHook("message", generalHandler); + registerInternalHook("message:received", specificHandler); + + await triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "test" }), + ); + + expect(generalHandler).toHaveBeenCalledOnce(); + expect(specificHandler).toHaveBeenCalledOnce(); + }); + }); + describe("error isolation", () => { - it("should not propagate handler errors to caller", async () => { + it("does not propagate handler errors", async () => { const badHandler = vi.fn(() => { throw new Error("Hook exploded"); }); registerInternalHook("message:received", badHandler); - const event = createInternalHookEvent("message", "received", "s1", { content: "test" }); - await expect(triggerInternalHook(event)).resolves.not.toThrow(); + await expect( + triggerInternalHook( + createInternalHookEvent("message", "received", "s1", { content: "test" }), + ), + ).resolves.not.toThrow(); expect(badHandler).toHaveBeenCalledOnce(); }); - it("should continue running subsequent handlers after one fails", async () => { + it("continues with later handlers when one fails", async () => { const failHandler = vi.fn(() => { throw new Error("First handler fails"); }); const successHandler = vi.fn(); - registerInternalHook("message:received", failHandler); registerInternalHook("message:received", successHandler); @@ -301,11 +218,11 @@ describe("message hooks", () => { createInternalHookEvent("message", "received", "s1", { content: "test" }), ); - expect(failHandler).toHaveBeenCalled(); - expect(successHandler).toHaveBeenCalled(); + expect(failHandler).toHaveBeenCalledOnce(); + expect(successHandler).toHaveBeenCalledOnce(); }); - it("should isolate async handler errors", async () => { + it("isolates async handler errors", async () => { const asyncFailHandler = vi.fn(async () => { throw new Error("Async hook failed"); }); @@ -319,7 +236,7 @@ describe("message hooks", () => { }); describe("event structure", () => { - it("should include timestamp on all message events", async () => { + it("includes timestamps on message events", async () => { const handler = vi.fn(); registerInternalHook("message", handler); @@ -335,37 +252,25 @@ describe("message hooks", () => { expect(event.timestamp.getTime()).toBeLessThanOrEqual(after.getTime()); }); - it("should include messages array for hook responses", async () => { - const handler = vi.fn((event: InternalHookEvent) => { - event.messages.push("Echo: received your message"); - }); - registerInternalHook("message:received", handler); - - const event = createInternalHookEvent("message", "received", "s1", { content: "hello" }); - await triggerInternalHook(event); - - expect(event.messages).toContain("Echo: received your message"); - }); - - it("should preserve sessionKey across event lifecycle", async () => { + it("preserves mutable messages and sessionKey", async () => { const events: InternalHookEvent[] = []; - registerInternalHook("message", (e) => { - events.push(e); + registerInternalHook("message", (event) => { + event.messages.push("Echo"); + events.push(event); }); + const sessionKey = "agent:main:telegram:abc"; + const received = createInternalHookEvent("message", "received", sessionKey, { + content: "hi", + }); + await triggerInternalHook(received); await triggerInternalHook( - createInternalHookEvent("message", "received", "agent:main:telegram:abc", { - content: "hi", - }), - ); - await triggerInternalHook( - createInternalHookEvent("message", "sent", "agent:main:telegram:abc", { - content: "reply", - }), + createInternalHookEvent("message", "sent", sessionKey, { content: "reply" }), ); - expect(events[0].sessionKey).toBe("agent:main:telegram:abc"); - expect(events[1].sessionKey).toBe("agent:main:telegram:abc"); + expect(received.messages).toContain("Echo"); + expect(events[0]?.sessionKey).toBe(sessionKey); + expect(events[1]?.sessionKey).toBe(sessionKey); }); }); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index c578656d3e4..585a83dd54f 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -18,7 +18,14 @@ import { resolveMirroredTranscriptText, } from "../../config/sessions.js"; import type { sendMessageDiscord } from "../../discord/send.js"; +import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; +import { + buildCanonicalSentMessageHookContext, + toInternalMessageSentContext, + toPluginMessageContext, + toPluginMessageSentEvent, +} from "../../hooks/message-hook-mappers.js"; import type { sendMessageIMessage } from "../../imessage/send.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js"; @@ -510,40 +517,47 @@ async function deliverOutboundPayloadsCore( error?: string; messageId?: string; }) => { + const canonical = buildCanonicalSentMessageHookContext({ + to, + content: params.content, + success: params.success, + error: params.error, + channelId: channel, + accountId: accountId ?? undefined, + conversationId: to, + messageId: params.messageId, + isGroup: mirrorIsGroup, + groupId: mirrorGroupId, + }); if (hookRunner?.hasHooks("message_sent")) { - void hookRunner - .runMessageSent( - { - to, - content: params.content, - success: params.success, - ...(params.error ? { error: params.error } : {}), - }, - { - channelId: channel, - accountId: accountId ?? undefined, - conversationId: to, - }, - ) - .catch(() => {}); + fireAndForgetHook( + hookRunner.runMessageSent( + toPluginMessageSentEvent(canonical), + toPluginMessageContext(canonical), + ), + "deliverOutboundPayloads: message_sent plugin hook failed", + (message) => { + log.warn(message); + }, + ); } if (!sessionKeyForInternalHooks) { return; } - void triggerInternalHook( - createInternalHookEvent("message", "sent", sessionKeyForInternalHooks, { - to, - content: params.content, - success: params.success, - ...(params.error ? { error: params.error } : {}), - channelId: channel, - accountId: accountId ?? undefined, - conversationId: to, - messageId: params.messageId, - ...(mirrorIsGroup != null ? { isGroup: mirrorIsGroup } : {}), - ...(mirrorGroupId ? { groupId: mirrorGroupId } : {}), - }), - ).catch(() => {}); + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent( + "message", + "sent", + sessionKeyForInternalHooks, + toInternalMessageSentContext(canonical), + ), + ), + "deliverOutboundPayloads: message:sent internal hook failed", + (message) => { + log.warn(message); + }, + ); }; try { throwIfAborted(abortSignal);