Files
openclaw/src/infra/outbound/deliver.test.ts
2026-02-17 15:50:07 +09:00

625 lines
19 KiB
TypeScript

import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { signalOutbound } from "../../channels/plugins/outbound/signal.js";
import { telegramOutbound } from "../../channels/plugins/outbound/telegram.js";
import { whatsappOutbound } from "../../channels/plugins/outbound/whatsapp.js";
import type { OpenClawConfig } from "../../config/config.js";
import { STATE_DIR } from "../../config/paths.js";
import { setActivePluginRegistry } from "../../plugins/runtime.js";
import { markdownToSignalTextChunks } from "../../signal/format.js";
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
import { createIMessageTestPlugin } from "../../test-utils/imessage-test-plugin.js";
const mocks = vi.hoisted(() => ({
appendAssistantMessageToSessionTranscript: vi.fn(async () => ({ ok: true, sessionFile: "x" })),
}));
const hookMocks = vi.hoisted(() => ({
runner: {
hasHooks: vi.fn(() => false),
runMessageSent: vi.fn(async () => {}),
},
}));
const queueMocks = vi.hoisted(() => ({
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
ackDelivery: vi.fn(async () => {}),
failDelivery: vi.fn(async () => {}),
}));
vi.mock("../../config/sessions.js", async () => {
const actual = await vi.importActual<typeof import("../../config/sessions.js")>(
"../../config/sessions.js",
);
return {
...actual,
appendAssistantMessageToSessionTranscript: mocks.appendAssistantMessageToSessionTranscript,
};
});
vi.mock("../../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => hookMocks.runner,
}));
vi.mock("./delivery-queue.js", () => ({
enqueueDelivery: queueMocks.enqueueDelivery,
ackDelivery: queueMocks.ackDelivery,
failDelivery: queueMocks.failDelivery,
}));
const { deliverOutboundPayloads, normalizeOutboundPayloads } = await import("./deliver.js");
const telegramChunkConfig: OpenClawConfig = {
channels: { telegram: { botToken: "tok-1", textChunkLimit: 2 } },
};
const whatsappChunkConfig: OpenClawConfig = {
channels: { whatsapp: { textChunkLimit: 4000 } },
};
async function deliverWhatsAppPayload(params: {
sendWhatsApp: NonNullable<
NonNullable<Parameters<typeof deliverOutboundPayloads>[0]["deps"]>["sendWhatsApp"]
>;
payload: { text: string; mediaUrl?: string };
cfg?: OpenClawConfig;
}) {
return deliverOutboundPayloads({
cfg: params.cfg ?? whatsappChunkConfig,
channel: "whatsapp",
to: "+1555",
payloads: [params.payload],
deps: { sendWhatsApp: params.sendWhatsApp },
});
}
describe("deliverOutboundPayloads", () => {
beforeEach(() => {
setActivePluginRegistry(defaultRegistry);
hookMocks.runner.hasHooks.mockReset();
hookMocks.runner.hasHooks.mockReturnValue(false);
hookMocks.runner.runMessageSent.mockReset();
hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
queueMocks.enqueueDelivery.mockReset();
queueMocks.enqueueDelivery.mockResolvedValue("mock-queue-id");
queueMocks.ackDelivery.mockReset();
queueMocks.ackDelivery.mockResolvedValue(undefined);
queueMocks.failDelivery.mockReset();
queueMocks.failDelivery.mockResolvedValue(undefined);
});
afterEach(() => {
setActivePluginRegistry(emptyRegistry);
});
it("chunks telegram markdown and passes through accountId", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "";
try {
const results = await deliverOutboundPayloads({
cfg: telegramChunkConfig,
channel: "telegram",
to: "123",
payloads: [{ text: "abcd" }],
deps: { sendTelegram },
});
expect(sendTelegram).toHaveBeenCalledTimes(2);
for (const call of sendTelegram.mock.calls) {
expect(call[2]).toEqual(
expect.objectContaining({ accountId: undefined, verbose: false, textMode: "html" }),
);
}
expect(results).toHaveLength(2);
expect(results[0]).toMatchObject({ channel: "telegram", chatId: "c1" });
} finally {
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
}
});
it("passes explicit accountId to sendTelegram", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
await deliverOutboundPayloads({
cfg: telegramChunkConfig,
channel: "telegram",
to: "123",
accountId: "default",
payloads: [{ text: "hi" }],
deps: { sendTelegram },
});
expect(sendTelegram).toHaveBeenCalledWith(
"123",
"hi",
expect.objectContaining({ accountId: "default", verbose: false, textMode: "html" }),
);
});
it("scopes media local roots to the active agent workspace when agentId is provided", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
await deliverOutboundPayloads({
cfg: telegramChunkConfig,
channel: "telegram",
to: "123",
agentId: "work",
payloads: [{ text: "hi", mediaUrl: "file:///tmp/f.png" }],
deps: { sendTelegram },
});
expect(sendTelegram).toHaveBeenCalledWith(
"123",
"hi",
expect.objectContaining({
mediaUrl: "file:///tmp/f.png",
mediaLocalRoots: expect.arrayContaining([path.join(STATE_DIR, "workspace-work")]),
}),
);
});
it("uses signal media maxBytes from config", async () => {
const sendSignal = vi.fn().mockResolvedValue({ messageId: "s1", timestamp: 123 });
const cfg: OpenClawConfig = { channels: { signal: { mediaMaxMb: 2 } } };
const results = await deliverOutboundPayloads({
cfg,
channel: "signal",
to: "+1555",
payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }],
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledWith(
"+1555",
"hi",
expect.objectContaining({
mediaUrl: "https://x.test/a.jpg",
maxBytes: 2 * 1024 * 1024,
textMode: "plain",
textStyles: [],
}),
);
expect(results[0]).toMatchObject({ channel: "signal", messageId: "s1" });
});
it("chunks Signal markdown using the format-first chunker", async () => {
const sendSignal = vi.fn().mockResolvedValue({ messageId: "s1", timestamp: 123 });
const cfg: OpenClawConfig = {
channels: { signal: { textChunkLimit: 20 } },
};
const text = `Intro\\n\\n\`\`\`\`md\\n${"y".repeat(60)}\\n\`\`\`\\n\\nOutro`;
const expectedChunks = markdownToSignalTextChunks(text, 20);
await deliverOutboundPayloads({
cfg,
channel: "signal",
to: "+1555",
payloads: [{ text }],
deps: { sendSignal },
});
expect(sendSignal).toHaveBeenCalledTimes(expectedChunks.length);
expectedChunks.forEach((chunk, index) => {
expect(sendSignal).toHaveBeenNthCalledWith(
index + 1,
"+1555",
chunk.text,
expect.objectContaining({
accountId: undefined,
textMode: "plain",
textStyles: chunk.styles,
}),
);
});
});
it("chunks WhatsApp text and returns all results", async () => {
const sendWhatsApp = vi
.fn()
.mockResolvedValueOnce({ messageId: "w1", toJid: "jid" })
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
const cfg: OpenClawConfig = {
channels: { whatsapp: { textChunkLimit: 2 } },
};
const results = await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "abcd" }],
deps: { sendWhatsApp },
});
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
expect(results.map((r) => r.messageId)).toEqual(["w1", "w2"]);
});
it("respects newline chunk mode for WhatsApp", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
const cfg: OpenClawConfig = {
channels: { whatsapp: { textChunkLimit: 4000, chunkMode: "newline" } },
};
await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "Line one\n\nLine two" }],
deps: { sendWhatsApp },
});
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
expect(sendWhatsApp).toHaveBeenNthCalledWith(
1,
"+1555",
"Line one",
expect.objectContaining({ verbose: false }),
);
expect(sendWhatsApp).toHaveBeenNthCalledWith(
2,
"+1555",
"Line two",
expect.objectContaining({ verbose: false }),
);
});
it("strips leading blank lines for WhatsApp text payloads", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
await deliverWhatsAppPayload({
sendWhatsApp,
payload: { text: "\n\nHello from WhatsApp" },
});
expect(sendWhatsApp).toHaveBeenCalledTimes(1);
expect(sendWhatsApp).toHaveBeenNthCalledWith(
1,
"+1555",
"Hello from WhatsApp",
expect.objectContaining({ verbose: false }),
);
});
it("drops whitespace-only WhatsApp text payloads when no media is attached", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
const results = await deliverWhatsAppPayload({
sendWhatsApp,
payload: { text: " \n\t " },
});
expect(sendWhatsApp).not.toHaveBeenCalled();
expect(results).toEqual([]);
});
it("keeps WhatsApp media payloads but clears whitespace-only captions", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
await deliverWhatsAppPayload({
sendWhatsApp,
payload: { text: " \n\t ", mediaUrl: "https://example.com/photo.png" },
});
expect(sendWhatsApp).toHaveBeenCalledTimes(1);
expect(sendWhatsApp).toHaveBeenNthCalledWith(
1,
"+1555",
"",
expect.objectContaining({
mediaUrl: "https://example.com/photo.png",
verbose: false,
}),
);
});
it("preserves fenced blocks for markdown chunkers in newline mode", async () => {
const chunker = vi.fn((text: string) => (text ? [text] : []));
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "r1",
}));
const sendMedia = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "r1",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
chunker,
chunkerMode: "markdown",
textChunkLimit: 4000,
sendText,
sendMedia,
},
}),
},
]),
);
const cfg: OpenClawConfig = {
channels: { matrix: { textChunkLimit: 4000, chunkMode: "newline" } },
};
const text = "```js\nconst a = 1;\nconst b = 2;\n```\nAfter";
await deliverOutboundPayloads({
cfg,
channel: "matrix",
to: "!room",
payloads: [{ text }],
});
expect(chunker).toHaveBeenCalledTimes(1);
expect(chunker).toHaveBeenNthCalledWith(1, text, 4000);
});
it("uses iMessage media maxBytes from agent fallback", async () => {
const sendIMessage = vi.fn().mockResolvedValue({ messageId: "i1" });
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "imessage",
source: "test",
plugin: createIMessageTestPlugin(),
},
]),
);
const cfg: OpenClawConfig = {
agents: { defaults: { mediaMaxMb: 3 } },
};
await deliverOutboundPayloads({
cfg,
channel: "imessage",
to: "chat_id:42",
payloads: [{ text: "hello" }],
deps: { sendIMessage },
});
expect(sendIMessage).toHaveBeenCalledWith(
"chat_id:42",
"hello",
expect.objectContaining({ maxBytes: 3 * 1024 * 1024 }),
);
});
it("normalizes payloads and drops empty entries", () => {
const normalized = normalizeOutboundPayloads([
{ text: "hi" },
{ text: "MEDIA:https://x.test/a.jpg" },
{ text: " ", mediaUrls: [] },
]);
expect(normalized).toEqual([
{ text: "hi", mediaUrls: [] },
{ text: "", mediaUrls: ["https://x.test/a.jpg"] },
]);
});
it("continues on errors when bestEffort is enabled", async () => {
const sendWhatsApp = vi
.fn()
.mockRejectedValueOnce(new Error("fail"))
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
const onError = vi.fn();
const cfg: OpenClawConfig = {};
const results = await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "a" }, { text: "b" }],
deps: { sendWhatsApp },
bestEffort: true,
onError,
});
expect(sendWhatsApp).toHaveBeenCalledTimes(2);
expect(onError).toHaveBeenCalledTimes(1);
expect(results).toEqual([{ channel: "whatsapp", messageId: "w2", toJid: "jid" }]);
});
it("calls failDelivery instead of ackDelivery on bestEffort partial failure", async () => {
const sendWhatsApp = vi
.fn()
.mockRejectedValueOnce(new Error("fail"))
.mockResolvedValueOnce({ messageId: "w2", toJid: "jid" });
const onError = vi.fn();
const cfg: OpenClawConfig = {};
await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "a" }, { text: "b" }],
deps: { sendWhatsApp },
bestEffort: true,
onError,
});
// onError was called for the first payload's failure.
expect(onError).toHaveBeenCalledTimes(1);
// Queue entry should NOT be acked — failDelivery should be called instead.
expect(queueMocks.ackDelivery).not.toHaveBeenCalled();
expect(queueMocks.failDelivery).toHaveBeenCalledWith(
"mock-queue-id",
"partial delivery failure (bestEffort)",
);
});
it("acks the queue entry when delivery is aborted", async () => {
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
const abortController = new AbortController();
abortController.abort();
const cfg: OpenClawConfig = {};
await expect(
deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "a" }],
deps: { sendWhatsApp },
abortSignal: abortController.signal,
}),
).rejects.toThrow("Operation aborted");
expect(queueMocks.ackDelivery).toHaveBeenCalledWith("mock-queue-id");
expect(queueMocks.failDelivery).not.toHaveBeenCalled();
expect(sendWhatsApp).not.toHaveBeenCalled();
});
it("passes normalized payload to onError", async () => {
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("boom"));
const onError = vi.fn();
const cfg: OpenClawConfig = {};
await deliverOutboundPayloads({
cfg,
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hi", mediaUrl: "https://x.test/a.jpg" }],
deps: { sendWhatsApp },
bestEffort: true,
onError,
});
expect(onError).toHaveBeenCalledTimes(1);
expect(onError).toHaveBeenCalledWith(
expect.any(Error),
expect.objectContaining({ text: "hi", mediaUrls: ["https://x.test/a.jpg"] }),
);
});
it("mirrors delivered output when mirror options are provided", async () => {
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" });
mocks.appendAssistantMessageToSessionTranscript.mockClear();
await deliverOutboundPayloads({
cfg: telegramChunkConfig,
channel: "telegram",
to: "123",
payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }],
deps: { sendTelegram },
mirror: {
sessionKey: "agent:main:main",
text: "caption",
mediaUrls: ["https://example.com/files/report.pdf?sig=1"],
},
});
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
expect.objectContaining({ text: "report.pdf" }),
);
});
it("emits message_sent success for text-only deliveries", async () => {
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
await deliverOutboundPayloads({
cfg: {},
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hello" }],
deps: { sendWhatsApp },
});
await vi.waitFor(() => {
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({ to: "+1555", content: "hello", success: true }),
expect.objectContaining({ channelId: "whatsapp" }),
);
});
});
it("emits message_sent success for sendPayload deliveries", async () => {
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
const sendText = vi.fn();
const sendMedia = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: { deliveryMode: "direct", sendPayload, sendText, sendMedia },
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:1",
payloads: [{ text: "payload text", channelData: { mode: "custom" } }],
});
await vi.waitFor(() => {
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({ to: "!room:1", content: "payload text", success: true }),
expect.objectContaining({ channelId: "matrix" }),
);
});
});
it("emits message_sent failure when delivery errors", async () => {
hookMocks.runner.hasHooks.mockReturnValue(true);
const sendWhatsApp = vi.fn().mockRejectedValue(new Error("downstream failed"));
await expect(
deliverOutboundPayloads({
cfg: {},
channel: "whatsapp",
to: "+1555",
payloads: [{ text: "hi" }],
deps: { sendWhatsApp },
}),
).rejects.toThrow("downstream failed");
await vi.waitFor(() => {
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
expect.objectContaining({
to: "+1555",
content: "hi",
success: false,
error: "downstream failed",
}),
expect.objectContaining({ channelId: "whatsapp" }),
);
});
});
});
const emptyRegistry = createTestRegistry([]);
const defaultRegistry = createTestRegistry([
{
pluginId: "telegram",
plugin: createOutboundTestPlugin({ id: "telegram", outbound: telegramOutbound }),
source: "test",
},
{
pluginId: "signal",
plugin: createOutboundTestPlugin({ id: "signal", outbound: signalOutbound }),
source: "test",
},
{
pluginId: "whatsapp",
plugin: createOutboundTestPlugin({ id: "whatsapp", outbound: whatsappOutbound }),
source: "test",
},
{
pluginId: "imessage",
plugin: createIMessageTestPlugin(),
source: "test",
},
]);