mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 12:04:59 +00:00
BlueBubbles: add bounded exponential backoff and history payload guards
This commit is contained in:
@@ -44,6 +44,29 @@ function resolveAccount(params: BlueBubblesChatOpts) {
|
||||
return resolveBlueBubblesServerAccount(params);
|
||||
}
|
||||
|
||||
const MAX_HISTORY_FETCH_LIMIT = 100;
|
||||
const HISTORY_SCAN_MULTIPLIER = 8;
|
||||
const MAX_HISTORY_SCAN_MESSAGES = 500;
|
||||
const MAX_HISTORY_BODY_CHARS = 2_000;
|
||||
|
||||
function clampHistoryLimit(limit: number): number {
|
||||
if (!Number.isFinite(limit)) {
|
||||
return 0;
|
||||
}
|
||||
const normalized = Math.floor(limit);
|
||||
if (normalized <= 0) {
|
||||
return 0;
|
||||
}
|
||||
return Math.min(normalized, MAX_HISTORY_FETCH_LIMIT);
|
||||
}
|
||||
|
||||
function truncateHistoryBody(text: string): string {
|
||||
if (text.length <= MAX_HISTORY_BODY_CHARS) {
|
||||
return text;
|
||||
}
|
||||
return `${text.slice(0, MAX_HISTORY_BODY_CHARS).trimEnd()}...`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch message history from BlueBubbles API for a specific chat.
|
||||
* This provides the initial backfill for both group chats and DMs.
|
||||
@@ -53,7 +76,8 @@ export async function fetchBlueBubblesHistory(
|
||||
limit: number,
|
||||
opts: BlueBubblesChatOpts = {},
|
||||
): Promise<BlueBubblesHistoryFetchResult> {
|
||||
if (!chatIdentifier.trim() || limit <= 0) {
|
||||
const effectiveLimit = clampHistoryLimit(limit);
|
||||
if (!chatIdentifier.trim() || effectiveLimit <= 0) {
|
||||
return { entries: [], resolved: true };
|
||||
}
|
||||
|
||||
@@ -67,9 +91,9 @@ export async function fetchBlueBubblesHistory(
|
||||
|
||||
// Try different common API patterns for fetching messages
|
||||
const possiblePaths = [
|
||||
`/api/v1/chat/${encodeURIComponent(chatIdentifier)}/messages?limit=${limit}&sort=DESC`,
|
||||
`/api/v1/messages?chatGuid=${encodeURIComponent(chatIdentifier)}&limit=${limit}`,
|
||||
`/api/v1/chat/${encodeURIComponent(chatIdentifier)}/message?limit=${limit}`,
|
||||
`/api/v1/chat/${encodeURIComponent(chatIdentifier)}/messages?limit=${effectiveLimit}&sort=DESC`,
|
||||
`/api/v1/messages?chatGuid=${encodeURIComponent(chatIdentifier)}&limit=${effectiveLimit}`,
|
||||
`/api/v1/chat/${encodeURIComponent(chatIdentifier)}/message?limit=${effectiveLimit}`,
|
||||
];
|
||||
|
||||
for (const path of possiblePaths) {
|
||||
@@ -104,7 +128,12 @@ export async function fetchBlueBubblesHistory(
|
||||
|
||||
const historyEntries: BlueBubblesHistoryEntry[] = [];
|
||||
|
||||
for (const item of messages) {
|
||||
const maxScannedMessages = Math.min(
|
||||
Math.max(effectiveLimit * HISTORY_SCAN_MULTIPLIER, effectiveLimit),
|
||||
MAX_HISTORY_SCAN_MESSAGES,
|
||||
);
|
||||
for (let i = 0; i < messages.length && i < maxScannedMessages; i++) {
|
||||
const item = messages[i];
|
||||
const msg = item as BlueBubblesMessageData;
|
||||
|
||||
// Skip messages without text content
|
||||
@@ -120,7 +149,7 @@ export async function fetchBlueBubblesHistory(
|
||||
|
||||
historyEntries.push({
|
||||
sender,
|
||||
body: text,
|
||||
body: truncateHistoryBody(text),
|
||||
timestamp,
|
||||
messageId: msg.guid,
|
||||
});
|
||||
@@ -134,7 +163,7 @@ export async function fetchBlueBubblesHistory(
|
||||
});
|
||||
|
||||
return {
|
||||
entries: historyEntries.slice(0, limit), // Ensure we don't exceed the requested limit
|
||||
entries: historyEntries.slice(0, effectiveLimit), // Ensure we don't exceed the requested limit
|
||||
resolved: true,
|
||||
};
|
||||
} catch (error) {
|
||||
|
||||
@@ -246,9 +246,21 @@ function resolveBlueBubblesAckReaction(params: {
|
||||
* API backfill is attempted until one fetch resolves (or retries are exhausted).
|
||||
*/
|
||||
const chatHistories = new Map<string, HistoryEntry[]>();
|
||||
const completedHistoryBackfills = new Set<string>();
|
||||
const historyBackfillAttempts = new Map<string, number>();
|
||||
const MAX_HISTORY_BACKFILL_ATTEMPTS = 3;
|
||||
type HistoryBackfillState = {
|
||||
attempts: number;
|
||||
firstAttemptAt: number;
|
||||
nextAttemptAt: number;
|
||||
resolved: boolean;
|
||||
};
|
||||
|
||||
const historyBackfills = new Map<string, HistoryBackfillState>();
|
||||
const HISTORY_BACKFILL_BASE_DELAY_MS = 5_000;
|
||||
const HISTORY_BACKFILL_MAX_DELAY_MS = 2 * 60 * 1000;
|
||||
const HISTORY_BACKFILL_MAX_ATTEMPTS = 6;
|
||||
const HISTORY_BACKFILL_RETRY_WINDOW_MS = 30 * 60 * 1000;
|
||||
const MAX_STORED_HISTORY_ENTRY_CHARS = 2_000;
|
||||
const MAX_INBOUND_HISTORY_ENTRY_CHARS = 1_200;
|
||||
const MAX_INBOUND_HISTORY_TOTAL_CHARS = 12_000;
|
||||
|
||||
function buildAccountScopedHistoryKey(accountId: string, historyIdentifier: string): string {
|
||||
return `${accountId}\u0000${historyIdentifier}`;
|
||||
@@ -262,6 +274,17 @@ function historyDedupKey(entry: HistoryEntry): string {
|
||||
return `fallback:${entry.sender}\u0000${entry.body}\u0000${entry.timestamp ?? ""}`;
|
||||
}
|
||||
|
||||
function truncateHistoryBody(body: string, maxChars: number): string {
|
||||
const trimmed = body.trim();
|
||||
if (!trimmed) {
|
||||
return "";
|
||||
}
|
||||
if (trimmed.length <= maxChars) {
|
||||
return trimmed;
|
||||
}
|
||||
return `${trimmed.slice(0, maxChars).trimEnd()}...`;
|
||||
}
|
||||
|
||||
function mergeHistoryEntries(params: {
|
||||
apiEntries: HistoryEntry[];
|
||||
currentEntries: HistoryEntry[];
|
||||
@@ -296,16 +319,97 @@ function mergeHistoryEntries(params: {
|
||||
}
|
||||
|
||||
function pruneHistoryBackfillState(): void {
|
||||
for (const key of completedHistoryBackfills) {
|
||||
for (const key of historyBackfills.keys()) {
|
||||
if (!chatHistories.has(key)) {
|
||||
completedHistoryBackfills.delete(key);
|
||||
historyBackfills.delete(key);
|
||||
}
|
||||
}
|
||||
for (const key of historyBackfillAttempts.keys()) {
|
||||
if (!chatHistories.has(key)) {
|
||||
historyBackfillAttempts.delete(key);
|
||||
}
|
||||
|
||||
function markHistoryBackfillResolved(historyKey: string): void {
|
||||
const state = historyBackfills.get(historyKey);
|
||||
if (state) {
|
||||
state.resolved = true;
|
||||
historyBackfills.set(historyKey, state);
|
||||
return;
|
||||
}
|
||||
historyBackfills.set(historyKey, {
|
||||
attempts: 0,
|
||||
firstAttemptAt: Date.now(),
|
||||
nextAttemptAt: Number.POSITIVE_INFINITY,
|
||||
resolved: true,
|
||||
});
|
||||
}
|
||||
|
||||
function planHistoryBackfillAttempt(historyKey: string, now: number): HistoryBackfillState | null {
|
||||
const existing = historyBackfills.get(historyKey);
|
||||
if (existing?.resolved) {
|
||||
return null;
|
||||
}
|
||||
if (existing && now - existing.firstAttemptAt > HISTORY_BACKFILL_RETRY_WINDOW_MS) {
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
return null;
|
||||
}
|
||||
if (existing && existing.attempts >= HISTORY_BACKFILL_MAX_ATTEMPTS) {
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
return null;
|
||||
}
|
||||
if (existing && now < existing.nextAttemptAt) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const attempts = (existing?.attempts ?? 0) + 1;
|
||||
const firstAttemptAt = existing?.firstAttemptAt ?? now;
|
||||
const backoffDelay = Math.min(
|
||||
HISTORY_BACKFILL_BASE_DELAY_MS * 2 ** (attempts - 1),
|
||||
HISTORY_BACKFILL_MAX_DELAY_MS,
|
||||
);
|
||||
const state: HistoryBackfillState = {
|
||||
attempts,
|
||||
firstAttemptAt,
|
||||
nextAttemptAt: now + backoffDelay,
|
||||
resolved: false,
|
||||
};
|
||||
historyBackfills.set(historyKey, state);
|
||||
return state;
|
||||
}
|
||||
|
||||
function buildInboundHistorySnapshot(params: {
|
||||
entries: HistoryEntry[];
|
||||
limit: number;
|
||||
}): Array<{ sender: string; body: string; timestamp?: number }> | undefined {
|
||||
if (params.limit <= 0 || params.entries.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
const recent = params.entries.slice(-params.limit);
|
||||
const selected: Array<{ sender: string; body: string; timestamp?: number }> = [];
|
||||
let remainingChars = MAX_INBOUND_HISTORY_TOTAL_CHARS;
|
||||
|
||||
for (let i = recent.length - 1; i >= 0; i--) {
|
||||
const entry = recent[i];
|
||||
const body = truncateHistoryBody(entry.body, MAX_INBOUND_HISTORY_ENTRY_CHARS);
|
||||
if (!body) {
|
||||
continue;
|
||||
}
|
||||
if (selected.length > 0 && body.length > remainingChars) {
|
||||
break;
|
||||
}
|
||||
selected.push({
|
||||
sender: entry.sender,
|
||||
body,
|
||||
timestamp: entry.timestamp,
|
||||
});
|
||||
remainingChars -= body.length;
|
||||
if (remainingChars <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (selected.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
selected.reverse();
|
||||
return selected;
|
||||
}
|
||||
|
||||
export async function processMessage(
|
||||
@@ -901,43 +1005,48 @@ export async function processMessage(
|
||||
|
||||
// Record the current message into rolling history
|
||||
if (historyKey && historyLimit > 0) {
|
||||
const nowMs = Date.now();
|
||||
const senderLabel = message.fromMe ? "me" : message.senderName || message.senderId;
|
||||
const normalizedHistoryBody = truncateHistoryBody(text, MAX_STORED_HISTORY_ENTRY_CHARS);
|
||||
const currentEntries = recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
limit: historyLimit,
|
||||
historyKey,
|
||||
entry: text
|
||||
entry: normalizedHistoryBody
|
||||
? {
|
||||
sender: senderLabel,
|
||||
body: text,
|
||||
timestamp: message.timestamp ?? Date.now(),
|
||||
body: normalizedHistoryBody,
|
||||
timestamp: message.timestamp ?? nowMs,
|
||||
messageId: message.messageId ?? undefined,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
pruneHistoryBackfillState();
|
||||
|
||||
const attempts = historyBackfillAttempts.get(historyKey) ?? 0;
|
||||
const shouldTryBackfill =
|
||||
!completedHistoryBackfills.has(historyKey) && attempts < MAX_HISTORY_BACKFILL_ATTEMPTS;
|
||||
if (shouldTryBackfill) {
|
||||
historyBackfillAttempts.set(historyKey, attempts + 1);
|
||||
const backfillAttempt = planHistoryBackfillAttempt(historyKey, nowMs);
|
||||
if (backfillAttempt) {
|
||||
try {
|
||||
const backfillResult = await fetchBlueBubblesHistory(historyIdentifier, historyLimit, {
|
||||
cfg: config,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
if (backfillResult.resolved) {
|
||||
completedHistoryBackfills.add(historyKey);
|
||||
historyBackfillAttempts.delete(historyKey);
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
}
|
||||
if (backfillResult.entries.length > 0) {
|
||||
const apiEntries: HistoryEntry[] = backfillResult.entries.map((e) => ({
|
||||
sender: e.sender,
|
||||
body: e.body,
|
||||
timestamp: e.timestamp,
|
||||
messageId: e.messageId,
|
||||
}));
|
||||
const apiEntries: HistoryEntry[] = [];
|
||||
for (const entry of backfillResult.entries) {
|
||||
const body = truncateHistoryBody(entry.body, MAX_STORED_HISTORY_ENTRY_CHARS);
|
||||
if (!body) {
|
||||
continue;
|
||||
}
|
||||
apiEntries.push({
|
||||
sender: entry.sender,
|
||||
body,
|
||||
timestamp: entry.timestamp,
|
||||
messageId: entry.messageId,
|
||||
});
|
||||
}
|
||||
const merged = mergeHistoryEntries({
|
||||
apiEntries,
|
||||
currentEntries:
|
||||
@@ -951,19 +1060,21 @@ export async function processMessage(
|
||||
`backfilled ${backfillResult.entries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyIdentifier}`,
|
||||
);
|
||||
} else if (!backfillResult.resolved) {
|
||||
const remainingAttempts = MAX_HISTORY_BACKFILL_ATTEMPTS - (attempts + 1);
|
||||
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
||||
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)}`,
|
||||
`history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
const remainingAttempts = MAX_HISTORY_BACKFILL_ATTEMPTS - (attempts + 1);
|
||||
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
||||
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)})`,
|
||||
`history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -974,11 +1085,10 @@ export async function processMessage(
|
||||
if (historyKey && historyLimit > 0) {
|
||||
const entries = chatHistories.get(historyKey);
|
||||
if (entries && entries.length > 0) {
|
||||
inboundHistory = entries.map((e) => ({
|
||||
sender: e.sender,
|
||||
body: e.body,
|
||||
timestamp: e.timestamp,
|
||||
}));
|
||||
inboundHistory = buildInboundHistorySnapshot({
|
||||
entries,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3029,7 +3029,7 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
expect(inboundHistory.filter((entry) => entry.body === "current text")).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("retries unresolved backfill and stops retrying after a resolved fetch", async () => {
|
||||
it("uses exponential backoff for unresolved backfill and stops after resolve", async () => {
|
||||
mockFetchBlueBubblesHistory
|
||||
.mockResolvedValueOnce({ resolved: false, entries: [] })
|
||||
.mockResolvedValueOnce({
|
||||
@@ -3039,7 +3039,7 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
],
|
||||
});
|
||||
|
||||
const account = createMockAccount({ dmHistoryLimit: 3 });
|
||||
const account = createMockAccount({ dmHistoryLimit: 4 });
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
setBlueBubblesRuntime(core);
|
||||
@@ -3052,7 +3052,7 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
const mkPayload = (guid: string, text: string) => ({
|
||||
const mkPayload = (guid: string, text: string, now: number) => ({
|
||||
type: "new-message",
|
||||
data: {
|
||||
text,
|
||||
@@ -3061,35 +3061,101 @@ describe("BlueBubbles webhook monitor", () => {
|
||||
isFromMe: false,
|
||||
guid,
|
||||
chatGuid: "iMessage;-;+15550003003",
|
||||
date: Date.now(),
|
||||
date: now,
|
||||
},
|
||||
});
|
||||
|
||||
let now = 1_700_000_000_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
|
||||
try {
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-1", "first text", now)),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1);
|
||||
|
||||
now += 1_000;
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-2", "second text", now)),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1);
|
||||
|
||||
now += 6_000;
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-3", "third text", now)),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2);
|
||||
|
||||
const thirdCall = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[2]?.[0];
|
||||
const thirdHistory = (thirdCall?.ctx.InboundHistory ?? []) as Array<{ body: string }>;
|
||||
expect(thirdHistory.map((entry) => entry.body)).toContain("older context");
|
||||
expect(thirdHistory.map((entry) => entry.body)).toContain("third text");
|
||||
|
||||
now += 10_000;
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-4", "fourth text", now)),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
nowSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("caps inbound history payload size to reduce prompt-bomb risk", async () => {
|
||||
const huge = "x".repeat(8_000);
|
||||
mockFetchBlueBubblesHistory.mockResolvedValueOnce({
|
||||
resolved: true,
|
||||
entries: Array.from({ length: 20 }, (_, idx) => ({
|
||||
sender: `Friend ${idx}`,
|
||||
body: `${huge} ${idx}`,
|
||||
messageId: `hist-${idx}`,
|
||||
timestamp: idx + 1,
|
||||
})),
|
||||
});
|
||||
|
||||
const account = createMockAccount({ dmHistoryLimit: 20 });
|
||||
const config: OpenClawConfig = {};
|
||||
const core = createMockRuntime();
|
||||
setBlueBubblesRuntime(core);
|
||||
|
||||
unregister = registerBlueBubblesWebhookTarget({
|
||||
account,
|
||||
config,
|
||||
runtime: { log: vi.fn(), error: vi.fn() },
|
||||
core,
|
||||
path: "/bluebubbles-webhook",
|
||||
});
|
||||
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-1", "first text")),
|
||||
createMockRequest("POST", "/bluebubbles-webhook", {
|
||||
type: "new-message",
|
||||
data: {
|
||||
text: "latest text",
|
||||
handle: { address: "+15551234567" },
|
||||
isGroup: false,
|
||||
isFromMe: false,
|
||||
guid: "msg-bomb-1",
|
||||
chatGuid: "iMessage;-;+15550004004",
|
||||
date: Date.now(),
|
||||
},
|
||||
}),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(1);
|
||||
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-2", "second text")),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2);
|
||||
|
||||
const secondCall = mockDispatchReplyWithBufferedBlockDispatcher.mock.calls[1]?.[0];
|
||||
const secondHistory = (secondCall?.ctx.InboundHistory ?? []) as Array<{ body: string }>;
|
||||
expect(secondHistory.map((entry) => entry.body)).toContain("older context");
|
||||
expect(secondHistory.map((entry) => entry.body)).toContain("second text");
|
||||
|
||||
await handleBlueBubblesWebhookRequest(
|
||||
createMockRequest("POST", "/bluebubbles-webhook", mkPayload("msg-3", "third text")),
|
||||
createMockResponse(),
|
||||
);
|
||||
await flushAsync();
|
||||
expect(mockFetchBlueBubblesHistory).toHaveBeenCalledTimes(2);
|
||||
const callArgs = getFirstDispatchCall();
|
||||
const inboundHistory = (callArgs.ctx.InboundHistory ?? []) as Array<{ body: string }>;
|
||||
const totalChars = inboundHistory.reduce((sum, entry) => sum + entry.body.length, 0);
|
||||
expect(inboundHistory.length).toBeLessThan(20);
|
||||
expect(totalChars).toBeLessThanOrEqual(12_000);
|
||||
expect(inboundHistory.every((entry) => entry.body.length <= 1_203)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user