Files
openclaw/src/web/monitor-inbox.streams-inbound-messages.test.ts
2026-02-14 19:35:00 +00:00

357 lines
9.7 KiB
TypeScript

import fsSync from "node:fs";
import path from "node:path";
import "./monitor-inbox.test-harness.js";
import { describe, expect, it, vi } from "vitest";
import { monitorWebInbox } from "./inbound.js";
import {
DEFAULT_ACCOUNT_ID,
getAuthDir,
getSock,
installWebMonitorInboxUnitTestHooks,
} from "./monitor-inbox.test-harness.js";
describe("web monitor inbox", () => {
installWebMonitorInboxUnitTestHooks();
// Happy path: a single inbound "notify" upsert is handed to onMessage, the
// message is marked as read, and the handler's composing/reply calls are
// forwarded to the socket.
it("streams inbound messages", async () => {
  const senderJid = "999@s.whatsapp.net";
  const messageId = "abc";

  // The handler exercises both outbound helpers exposed on the message.
  const onMessage = vi.fn(async (msg) => {
    await msg.sendComposing();
    await msg.reply("pong");
  });

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();

  // Expected to have happened already by the time the monitor is ready,
  // i.e. before any message has been delivered.
  expect(sock.sendPresenceUpdate).toHaveBeenCalledWith("available");

  sock.ev.emit("messages.upsert", {
    type: "notify",
    messages: [
      {
        key: { id: messageId, fromMe: false, remoteJid: senderJid },
        message: { conversation: "ping" },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  });
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));

  const expectedInbound = expect.objectContaining({ body: "ping", from: "+999", to: "+123" });
  expect(onMessage).toHaveBeenCalledWith(expectedInbound);

  // Read receipt is issued for exactly the delivered message key.
  const expectedReadKey = {
    remoteJid: senderJid,
    id: messageId,
    participant: undefined,
    fromMe: false,
  };
  expect(sock.readMessages).toHaveBeenCalledWith([expectedReadKey]);

  expect(sock.sendPresenceUpdate).toHaveBeenCalledWith("available");
  expect(sock.sendPresenceUpdate).toHaveBeenCalledWith("composing", senderJid);
  expect(sock.sendMessage).toHaveBeenCalledWith(senderJid, {
    text: "pong",
  });

  await listener.close();
});
// Delivering the very same upsert twice must invoke onMessage only once.
it("deduplicates redelivered messages by id", async () => {
  const onMessage = vi.fn(async () => {});

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();

  const redelivered = {
    type: "notify",
    messages: [
      {
        key: { id: "abc", fromMe: false, remoteJid: "999@s.whatsapp.net" },
        message: { conversation: "ping" },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  };

  // Emit the identical payload back to back, then let the handlers settle.
  for (let delivery = 0; delivery < 2; delivery += 1) {
    sock.ev.emit("messages.upsert", redelivered);
  }
  await new Promise((resolve) => setImmediate(resolve));

  expect(onMessage).toHaveBeenCalledTimes(1);

  await listener.close();
});
// A direct message whose remoteJid is a LID ("999@lid") is expected to be
// resolved through the socket's LID mapping lookup into a phone-number sender.
it("resolves LID JIDs using Baileys LID mapping store", async () => {
  const lidJid = "999@lid";
  const onMessage = vi.fn(async () => {});

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();

  // Spy on the lookup and queue a single resolved value for it; the spy
  // handle is the same function that now sits on the lidMapping object.
  const getPNForLID = vi
    .spyOn(sock.signalRepository.lidMapping, "getPNForLID")
    .mockResolvedValueOnce("999:0@s.whatsapp.net");

  sock.ev.emit("messages.upsert", {
    type: "notify",
    messages: [
      {
        key: { id: "abc", fromMe: false, remoteJid: lidJid },
        message: { conversation: "ping" },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  });
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));

  expect(getPNForLID).toHaveBeenCalledWith(lidJid);
  expect(onMessage).toHaveBeenCalledWith(
    expect.objectContaining({ body: "ping", from: "+999", to: "+123" }),
  );

  await listener.close();
});
// When the auth directory holds a reverse mapping file for the LID, the
// sender is expected to be resolved from that file and the socket's
// getPNForLID lookup must not be consulted at all.
it("resolves LID JIDs via authDir mapping files", async () => {
  const onMessage = vi.fn(async () => {});

  // Seed the mapping file before the monitor starts: LID 555 -> "1555".
  const reverseMappingFile = path.join(getAuthDir(), "lid-mapping-555_reverse.json");
  fsSync.writeFileSync(reverseMappingFile, JSON.stringify("1555"));

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();
  const getPNForLID = vi.spyOn(sock.signalRepository.lidMapping, "getPNForLID");

  sock.ev.emit("messages.upsert", {
    type: "notify",
    messages: [
      {
        key: { id: "abc", fromMe: false, remoteJid: "555@lid" },
        message: { conversation: "ping" },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  });
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));

  expect(onMessage).toHaveBeenCalledWith(
    expect.objectContaining({ body: "ping", from: "+1555", to: "+123" }),
  );
  expect(getPNForLID).not.toHaveBeenCalled();

  await listener.close();
});
// Group message whose sender is carried in key.participant as a LID: the
// chat stays addressed by the group JID while the participant is expected to
// be resolved to an E.164 sender through the LID mapping lookup.
it("resolves group participant LID JIDs via Baileys mapping", async () => {
  const groupJid = "123@g.us";
  const participantLid = "444@lid";
  const onMessage = vi.fn(async () => {});

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();

  // Spy on the lookup and queue a single resolved value for it; the spy
  // handle is the same function that now sits on the lidMapping object.
  const getPNForLID = vi
    .spyOn(sock.signalRepository.lidMapping, "getPNForLID")
    .mockResolvedValueOnce("444:0@s.whatsapp.net");

  sock.ev.emit("messages.upsert", {
    type: "notify",
    messages: [
      {
        key: {
          id: "abc",
          fromMe: false,
          remoteJid: groupJid,
          participant: participantLid,
        },
        message: { conversation: "ping" },
        messageTimestamp: 1_700_000_000,
      },
    ],
  });
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));

  expect(getPNForLID).toHaveBeenCalledWith(participantLid);

  const expectedGroupMessage = expect.objectContaining({
    body: "ping",
    from: groupJid,
    senderE164: "+444",
    chatType: "group",
  });
  expect(onMessage).toHaveBeenCalledWith(expectedGroupMessage);

  await listener.close();
});
// Two messages arrive in one upsert while the handler for the first one is
// still pending; the second must be dispatched without waiting for it.
it("does not block follow-up messages when handler is pending", async () => {
  // The resolver is kept on an object property instead of a `let` that is
  // only assigned inside the closure: a closure-assigned `let` initialised to
  // null is still seen as `null` by control-flow analysis at the call site,
  // which turns `resolver?.()` into a call on `never`.
  const firstHandlerGate: { release: (() => void) | null } = { release: null };

  // Only the first invocation parks itself; later invocations see the stored
  // resolver and return immediately.
  const onMessage = vi.fn(async () => {
    if (!firstHandlerGate.release) {
      await new Promise<void>((resolve) => {
        firstHandlerGate.release = resolve;
      });
    }
  });

  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });

  try {
    const sock = getSock();
    const upsert = {
      type: "notify",
      messages: [
        {
          key: { id: "abc1", fromMe: false, remoteJid: "999@s.whatsapp.net" },
          message: { conversation: "ping" },
          messageTimestamp: 1_700_000_000,
        },
        {
          key: { id: "abc2", fromMe: false, remoteJid: "999@s.whatsapp.net" },
          message: { conversation: "pong" },
          messageTimestamp: 1_700_000_001,
        },
      ],
    };
    sock.ev.emit("messages.upsert", upsert);
    // Yield one macrotask so the asynchronous upsert handling can run.
    await new Promise((resolve) => setImmediate(resolve));

    // Both handlers were started even though the first has not settled yet.
    expect(onMessage).toHaveBeenCalledTimes(2);
  } finally {
    // Always unblock the parked handler and shut the listener down, even when
    // the assertion above fails, so nothing is left pending after this test.
    firstHandlerGate.release?.();
    await listener.close();
  }
});
// An extendedTextMessage that quotes another message must surface the quoted
// message's id, body and sender on the inbound message, and replying must
// still go to the chat the message came from.
it("captures reply context from quoted messages", async () => {
  const onMessage = vi.fn(async (msg) => {
    await msg.reply("pong");
  });
  // accountId/authDir are passed explicitly, as in the other tests of this
  // suite, so the monitor is pinned to the harness account and auth directory
  // instead of whatever it falls back to when they are omitted.
  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();
  const upsert = {
    type: "notify",
    messages: [
      {
        key: { id: "abc", fromMe: false, remoteJid: "999@s.whatsapp.net" },
        message: {
          extendedTextMessage: {
            text: "reply",
            // contextInfo describes the message being replied to.
            contextInfo: {
              stanzaId: "q1",
              participant: "111@s.whatsapp.net",
              quotedMessage: { conversation: "original" },
            },
          },
        },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  };
  sock.ev.emit("messages.upsert", upsert);
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));
  expect(onMessage).toHaveBeenCalledWith(
    expect.objectContaining({
      replyToId: "q1",
      replyToBody: "original",
      replyToSender: "+111",
    }),
  );
  expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
    text: "pong",
  });
  await listener.close();
});
// Same as the plain quoted-message case, except the quoted message is nested
// inside a viewOnceMessageV2Extension wrapper; the quoted body is expected to
// be extracted through that wrapper.
it("captures reply context from wrapped quoted messages", async () => {
  const onMessage = vi.fn(async (msg) => {
    await msg.reply("pong");
  });
  // accountId/authDir are passed explicitly, as in the other tests of this
  // suite, so the monitor is pinned to the harness account and auth directory
  // instead of whatever it falls back to when they are omitted.
  const listener = await monitorWebInbox({
    verbose: false,
    onMessage,
    accountId: DEFAULT_ACCOUNT_ID,
    authDir: getAuthDir(),
  });
  const sock = getSock();
  const upsert = {
    type: "notify",
    messages: [
      {
        key: { id: "abc", fromMe: false, remoteJid: "999@s.whatsapp.net" },
        message: {
          extendedTextMessage: {
            text: "reply",
            contextInfo: {
              stanzaId: "q1",
              participant: "111@s.whatsapp.net",
              // The actual quoted content sits one level down, under
              // the wrapper's `message` field.
              quotedMessage: {
                viewOnceMessageV2Extension: {
                  message: { conversation: "original" },
                },
              },
            },
          },
        },
        messageTimestamp: 1_700_000_000,
        pushName: "Tester",
      },
    ],
  };
  sock.ev.emit("messages.upsert", upsert);
  // Yield one macrotask so the asynchronous upsert handling can run.
  await new Promise((resolve) => setImmediate(resolve));
  expect(onMessage).toHaveBeenCalledWith(
    expect.objectContaining({
      replyToId: "q1",
      replyToBody: "original",
      replyToSender: "+111",
    }),
  );
  expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
    text: "pong",
  });
  await listener.close();
});
});