Add web provider inbound monitor with auto-replies

This commit is contained in:
Peter Steinberger
2025-11-24 18:33:50 +01:00
parent 5ee4f3219d
commit 9b4dceecfe
4 changed files with 474 additions and 48 deletions

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env node
import { execFile, spawn } from "node:child_process";
import crypto from "node:crypto";
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
@@ -8,6 +9,7 @@ import process, { stdin as input, stdout as output } from "node:process";
import readline from "node:readline/promises";
import { fileURLToPath } from "node:url";
import { promisify } from "node:util";
import bodyParser from "body-parser";
import chalk from "chalk";
import { Command } from "commander";
@@ -16,21 +18,22 @@ import express, { type Request, type Response } from "express";
import JSON5 from "json5";
import Twilio from "twilio";
import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js";
import {
danger,
info,
isYes,
isVerbose,
isYes,
logVerbose,
setVerbose,
setYes,
success,
warn,
} from "./globals.js";
import { loginWeb, sendMessageWeb } from "./provider-web.js";
import { loginWeb, monitorWebInbox, sendMessageWeb } from "./provider-web.js";
import {
Provider,
assertProvider,
CONFIG_DIR,
normalizeE164,
normalizePath,
sleep,
@@ -379,10 +382,23 @@ type WarelayConfig = {
template?: string; // prepend template string when building command/prompt
timeoutSeconds?: number; // optional command timeout; defaults to 600s
bodyPrefix?: string; // optional string prepended to Body before templating
session?: SessionConfig;
};
};
};
type SessionScope = "per-sender" | "global";
type SessionConfig = {
scope?: SessionScope;
resetTriggers?: string[];
idleMinutes?: number;
store?: string;
sessionArgNew?: string[];
sessionArgResume?: string[];
sessionArgBeforeBody?: boolean;
};
function loadConfig(): WarelayConfig {
// Read ~/.warelay/warelay.json (JSON5) if present.
try {
@@ -408,7 +424,7 @@ type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void;
};
function applyTemplate(str: string, ctx: MsgContext) {
function applyTemplate(str: string, ctx: TemplateContext) {
// Simple {{Placeholder}} interpolation using inbound message context.
return str.replace(/{{\s*(\w+)\s*}}/g, (_, key) => {
const value = (ctx as Record<string, unknown>)[key];
@@ -416,12 +432,56 @@ function applyTemplate(str: string, ctx: MsgContext) {
});
}
type TemplateContext = MsgContext & {
BodyStripped?: string;
SessionId?: string;
IsNewSession?: string;
};
type SessionEntry = { sessionId: string; updatedAt: number };
const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json");
const DEFAULT_RESET_TRIGGER = "/new";
const DEFAULT_IDLE_MINUTES = 60;
function resolveStorePath(store?: string) {
if (!store) return SESSION_STORE_DEFAULT;
if (store.startsWith("~")) return path.resolve(store.replace("~", os.homedir()));
return path.resolve(store);
}
function loadSessionStore(storePath: string): Record<string, SessionEntry> {
try {
const raw = fs.readFileSync(storePath, "utf-8");
const parsed = JSON5.parse(raw);
if (parsed && typeof parsed === "object") {
return parsed as Record<string, SessionEntry>;
}
} catch {
// ignore missing/invalid store; we'll recreate it
}
return {};
}
async function saveSessionStore(storePath: string, store: Record<string, SessionEntry>) {
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
await fs.promises.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
}
function deriveSessionKey(scope: SessionScope, ctx: MsgContext) {
if (scope === "global") return "global";
const from = ctx.From ? normalizeE164(ctx.From) : "";
return from || "unknown";
}
async function getReplyFromConfig(
ctx: MsgContext,
opts?: GetReplyOptions,
configOverride?: WarelayConfig,
commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout,
): Promise<string | undefined> {
// Choose reply from config: static text or external command stdout.
const cfg = loadConfig();
const cfg = configOverride ?? loadConfig();
const reply = cfg.inbound?.reply;
const timeoutSeconds = Math.max(reply?.timeoutSeconds ?? 600, 1);
const timeoutMs = timeoutSeconds * 1000;
@@ -432,14 +492,73 @@ async function getReplyFromConfig(
await opts?.onReplyStart?.();
};
// Optional session handling (conversation reuse + /new resets)
const sessionCfg = reply?.session;
const resetTriggers =
sessionCfg?.resetTriggers?.length
? sessionCfg.resetTriggers
: [DEFAULT_RESET_TRIGGER];
const idleMinutes = Math.max(sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES, 1);
const sessionScope = sessionCfg?.scope ?? "per-sender";
const storePath = resolveStorePath(sessionCfg?.store);
let sessionId: string | undefined;
let isNewSession = false;
let bodyStripped: string | undefined;
if (sessionCfg) {
const trimmedBody = (ctx.Body ?? "").trim();
for (const trigger of resetTriggers) {
if (!trigger) continue;
if (trimmedBody === trigger) {
isNewSession = true;
bodyStripped = "";
break;
}
const triggerPrefix = `${trigger} `;
if (trimmedBody.startsWith(triggerPrefix)) {
isNewSession = true;
bodyStripped = trimmedBody.slice(trigger.length).trimStart();
break;
}
}
const sessionKey = deriveSessionKey(sessionScope, ctx);
const store = loadSessionStore(storePath);
const entry = store[sessionKey];
const idleMs = idleMinutes * 60_000;
const freshEntry = entry && Date.now() - entry.updatedAt <= idleMs;
if (!isNewSession && freshEntry) {
sessionId = entry.sessionId;
} else {
sessionId = crypto.randomUUID();
isNewSession = true;
}
store[sessionKey] = { sessionId, updatedAt: Date.now() };
await saveSessionStore(storePath, store);
}
const sessionCtx: TemplateContext = {
...ctx,
BodyStripped: bodyStripped ?? ctx.Body,
SessionId: sessionId,
IsNewSession: isNewSession ? "true" : "false",
};
// Optional prefix injected before Body for templating/command prompts.
const bodyPrefix = reply?.bodyPrefix
? applyTemplate(reply.bodyPrefix, ctx)
? applyTemplate(reply.bodyPrefix, sessionCtx)
: "";
const templatingCtx: MsgContext =
bodyPrefix && (ctx.Body ?? "").length >= 0
? { ...ctx, Body: `${bodyPrefix}${ctx.Body ?? ""}` }
: ctx;
const prefixedBody = bodyPrefix
? `${bodyPrefix}${sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""}`
: sessionCtx.BodyStripped ?? sessionCtx.Body;
const templatingCtx: TemplateContext = {
...sessionCtx,
Body: prefixedBody,
BodyStripped: prefixedBody,
};
// Optional allowlist by origin number (E.164 without whatsapp: prefix)
const allowFrom = cfg.inbound?.allowFrom;
@@ -465,20 +584,36 @@ async function getReplyFromConfig(
if (reply.mode === "command" && reply.command?.length) {
await onReplyStart();
const argv = reply.command.map((part) =>
applyTemplate(part, templatingCtx),
);
let argv = reply.command.map((part) => applyTemplate(part, templatingCtx));
const templatePrefix = reply.template
? applyTemplate(reply.template, templatingCtx)
: "";
const finalArgv = templatePrefix
? [argv[0], templatePrefix, ...argv.slice(1)]
: argv;
if (templatePrefix && argv.length > 0) {
argv = [argv[0], templatePrefix, ...argv.slice(1)];
}
// Inject session args if configured (use resume for existing, session-id for new)
if (reply.session) {
const sessionArgList = (isNewSession
? reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"]
: reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"]
).map((part) => applyTemplate(part, templatingCtx));
if (sessionArgList.length) {
const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true;
const insertAt = insertBeforeBody && argv.length > 1 ? argv.length - 1 : argv.length;
argv = [
...argv.slice(0, insertAt),
...sessionArgList,
...argv.slice(insertAt),
];
}
}
const finalArgv = argv;
logVerbose(`Running command auto-reply: ${finalArgv.join(" ")}`);
const started = Date.now();
try {
const { stdout, stderr, code, signal, killed } =
await runCommandWithTimeout(finalArgv, timeoutMs);
await commandRunner(finalArgv, timeoutMs);
const trimmed = stdout.trim();
if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
@@ -528,6 +663,7 @@ async function getReplyFromConfig(
async function autoReplyIfConfigured(
client: ReturnType<typeof createClient>,
message: MessageInstance,
configOverride?: WarelayConfig,
): Promise<void> {
// Fire a config-driven reply (text or command) for the inbound message, if configured.
const ctx: MsgContext = {
@@ -537,9 +673,13 @@ async function autoReplyIfConfigured(
MessageSid: message.sid,
};
const replyText = await getReplyFromConfig(ctx, {
onReplyStart: () => sendTypingIndicator(client, message.sid),
});
const replyText = await getReplyFromConfig(
ctx,
{
onReplyStart: () => sendTypingIndicator(client, message.sid),
},
configOverride,
);
if (!replyText) return;
const replyFrom = message.to;
@@ -1290,6 +1430,56 @@ async function monitor(intervalSeconds: number, lookbackMinutes: number) {
}
}
async function monitorWebProvider(verbose: boolean) {
// Listen for inbound personal WhatsApp Web messages and auto-reply if configured.
const listener = await monitorWebInbox({
verbose,
onMessage: async (msg) => {
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
const replyText = await getReplyFromConfig(
{
Body: msg.body,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
},
{
onReplyStart: msg.sendComposing,
},
);
if (!replyText) return;
try {
await msg.reply(replyText);
if (isVerbose()) {
console.log(success(`↩️ Auto-replied to ${msg.from} (web)`));
}
} catch (err) {
console.error(
danger(`Failed sending web auto-reply to ${msg.from}: ${String(err)}`),
);
}
},
});
console.log(
info(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
),
);
process.on("SIGINT", () => {
void listener.close().finally(() => {
console.log("\n👋 Web monitor stopped");
process.exit(0);
});
});
await waitForever();
}
type ListedMessage = {
sid: string;
status: string | null;
@@ -1343,9 +1533,10 @@ function formatMessageLine(m: ListedMessage): string {
async function listRecentMessages(
lookbackMinutes: number,
limit: number,
clientOverride?: ReturnType<typeof createClient>,
): Promise<ListedMessage[]> {
const env = readEnv();
const client = createClient(env);
const client = clientOverride ?? createClient(env);
const from = withWhatsAppPrefix(env.whatsappFrom);
const since = new Date(Date.now() - lookbackMinutes * 60_000);
@@ -1390,7 +1581,12 @@ program
.option("--verbose", "Verbose connection logs", false)
.action(async (opts) => {
setVerbose(Boolean(opts.verbose));
await loginWeb(Boolean(opts.verbose));
try {
await loginWeb(Boolean(opts.verbose));
} catch (err) {
console.error(danger(`Web login failed: ${String(err)}`));
process.exit(1);
}
});
program
@@ -1475,6 +1671,23 @@ Examples:
await monitor(intervalSeconds, lookbackMinutes);
});
program
.command("web:monitor")
.description("Listen for inbound messages via personal WhatsApp Web and auto-reply")
.option("--verbose", "Verbose logging", false)
.addHelpText(
"after",
`
Examples:
warelay web:monitor # start auto-replies on your linked web session
warelay web:monitor --verbose # show low-level Baileys logs
`,
)
.action(async (opts) => {
setVerbose(Boolean(opts.verbose));
await monitorWebProvider(Boolean(opts.verbose));
});
program
.command("status")
.description("Show recent WhatsApp messages (sent and received)")
@@ -1669,7 +1882,49 @@ program
await waitForever();
});
export { normalizeE164, toWhatsappJid, assertProvider };
export {
assertProvider,
autoReplyIfConfigured,
applyTemplate,
createClient,
deriveSessionKey,
describePortOwner,
ensureBinary,
ensureFunnel,
ensureGoInstalled,
ensurePortAvailable,
ensureTailscaledInstalled,
findIncomingNumberSid,
findMessagingServiceSid,
findWhatsappSenderSid,
formatMessageLine,
getReplyFromConfig,
getTailnetHostname,
handlePortError,
listRecentMessages,
loadConfig,
loadSessionStore,
monitor,
monitorWebProvider,
normalizeE164,
PortInUseError,
promptYesNo,
readEnv,
resolveStorePath,
runCommandWithTimeout,
runExec,
saveSessionStore,
sendMessage,
sendTypingIndicator,
setMessagingServiceWebhook,
sortByDateDesc,
startWebhook,
updateWebhook,
uniqueBySid,
waitForFinalStatus,
waitForever,
toWhatsappJid,
};
const isMain =
process.argv[1] && fileURLToPath(import.meta.url) === process.argv[1];