diff --git a/extensions/bluebubbles/src/history.ts b/extensions/bluebubbles/src/history.ts index 8bca83e61e1..672e2c48c80 100644 --- a/extensions/bluebubbles/src/history.ts +++ b/extensions/bluebubbles/src/history.ts @@ -44,6 +44,29 @@ function resolveAccount(params: BlueBubblesChatOpts) { return resolveBlueBubblesServerAccount(params); } +const MAX_HISTORY_FETCH_LIMIT = 100; +const HISTORY_SCAN_MULTIPLIER = 8; +const MAX_HISTORY_SCAN_MESSAGES = 500; +const MAX_HISTORY_BODY_CHARS = 2_000; + +function clampHistoryLimit(limit: number): number { + if (!Number.isFinite(limit)) { + return 0; + } + const normalized = Math.floor(limit); + if (normalized <= 0) { + return 0; + } + return Math.min(normalized, MAX_HISTORY_FETCH_LIMIT); +} + +function truncateHistoryBody(text: string): string { + if (text.length <= MAX_HISTORY_BODY_CHARS) { + return text; + } + return `${text.slice(0, MAX_HISTORY_BODY_CHARS).trimEnd()}...`; +} + /** * Fetch message history from BlueBubbles API for a specific chat. * This provides the initial backfill for both group chats and DMs. @@ -53,7 +76,8 @@ export async function fetchBlueBubblesHistory( limit: number, opts: BlueBubblesChatOpts = {}, ): Promise { - if (!chatIdentifier.trim() || limit <= 0) { + const effectiveLimit = clampHistoryLimit(limit); + if (!chatIdentifier.trim() || effectiveLimit <= 0) { return { entries: [], resolved: true }; } @@ -67,9 +91,9 @@ export async function fetchBlueBubblesHistory( // Try different common API patterns for fetching messages const possiblePaths = [ - `/api/v1/chat/${encodeURIComponent(chatIdentifier)}/messages?limit=${limit}&sort=DESC`, - `/api/v1/messages?chatGuid=${encodeURIComponent(chatIdentifier)}&limit=${limit}`, - `/api/v1/chat/${encodeURIComponent(chatIdentifier)}/message?limit=${limit}`, + `/api/v1/chat/${encodeURIComponent(chatIdentifier)}/messages?limit=${effectiveLimit}&sort=DESC`, + `/api/v1/messages?chatGuid=${encodeURIComponent(chatIdentifier)}&limit=${effectiveLimit}`, + `/api/v1/chat/${encodeURIComponent(chatIdentifier)}/message?limit=${effectiveLimit}`, ]; for (const path of possiblePaths) { @@ -104,7 +128,12 @@ export async function fetchBlueBubblesHistory( const historyEntries: BlueBubblesHistoryEntry[] = []; - for (const item of messages) { + const maxScannedMessages = Math.min( + Math.max(effectiveLimit * HISTORY_SCAN_MULTIPLIER, effectiveLimit), + MAX_HISTORY_SCAN_MESSAGES, + ); + for (let i = 0; i < messages.length && i < maxScannedMessages; i++) { + const item = messages[i]; const msg = item as BlueBubblesMessageData; // Skip messages without text content @@ -120,7 +149,7 @@ export async function fetchBlueBubblesHistory( historyEntries.push({ sender, - body: text, + body: truncateHistoryBody(text), timestamp, messageId: msg.guid, }); @@ -134,7 +163,7 @@ export async function fetchBlueBubblesHistory( }); return { - entries: historyEntries.slice(0, limit), // Ensure we don't exceed the requested limit + entries: historyEntries.slice(0, effectiveLimit), // Ensure we don't exceed the requested limit resolved: true, }; } catch (error) { diff --git a/extensions/bluebubbles/src/monitor-processing.ts b/extensions/bluebubbles/src/monitor-processing.ts index 69964a94996..ea2a14e1a2e 100644 --- a/extensions/bluebubbles/src/monitor-processing.ts +++ b/extensions/bluebubbles/src/monitor-processing.ts @@ -246,9 +246,21 @@ function resolveBlueBubblesAckReaction(params: { * API backfill is attempted until one fetch resolves (or retries are exhausted). */ const chatHistories = new Map(); -const completedHistoryBackfills = new Set(); -const historyBackfillAttempts = new Map(); -const MAX_HISTORY_BACKFILL_ATTEMPTS = 3; +type HistoryBackfillState = { + attempts: number; + firstAttemptAt: number; + nextAttemptAt: number; + resolved: boolean; +}; + +const historyBackfills = new Map(); +const HISTORY_BACKFILL_BASE_DELAY_MS = 5_000; +const HISTORY_BACKFILL_MAX_DELAY_MS = 2 * 60 * 1000; +const HISTORY_BACKFILL_MAX_ATTEMPTS = 6; +const HISTORY_BACKFILL_RETRY_WINDOW_MS = 30 * 60 * 1000; +const MAX_STORED_HISTORY_ENTRY_CHARS = 2_000; +const MAX_INBOUND_HISTORY_ENTRY_CHARS = 1_200; +const MAX_INBOUND_HISTORY_TOTAL_CHARS = 12_000; function buildAccountScopedHistoryKey(accountId: string, historyIdentifier: string): string { return `${accountId}\u0000${historyIdentifier}`; @@ -262,6 +274,17 @@ function historyDedupKey(entry: HistoryEntry): string { return `fallback:${entry.sender}\u0000${entry.body}\u0000${entry.timestamp ?? ""}`; } +function truncateHistoryBody(body: string, maxChars: number): string { + const trimmed = body.trim(); + if (!trimmed) { + return ""; + } + if (trimmed.length <= maxChars) { + return trimmed; + } + return `${trimmed.slice(0, maxChars).trimEnd()}...`; +} + function mergeHistoryEntries(params: { apiEntries: HistoryEntry[]; currentEntries: HistoryEntry[]; @@ -296,16 +319,97 @@ function mergeHistoryEntries(params: { } function pruneHistoryBackfillState(): void { - for (const key of completedHistoryBackfills) { + for (const key of historyBackfills.keys()) { if (!chatHistories.has(key)) { - completedHistoryBackfills.delete(key); + historyBackfills.delete(key); } } - for (const key of historyBackfillAttempts.keys()) { - if (!chatHistories.has(key)) { - historyBackfillAttempts.delete(key); +} + +function markHistoryBackfillResolved(historyKey: string): void { + const state = historyBackfills.get(historyKey); + if (state) { + state.resolved = true; + historyBackfills.set(historyKey, state); + return; + } + historyBackfills.set(historyKey, { + attempts: 0, + firstAttemptAt: Date.now(), + nextAttemptAt: Number.POSITIVE_INFINITY, + resolved: true, + }); +} + +function planHistoryBackfillAttempt(historyKey: string, now: number): HistoryBackfillState | null { + const existing = historyBackfills.get(historyKey); + if (existing?.resolved) { + return null; + } + if (existing && now - existing.firstAttemptAt > HISTORY_BACKFILL_RETRY_WINDOW_MS) { + markHistoryBackfillResolved(historyKey); + return null; + } + if (existing && existing.attempts >= HISTORY_BACKFILL_MAX_ATTEMPTS) { + markHistoryBackfillResolved(historyKey); + return null; + } + if (existing && now < existing.nextAttemptAt) { + return null; + } + + const attempts = (existing?.attempts ?? 0) + 1; + const firstAttemptAt = existing?.firstAttemptAt ?? now; + const backoffDelay = Math.min( + HISTORY_BACKFILL_BASE_DELAY_MS * 2 ** (attempts - 1), + HISTORY_BACKFILL_MAX_DELAY_MS, + ); + const state: HistoryBackfillState = { + attempts, + firstAttemptAt, + nextAttemptAt: now + backoffDelay, + resolved: false, + }; + historyBackfills.set(historyKey, state); + return state; +} + +function buildInboundHistorySnapshot(params: { + entries: HistoryEntry[]; + limit: number; +}): Array<{ sender: string; body: string; timestamp?: number }> | undefined { + if (params.limit <= 0 || params.entries.length === 0) { + return undefined; + } + const recent = params.entries.slice(-params.limit); + const selected: Array<{ sender: string; body: string; timestamp?: number }> = []; + let remainingChars = MAX_INBOUND_HISTORY_TOTAL_CHARS; + + for (let i = recent.length - 1; i >= 0; i--) { + const entry = recent[i]; + const body = truncateHistoryBody(entry.body, MAX_INBOUND_HISTORY_ENTRY_CHARS); + if (!body) { + continue; + } + if (selected.length > 0 && body.length > remainingChars) { + break; + } + selected.push({ + sender: entry.sender, + body, + timestamp: entry.timestamp, + }); + remainingChars -= body.length; + if (remainingChars <= 0) { + break; } } + + if (selected.length === 0) { + return undefined; + } + selected.reverse(); + return selected; } export async function processMessage( @@ -901,43 +1005,48 @@ export async function processMessage( // Record the current message into rolling history if (historyKey && historyLimit > 0) { + const nowMs = Date.now(); const senderLabel = message.fromMe ? "me" : message.senderName || message.senderId; + const normalizedHistoryBody = truncateHistoryBody(text, MAX_STORED_HISTORY_ENTRY_CHARS); const currentEntries = recordPendingHistoryEntryIfEnabled({ historyMap: chatHistories, limit: historyLimit, historyKey, - entry: text + entry: normalizedHistoryBody ? { sender: senderLabel, - body: text, - timestamp: message.timestamp ?? Date.now(), + body: normalizedHistoryBody, + timestamp: message.timestamp ?? nowMs, messageId: message.messageId ?? undefined, } : null, }); pruneHistoryBackfillState(); - const attempts = historyBackfillAttempts.get(historyKey) ?? 0; - const shouldTryBackfill = - !completedHistoryBackfills.has(historyKey) && attempts < MAX_HISTORY_BACKFILL_ATTEMPTS; - if (shouldTryBackfill) { - historyBackfillAttempts.set(historyKey, attempts + 1); + const backfillAttempt = planHistoryBackfillAttempt(historyKey, nowMs); + if (backfillAttempt) { try { const backfillResult = await fetchBlueBubblesHistory(historyIdentifier, historyLimit, { cfg: config, accountId: account.accountId, }); if (backfillResult.resolved) { - completedHistoryBackfills.add(historyKey); - historyBackfillAttempts.delete(historyKey); + markHistoryBackfillResolved(historyKey); } if (backfillResult.entries.length > 0) { - const apiEntries: HistoryEntry[] = backfillResult.entries.map((e) => ({ - sender: e.sender, - body: e.body, - timestamp: e.timestamp, - messageId: e.messageId, - })); + const apiEntries: HistoryEntry[] = []; + for (const entry of backfillResult.entries) { + const body = truncateHistoryBody(entry.body, MAX_STORED_HISTORY_ENTRY_CHARS); + if (!body) { + continue; + } + apiEntries.push({ + sender: entry.sender, + body, + timestamp: entry.timestamp, + messageId: entry.messageId, + }); + } const merged = mergeHistoryEntries({ apiEntries, currentEntries: @@ -951,19 +1060,21 @@ export async function processMessage( `backfilled ${backfillResult.entries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyIdentifier}`, ); } else if (!backfillResult.resolved) { - const remainingAttempts = MAX_HISTORY_BACKFILL_ATTEMPTS - (attempts + 1); + const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts; + const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0); logVerbose( core, runtime, - `history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)}`, + `history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs}`, ); } } catch (err) { - const remainingAttempts = MAX_HISTORY_BACKFILL_ATTEMPTS - (attempts + 1); + const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts; + const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0); logVerbose( core, runtime, - `history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)})`, + `history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs})`, ); } } @@ -974,11 +1085,10 @@ export async function processMessage( if (historyKey && historyLimit > 0) { const entries = chatHistories.get(historyKey); if (entries && entries.length > 0) { - inboundHistory = entries.map((e) => ({ - sender: e.sender, - body: e.body, - timestamp: e.timestamp, - })); + inboundHistory = buildInboundHistorySnapshot({ + entries, + limit: historyLimit, + }); } } diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index fd58130cba6..d9468c6d2ef 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -3029,7 +3029,7 @@ describe("BlueBubbles webhook monitor", () => { expect(inboundHistory.filter((entry) => entry.body === "current text")).toHaveLength(1); }); - it("retries unresolved backfill and stops retrying after a resolved fetch", async () => { + it("uses exponential backoff for unresolved backfill and stops after resolve", async () => { mockFetchBlueBubblesHistory .mockResolvedValueOnce({ resolved: false, entries: [] }) .mockResolvedValueOnce({ @@ -3039,7 +3039,7 @@ describe("BlueBubbles webhook monitor", () => { ], }); - const account = createMockAccount({ dmHistoryLimit: 3 }); + const account = createMockAccount({ dmHistoryLimit: 4 }); const config: OpenClawConfig = {}; const core = createMockRuntime(); setBlueBubblesRuntime(core); @@ -3052,7 +3052,7 @@ describe("BlueBubbles webhook monitor", () => { path: "/bluebubbles-webhook", }); - const mkPayload = (guid: string, text: string) => ({ + const mkPayload = (guid: string, text: string, now: number) => ({ type: "new-message", data: { text, @@ -3061,35 +3061,101 @@ describe("BlueBubbles webhook monitor", () => { isFromMe: false, guid, chatGuid: "iMessage;-;+15550003003", - date: Date.now(), + date: now, }, }); + let now = 1_700_000_000_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + try { + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-1", "first text", now)), + createMockResponse(), + ); + await flushAsync(); + expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1); + + now += 1_000; + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-2", "second text", now)), + createMockResponse(), + ); + await flushAsync(); + expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1); + + now += 6_000; + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-3", "third text", now)), + createMockResponse(), + ); + await flushAsync(); + expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2); + + const thirdCall = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[2]?.[0]; + const thirdHistory = (thirdCall?.ctx.InboundHistory ?? []) as Array<{ body: string }>; + expect(thirdHistory.map((entry) => entry.body)).toContain("older context"); + expect(thirdHistory.map((entry) => entry.body)).toContain("third text"); + + now += 10_000; + await handleBlueBubblesWebhookRequest( + createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-4", "fourth text", now)), + createMockResponse(), + ); + await flushAsync(); + expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2); + } finally { + nowSpy.mockRestore(); + } + }); + + it("caps inbound history payload size to reduce prompt-bomb risk", async () => { + const huge = "x".repeat(8_000); + mockFetchBlueBubblesHistory.mockResolvedValueOnce({ + resolved: true, + entries: Array.from({ length: 20 }, (_, idx) => ({ + sender: `Friend ${idx}`, + body: `${huge} ${idx}`, + messageId: `hist-${idx}`, + timestamp: idx + 1, + })), + }); + + const account = createMockAccount({ dmHistoryLimit: 20 }); + const config: OpenClawConfig = {}; + const core = createMockRuntime(); + setBlueBubblesRuntime(core); + + unregister = registerBlueBubblesWebhookTarget({ + account, + config, + runtime: { log: vi.fn(), error: vi.fn() }, + core, + path: "/bluebubbles-webhook", + }); + await handleBlueBubblesWebhookRequest( - createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-1", "first text")), + createMockRequest("POST", "/bluebubbles-webhook", { + type: "new-message", + data: { + text: "latest text", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: "msg-bomb-1", + chatGuid: "iMessage;-;+15550004004", + date: Date.now(), + }, + }), createMockResponse(), ); await flushAsync(); - expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1); - await handleBlueBubblesWebhookRequest( - createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-2", "second text")), - createMockResponse(), - ); - await flushAsync(); - expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2); - - const secondCall = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[1]?.[0]; - const secondHistory = (secondCall?.ctx.InboundHistory ?? []) as Array<{ body: string }>; - expect(secondHistory.map((entry) => entry.body)).toContain("older context"); - expect(secondHistory.map((entry) => entry.body)).toContain("second text"); - - await handleBlueBubblesWebhookRequest( - createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-3", "third text")), - createMockResponse(), - ); - await flushAsync(); - expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2); + const callArgs = getFirstDispatchCall(); + const inboundHistory = (callArgs.ctx.InboundHistory ?? []) as Array<{ body: string }>; + const totalChars = inboundHistory.reduce((sum, entry) => sum + entry.body.length, 0); + expect(inboundHistory.length).toBeLessThan(20); + expect(totalChars).toBeLessThanOrEqual(12_000); + expect(inboundHistory.every((entry) => entry.body.length <= 1_203)).toBe(true); }); });