fix(hooks): backport internal message hook bridge with safe delivery semantics

This commit is contained in:
Peter Steinberger
2026-02-18 00:32:51 +01:00
parent 087dca8fa9
commit f07bb8e8fc
8 changed files with 621 additions and 66 deletions

View File

@@ -1,9 +1,9 @@
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import { signalOutbound } from "../../channels/plugins/outbound/signal.js";
import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js";
import type { OpenClawConfig } from "../../config/config.js";
import { STATE_DIR } from "../../config/paths.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { markdownToSignalTextChunks } from "../../signal/format.js";
@@ -19,6 +19,19 @@ const hookMocks = vi.hoisted(() => ({
runMessageSent: vi.fn(async () => {}),
},
}));
const internalHookMocks = vi.hoisted(() => ({
createInternalHookEvent: vi.fn(
(type: string, action: string, sessionKey: string, context: Record<string, unknown>) => ({
type,
action,
sessionKey,
context,
timestamp: new Date(),
messages: [],
}),
),
triggerInternalHook: vi.fn(async () => {}),
}));
const queueMocks = vi.hoisted(() => ({
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
ackDelivery: vi.fn(async () => {}),
@@ -37,6 +50,10 @@ vi.mock("../../config/sessions.js", async () => {
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => hookMocks.runner,
}));
vi.mock("../../hooks/internal-hooks.js", () => ({
createInternalHookEvent: internalHookMocks.createInternalHookEvent,
triggerInternalHook: internalHookMocks.triggerInternalHook,
}));
vi.mock("./delivery-queue.js", () => ({
enqueueDelivery: queueMocks.enqueueDelivery,
ackDelivery: queueMocks.ackDelivery,
@@ -76,6 +93,8 @@ describe("deliverOutboundPayloads", () => {
hookMocks.runner.hasHooks.mockReturnValue(false);
hookMocks.runner.runMessageSent.mockReset();
hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
internalHookMocks.createInternalHookEvent.mockClear();
internalHookMocks.triggerInternalHook.mockClear();
queueMocks.enqueueDelivery.mockReset();
queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id");
queueMocks.ackDelivery.mockReset();
@@ -449,6 +468,58 @@ describe("deliverOutboundPayloads", () => {
expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]);
});
it("emits internal message:sent hook with success=true for chunked payload delivery", async () => {
const sendWhatsApp = vi
.fn()
.mockResolvedValueOnce({ messageId: "w1", toJid: "jid" })
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
const cfg: OpenClawConfig = {
channels: { whatsapp: { textChunkLimit: 2 } },
};
await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "abcd" }],
deps: { sendWhatsApp },
mirror: {
sessionKey: "agent:main:main",
},
});
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledTimes(1);
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
"message",
"sent",
"agent:main:main",
expect.objectContaining({
to: "+1555",
content: "abcd",
success: true,
channelId: "whatsapp",
conversationId: "+1555",
messageId: "w2",
}),
);
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
});
it("does not emit internal message:sent hook when mirror sessionKey is missing", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
await deliverOutboundPayloads({
cfg: whatsappChunkConfig,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hello" }],
deps: { sendWhatsApp },
});
expect(internalHookMocks.createInternalHookEvent).not.toHaveBeenCalled();
expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled();
});
it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
const sendWhatsApp = vi
.fn()

View File

