mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 14:50:42 +00:00
refactor: split inbound and reload pipelines into staged modules
This commit is contained in:
@@ -151,6 +151,249 @@ export async function monitorWebInbox(options: {
|
||||
}
|
||||
};
|
||||
|
||||
type NormalizedInboundMessage = {
|
||||
id?: string;
|
||||
remoteJid: string;
|
||||
group: boolean;
|
||||
participantJid?: string;
|
||||
from: string;
|
||||
senderE164: string | null;
|
||||
groupSubject?: string;
|
||||
groupParticipants?: string[];
|
||||
messageTimestampMs?: number;
|
||||
access: Awaited<ReturnType<typeof checkInboundAccessControl>>;
|
||||
};
|
||||
|
||||
const normalizeInboundMessage = async (
|
||||
msg: WAMessage,
|
||||
): Promise<NormalizedInboundMessage | null> => {
|
||||
const id = msg.key?.id ?? undefined;
|
||||
const remoteJid = msg.key?.remoteJid;
|
||||
if (!remoteJid) {
|
||||
return null;
|
||||
}
|
||||
if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const group = isJidGroup(remoteJid) === true;
|
||||
if (id) {
|
||||
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
|
||||
if (isRecentInboundMessage(dedupeKey)) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
const participantJid = msg.key?.participant ?? undefined;
|
||||
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
|
||||
if (!from) {
|
||||
return null;
|
||||
}
|
||||
const senderE164 = group
|
||||
? participantJid
|
||||
? await resolveInboundJid(participantJid)
|
||||
: null
|
||||
: from;
|
||||
|
||||
let groupSubject: string | undefined;
|
||||
let groupParticipants: string[] | undefined;
|
||||
if (group) {
|
||||
const meta = await getGroupMeta(remoteJid);
|
||||
groupSubject = meta.subject;
|
||||
groupParticipants = meta.participants;
|
||||
}
|
||||
const messageTimestampMs = msg.messageTimestamp
|
||||
? Number(msg.messageTimestamp) * 1000
|
||||
: undefined;
|
||||
|
||||
const access = await checkInboundAccessControl({
|
||||
accountId: options.accountId,
|
||||
from,
|
||||
selfE164,
|
||||
senderE164,
|
||||
group,
|
||||
pushName: msg.pushName ?? undefined,
|
||||
isFromMe: Boolean(msg.key?.fromMe),
|
||||
messageTimestampMs,
|
||||
connectedAtMs,
|
||||
sock: { sendMessage: (jid, content) => sock.sendMessage(jid, content) },
|
||||
remoteJid,
|
||||
});
|
||||
if (!access.allowed) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
id,
|
||||
remoteJid,
|
||||
group,
|
||||
participantJid,
|
||||
from,
|
||||
senderE164,
|
||||
groupSubject,
|
||||
groupParticipants,
|
||||
messageTimestampMs,
|
||||
access,
|
||||
};
|
||||
};
|
||||
|
||||
const maybeMarkInboundAsRead = async (inbound: NormalizedInboundMessage) => {
|
||||
const { id, remoteJid, participantJid, access } = inbound;
|
||||
if (id && !access.isSelfChat && options.sendReadReceipts !== false) {
|
||||
try {
|
||||
await sock.readMessages([{ remoteJid, id, participant: participantJid, fromMe: false }]);
|
||||
if (shouldLogVerbose()) {
|
||||
const suffix = participantJid ? ` (participant ${participantJid})` : "";
|
||||
logVerbose(`Marked message ${id} as read for ${remoteJid}${suffix}`);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`Failed to mark message ${id} read: ${String(err)}`);
|
||||
}
|
||||
} else if (id && access.isSelfChat && shouldLogVerbose()) {
|
||||
// Self-chat mode: never auto-send read receipts (blue ticks) on behalf of the owner.
|
||||
logVerbose(`Self-chat mode: skipping read receipt for ${id}`);
|
||||
}
|
||||
};
|
||||
|
||||
type EnrichedInboundMessage = {
|
||||
body: string;
|
||||
location?: ReturnType<typeof extractLocationData>;
|
||||
replyContext?: ReturnType<typeof describeReplyContext>;
|
||||
mediaPath?: string;
|
||||
mediaType?: string;
|
||||
mediaFileName?: string;
|
||||
};
|
||||
|
||||
const enrichInboundMessage = async (msg: WAMessage): Promise<EnrichedInboundMessage | null> => {
|
||||
const location = extractLocationData(msg.message ?? undefined);
|
||||
const locationText = location ? formatLocationText(location) : undefined;
|
||||
let body = extractText(msg.message ?? undefined);
|
||||
if (locationText) {
|
||||
body = [body, locationText].filter(Boolean).join("\n").trim();
|
||||
}
|
||||
if (!body) {
|
||||
body = extractMediaPlaceholder(msg.message ?? undefined);
|
||||
if (!body) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
const replyContext = describeReplyContext(msg.message as proto.IMessage | undefined);
|
||||
|
||||
let mediaPath: string | undefined;
|
||||
let mediaType: string | undefined;
|
||||
let mediaFileName: string | undefined;
|
||||
try {
|
||||
const inboundMedia = await downloadInboundMedia(msg as proto.IWebMessageInfo, sock);
|
||||
if (inboundMedia) {
|
||||
const maxMb =
|
||||
typeof options.mediaMaxMb === "number" && options.mediaMaxMb > 0
|
||||
? options.mediaMaxMb
|
||||
: 50;
|
||||
const maxBytes = maxMb * 1024 * 1024;
|
||||
const saved = await saveMediaBuffer(
|
||||
inboundMedia.buffer,
|
||||
inboundMedia.mimetype,
|
||||
"inbound",
|
||||
maxBytes,
|
||||
inboundMedia.fileName,
|
||||
);
|
||||
mediaPath = saved.path;
|
||||
mediaType = inboundMedia.mimetype;
|
||||
mediaFileName = inboundMedia.fileName;
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`Inbound media download failed: ${String(err)}`);
|
||||
}
|
||||
|
||||
return {
|
||||
body,
|
||||
location: location ?? undefined,
|
||||
replyContext,
|
||||
mediaPath,
|
||||
mediaType,
|
||||
mediaFileName,
|
||||
};
|
||||
};
|
||||
|
||||
const enqueueInboundMessage = async (
|
||||
msg: WAMessage,
|
||||
inbound: NormalizedInboundMessage,
|
||||
enriched: EnrichedInboundMessage,
|
||||
) => {
|
||||
const chatJid = inbound.remoteJid;
|
||||
const sendComposing = async () => {
|
||||
try {
|
||||
await sock.sendPresenceUpdate("composing", chatJid);
|
||||
} catch (err) {
|
||||
logVerbose(`Presence update failed: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
const reply = async (text: string) => {
|
||||
await sock.sendMessage(chatJid, { text });
|
||||
};
|
||||
const sendMedia = async (payload: AnyMessageContent) => {
|
||||
await sock.sendMessage(chatJid, payload);
|
||||
};
|
||||
const timestamp = inbound.messageTimestampMs;
|
||||
const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined);
|
||||
const senderName = msg.pushName ?? undefined;
|
||||
|
||||
inboundLogger.info(
|
||||
{
|
||||
from: inbound.from,
|
||||
to: selfE164 ?? "me",
|
||||
body: enriched.body,
|
||||
mediaPath: enriched.mediaPath,
|
||||
mediaType: enriched.mediaType,
|
||||
mediaFileName: enriched.mediaFileName,
|
||||
timestamp,
|
||||
},
|
||||
"inbound message",
|
||||
);
|
||||
const inboundMessage: WebInboundMessage = {
|
||||
id: inbound.id,
|
||||
from: inbound.from,
|
||||
conversationId: inbound.from,
|
||||
to: selfE164 ?? "me",
|
||||
accountId: inbound.access.resolvedAccountId,
|
||||
body: enriched.body,
|
||||
pushName: senderName,
|
||||
timestamp,
|
||||
chatType: inbound.group ? "group" : "direct",
|
||||
chatId: inbound.remoteJid,
|
||||
senderJid: inbound.participantJid,
|
||||
senderE164: inbound.senderE164 ?? undefined,
|
||||
senderName,
|
||||
replyToId: enriched.replyContext?.id,
|
||||
replyToBody: enriched.replyContext?.body,
|
||||
replyToSender: enriched.replyContext?.sender,
|
||||
replyToSenderJid: enriched.replyContext?.senderJid,
|
||||
replyToSenderE164: enriched.replyContext?.senderE164,
|
||||
groupSubject: inbound.groupSubject,
|
||||
groupParticipants: inbound.groupParticipants,
|
||||
mentionedJids: mentionedJids ?? undefined,
|
||||
selfJid,
|
||||
selfE164,
|
||||
fromMe: Boolean(msg.key?.fromMe),
|
||||
location: enriched.location ?? undefined,
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
mediaPath: enriched.mediaPath,
|
||||
mediaType: enriched.mediaType,
|
||||
mediaFileName: enriched.mediaFileName,
|
||||
};
|
||||
try {
|
||||
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
|
||||
void task.catch((err) => {
|
||||
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
|
||||
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
|
||||
});
|
||||
} catch (err) {
|
||||
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
|
||||
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
|
||||
const handleMessagesUpsert = async (upsert: { type?: string; messages?: Array<WAMessage> }) => {
|
||||
if (upsert.type !== "notify" && upsert.type !== "append") {
|
||||
return;
|
||||
@@ -161,187 +404,24 @@ export async function monitorWebInbox(options: {
|
||||
accountId: options.accountId,
|
||||
direction: "inbound",
|
||||
});
|
||||
const id = msg.key?.id ?? undefined;
|
||||
const remoteJid = msg.key?.remoteJid;
|
||||
if (!remoteJid) {
|
||||
continue;
|
||||
}
|
||||
if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) {
|
||||
const inbound = await normalizeInboundMessage(msg);
|
||||
if (!inbound) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const group = isJidGroup(remoteJid) === true;
|
||||
if (id) {
|
||||
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
|
||||
if (isRecentInboundMessage(dedupeKey)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
const participantJid = msg.key?.participant ?? undefined;
|
||||
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
|
||||
if (!from) {
|
||||
continue;
|
||||
}
|
||||
const senderE164 = group
|
||||
? participantJid
|
||||
? await resolveInboundJid(participantJid)
|
||||
: null
|
||||
: from;
|
||||
|
||||
let groupSubject: string | undefined;
|
||||
let groupParticipants: string[] | undefined;
|
||||
if (group) {
|
||||
const meta = await getGroupMeta(remoteJid);
|
||||
groupSubject = meta.subject;
|
||||
groupParticipants = meta.participants;
|
||||
}
|
||||
const messageTimestampMs = msg.messageTimestamp
|
||||
? Number(msg.messageTimestamp) * 1000
|
||||
: undefined;
|
||||
|
||||
const access = await checkInboundAccessControl({
|
||||
accountId: options.accountId,
|
||||
from,
|
||||
selfE164,
|
||||
senderE164,
|
||||
group,
|
||||
pushName: msg.pushName ?? undefined,
|
||||
isFromMe: Boolean(msg.key?.fromMe),
|
||||
messageTimestampMs,
|
||||
connectedAtMs,
|
||||
sock: { sendMessage: (jid, content) => sock.sendMessage(jid, content) },
|
||||
remoteJid,
|
||||
});
|
||||
if (!access.allowed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (id && !access.isSelfChat && options.sendReadReceipts !== false) {
|
||||
const participant = msg.key?.participant;
|
||||
try {
|
||||
await sock.readMessages([{ remoteJid, id, participant, fromMe: false }]);
|
||||
if (shouldLogVerbose()) {
|
||||
const suffix = participant ? ` (participant ${participant})` : "";
|
||||
logVerbose(`Marked message ${id} as read for ${remoteJid}${suffix}`);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`Failed to mark message ${id} read: ${String(err)}`);
|
||||
}
|
||||
} else if (id && access.isSelfChat && shouldLogVerbose()) {
|
||||
// Self-chat mode: never auto-send read receipts (blue ticks) on behalf of the owner.
|
||||
logVerbose(`Self-chat mode: skipping read receipt for ${id}`);
|
||||
}
|
||||
await maybeMarkInboundAsRead(inbound);
|
||||
|
||||
// If this is history/offline catch-up, mark read above but skip auto-reply.
|
||||
if (upsert.type === "append") {
|
||||
continue;
|
||||
}
|
||||
|
||||
const location = extractLocationData(msg.message ?? undefined);
|
||||
const locationText = location ? formatLocationText(location) : undefined;
|
||||
let body = extractText(msg.message ?? undefined);
|
||||
if (locationText) {
|
||||
body = [body, locationText].filter(Boolean).join("\n").trim();
|
||||
}
|
||||
if (!body) {
|
||||
body = extractMediaPlaceholder(msg.message ?? undefined);
|
||||
if (!body) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
const replyContext = describeReplyContext(msg.message as proto.IMessage | undefined);
|
||||
|
||||
let mediaPath: string | undefined;
|
||||
let mediaType: string | undefined;
|
||||
let mediaFileName: string | undefined;
|
||||
try {
|
||||
const inboundMedia = await downloadInboundMedia(msg as proto.IWebMessageInfo, sock);
|
||||
if (inboundMedia) {
|
||||
const maxMb =
|
||||
typeof options.mediaMaxMb === "number" && options.mediaMaxMb > 0
|
||||
? options.mediaMaxMb
|
||||
: 50;
|
||||
const maxBytes = maxMb * 1024 * 1024;
|
||||
const saved = await saveMediaBuffer(
|
||||
inboundMedia.buffer,
|
||||
inboundMedia.mimetype,
|
||||
"inbound",
|
||||
maxBytes,
|
||||
inboundMedia.fileName,
|
||||
);
|
||||
mediaPath = saved.path;
|
||||
mediaType = inboundMedia.mimetype;
|
||||
mediaFileName = inboundMedia.fileName;
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(`Inbound media download failed: ${String(err)}`);
|
||||
const enriched = await enrichInboundMessage(msg);
|
||||
if (!enriched) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const chatJid = remoteJid;
|
||||
const sendComposing = async () => {
|
||||
try {
|
||||
await sock.sendPresenceUpdate("composing", chatJid);
|
||||
} catch (err) {
|
||||
logVerbose(`Presence update failed: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
const reply = async (text: string) => {
|
||||
await sock.sendMessage(chatJid, { text });
|
||||
};
|
||||
const sendMedia = async (payload: AnyMessageContent) => {
|
||||
await sock.sendMessage(chatJid, payload);
|
||||
};
|
||||
const timestamp = messageTimestampMs;
|
||||
const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined);
|
||||
const senderName = msg.pushName ?? undefined;
|
||||
|
||||
inboundLogger.info(
|
||||
{ from, to: selfE164 ?? "me", body, mediaPath, mediaType, mediaFileName, timestamp },
|
||||
"inbound message",
|
||||
);
|
||||
const inboundMessage: WebInboundMessage = {
|
||||
id,
|
||||
from,
|
||||
conversationId: from,
|
||||
to: selfE164 ?? "me",
|
||||
accountId: access.resolvedAccountId,
|
||||
body,
|
||||
pushName: senderName,
|
||||
timestamp,
|
||||
chatType: group ? "group" : "direct",
|
||||
chatId: remoteJid,
|
||||
senderJid: participantJid,
|
||||
senderE164: senderE164 ?? undefined,
|
||||
senderName,
|
||||
replyToId: replyContext?.id,
|
||||
replyToBody: replyContext?.body,
|
||||
replyToSender: replyContext?.sender,
|
||||
replyToSenderJid: replyContext?.senderJid,
|
||||
replyToSenderE164: replyContext?.senderE164,
|
||||
groupSubject,
|
||||
groupParticipants,
|
||||
mentionedJids: mentionedJids ?? undefined,
|
||||
selfJid,
|
||||
selfE164,
|
||||
fromMe: Boolean(msg.key?.fromMe),
|
||||
location: location ?? undefined,
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
mediaPath,
|
||||
mediaType,
|
||||
mediaFileName,
|
||||
};
|
||||
try {
|
||||
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
|
||||
void task.catch((err) => {
|
||||
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
|
||||
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
|
||||
});
|
||||
} catch (err) {
|
||||
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
|
||||
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
|
||||
}
|
||||
await enqueueInboundMessage(msg, inbound, enriched);
|
||||
}
|
||||
};
|
||||
sock.ev.on("messages.upsert", handleMessagesUpsert);
|
||||
|
||||
Reference in New Issue
Block a user