diff --git a/extensions/bluebubbles/src/history.ts b/extensions/bluebubbles/src/history.ts index c72d8387ac8..23051e66a31 100644 --- a/extensions/bluebubbles/src/history.ts +++ b/extensions/bluebubbles/src/history.ts @@ -48,8 +48,11 @@ export async function fetchBlueBubblesHistory( return []; } - const { baseUrl, password, accountId } = resolveAccount(opts); - if (!baseUrl || !password) { + let baseUrl: string; + let password: string; + try { + ({ baseUrl, password } = resolveAccount(opts)); + } catch { return []; } @@ -101,13 +104,9 @@ export async function fetchBlueBubblesHistory( continue; } - // Skip from-me messages to avoid duplication - if (msg.is_from_me) { - continue; - } - - const sender = - msg.sender?.display_name || msg.sender?.address || msg.handle_id || "Unknown"; + const sender = msg.is_from_me + ? "me" + : msg.sender?.display_name || msg.sender?.address || msg.handle_id || "Unknown"; const timestamp = msg.date_created || msg.date_delivered; historyEntries.push({ diff --git a/extensions/bluebubbles/src/monitor-processing.ts b/extensions/bluebubbles/src/monitor-processing.ts index 189223ed268..914b8bf07e5 100644 --- a/extensions/bluebubbles/src/monitor-processing.ts +++ b/extensions/bluebubbles/src/monitor-processing.ts @@ -1,20 +1,19 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { + buildPendingHistoryContextFromMap, createReplyPrefixOptions, logAckFailure, logInboundDrop, logTypingFailure, + recordPendingHistoryEntryIfEnabled, resolveAckReaction, resolveControlCommandGate, stripMarkdown, + type HistoryEntry, } from "openclaw/plugin-sdk"; import { downloadBlueBubblesAttachment } from "./attachments.js"; import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js"; -import { - fetchBlueBubblesHistory, - buildInboundHistoryFromEntries, - type BlueBubblesHistoryEntry, -} from "./history.js"; +import { fetchBlueBubblesHistory } from "./history.js"; import { sendBlueBubblesMedia } from "./media-send.js"; import { buildMessagePlaceholder, @@ -242,6 +241,13 @@ function resolveBlueBubblesAckReaction(params: { } } +/** + * In-memory rolling history map keyed by chat identifier. + * Populated from incoming messages during the session; API backfill only + * happens on the first message for a given chat (when the map is empty). + */ +const chatHistories = new Map(); + export async function processMessage( message: NormalizedWebhookMessage, target: WebhookTarget, @@ -818,42 +824,77 @@ export async function processMessage( .trim(); }; - // Fetch history for backfill (both groups and DMs) + // History: in-memory rolling map with API backfill on first message per chat const historyLimit = isGroup ? (account.config.historyLimit ?? 0) : (account.config.dmHistoryLimit ?? 0); - let inboundHistory: Array<{ sender: string; body: string; timestamp?: number }> | undefined; + const historyKey = + chatGuid || + chatIdentifier || + (chatId ? String(chatId) : null) || + (isGroup ? null : message.senderId) || + ""; - if (historyLimit > 0) { - // Determine chat identifier for history fetching - const historyIdentifier = - chatGuid || - chatIdentifier || - (chatId ? String(chatId) : null) || - (isGroup ? null : message.senderId); + // Record the current message into rolling history + if (historyKey && historyLimit > 0) { + const senderLabel = message.fromMe ? "me" : message.senderName || message.senderId; + recordPendingHistoryEntryIfEnabled({ + historyMap: chatHistories, + limit: historyLimit, + historyKey, + entry: text + ? { + sender: senderLabel, + body: text, + timestamp: Date.now(), + messageId: message.messageId ?? undefined, + } + : null, + }); - if (historyIdentifier) { + // API backfill only on first message (when history map was empty before recording) + const existing = chatHistories.get(historyKey); + if (existing && existing.length <= 1) { try { - const historyEntries = await fetchBlueBubblesHistory(historyIdentifier, historyLimit, { + const historyEntries = await fetchBlueBubblesHistory(historyKey, historyLimit, { cfg: config, accountId: account.accountId, }); - if (historyEntries.length > 0) { - inboundHistory = buildInboundHistoryFromEntries(historyEntries); + // Prepend API history before the current message + const apiEntries: HistoryEntry[] = historyEntries.map((e) => ({ + sender: e.sender, + body: e.body, + timestamp: e.timestamp, + messageId: e.messageId, + })); + chatHistories.set(historyKey, [...apiEntries, ...existing]); logVerbose( core, runtime, - `fetched ${historyEntries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyIdentifier}`, + `backfilled ${historyEntries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyKey}`, ); } } catch (err) { - logVerbose(core, runtime, `history fetch failed for ${historyIdentifier}: ${String(err)}`); + logVerbose(core, runtime, `history backfill failed for ${historyKey}: ${String(err)}`); } } } + // Build inbound history from the in-memory map + let inboundHistory: Array<{ sender: string; body: string; timestamp?: number }> | undefined; + if (historyKey && historyLimit > 0) { + const entries = chatHistories.get(historyKey); + if (entries && entries.length > 0) { + inboundHistory = entries.map((e) => ({ + sender: e.sender, + body: e.body, + timestamp: e.timestamp, + })); + } + } + const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, BodyForAgent: rawBody,