@@ -1,37 +1,38 @@
import type { ReplyPayload } from "../../auto-reply/types.js";
import type {
ChannelOutboundAdapter,
ChannelOutboundContext,
} from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { sendMessageDiscord } from "../../discord/send.js";
import type { sendMessageIMessage } from "../../imessage/send.js";
import type { sendMessageSlack } from "../../slack/send.js";
import type { sendMessageTelegram } from "../../telegram/send.js";
import type { sendMessageWhatsApp } from "../../web/outbound.js";
import type { OutboundIdentity } from "./identity.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import type { OutboundChannel } from "./targets.js";
import {
chunkByParagraph,
chunkMarkdownTextWithMode,
resolveChunkMode,
resolveTextChunkLimit,
} from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js";
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
import type {
ChannelOutboundAdapter,
ChannelOutboundContext,
} from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveMarkdownTableMode } from "../../config/markdown-tables.js";
import {
appendAssistantMessageToSessionTranscript,
resolveMirroredTranscriptText,
} from "../../config/sessions.js";
import type { sendMessageDiscord } from "../../discord/send.js";
import type { sendMessageIMessage } from "../../imessage/send.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js";
import { sendMessageSignal } from "../../signal/send.js";
import type { sendMessageSlack } from "../../slack/send.js";
import type { sendMessageTelegram } from "../../telegram/send.js";
import type { sendMessageWhatsApp } from "../../web/outbound.js";
import { throwIfAborted } from "./abort.js";
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
import type { OutboundIdentity } from "./identity.js";
import type { NormalizedOutboundPayload } from "./payloads.js";
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
import type { OutboundChannel } from "./targets.js";
export type { NormalizedOutboundPayload } from "./payloads.js";
export { normalizeOutboundPayloads } from "./payloads.js";
@@ -443,30 +444,51 @@ async function deliverOutboundPayloadsCore(
return normalized ? [normalized] : [];
});
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey;
for (const payload of normalizedPayloads) {
const payloadSummary: NormalizedOutboundPayload = {
text: payload.text ?? "",
mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []),
channelData: payload.channelData,
};
const emitMessageSent = (success: boolean, error?: string) => {
if (!hookRunner?.hasHooks("message_sent")) {
const emitMessageSent = (params: {
success: boolean;
content: string;
error?: string;
messageId?: string;
}) => {
if (hookRunner?.hasHooks("message_sent")) {
void hookRunner
.runMessageSent(
{
to,
content: params.content,
success: params.success,
...(params.error ? { error: params.error } : {}),
},
{
channelId: channel,
accountId: accountId ?? undefined,
conversationId: to,
},
)
.catch(() => {});
}
if (!sessionKeyForInternalHooks) {
return;
}
void hookRunner
.runMessageSent(
{
to,
content: payloadSummary.text,
success,
...(error ? { error } : {}),
},
{
channelId: channel,
accountId: accountId ?? undefined,
},
)
.catch(() => {});
void triggerInternalHook(
createInternalHookEvent("message", "sent", sessionKeyForInternalHooks, {
to,
content: params.content,
success: params.success,
...(params.error ? { error: params.error } : {}),
channelId: channel,
accountId: accountId ?? undefined,
conversationId: to,
messageId: params.messageId,
}),
).catch(() => {});
};
try {
throwIfAborted(abortSignal);
@@ -504,34 +526,58 @@ async function deliverOutboundPayloadsCore(
threadId: params.threadId ?? undefined,
};
if (handler.sendPayload && effectivePayload.channelData) {
results.push(await handler.sendPayload(effectivePayload, sendOverrides));
emitMessageSent(true);
const delivery = await handler.sendPayload(effectivePayload, sendOverrides);
results.push(delivery);
emitMessageSent({
success: true,
content: payloadSummary.text,
messageId: delivery.messageId,
});
continue;
}
if (payloadSummary.mediaUrls.length === 0) {
const beforeCount = results.length;
if (isSignalChannel) {
await sendSignalTextChunks(payloadSummary.text);
} else {
await sendTextChunks(payloadSummary.text, sendOverrides);
}
emitMessageSent(true);
const messageId = results.at(-1)?.messageId;
emitMessageSent({
success: results.length > beforeCount,
content: payloadSummary.text,
messageId,
});
continue;
}
let first = true;
let lastMessageId: string | undefined;
for (const url of payloadSummary.mediaUrls) {
throwIfAborted(abortSignal);
const caption = first ? payloadSummary.text : "";
first = false;
if (isSignalChannel) {
results.push(await sendSignalMedia(caption, url));
const delivery = await sendSignalMedia(caption, url);
results.push(delivery);
lastMessageId = delivery.messageId;
} else {
results.push(await handler.sendMedia(caption, url, sendOverrides));
const delivery = await handler.sendMedia(caption, url, sendOverrides);
results.push(delivery);
lastMessageId = delivery.messageId;
}
}
emitMessageSent(true);
emitMessageSent({
success: true,
content: payloadSummary.text,
messageId: lastMessageId,
});
} catch (err) {
emitMessageSent(false, err instanceof Error ? err.message : String(err));
emitMessageSent({
success: false,
content: payloadSummary.text,
error: err instanceof Error ? err.message : String(err),
});
if (!params.bestEffort) {
throw err;
}
@@ -551,5 +597,6 @@ async function deliverOutboundPayloadsCore(
});
}
}
return results;
}