mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-01 09:26:13 +00:00
Fix BlueBubbles DM history backfill bug (#20302)
* feat: implement DM history backfill for BlueBubbles - Add fetchBlueBubblesHistory function to fetch message history from API - Modify processMessage to fetch history for both groups and DMs - Use dmHistoryLimit for DMs and historyLimit for groups - Add InboundHistory field to finalizeInboundContext call Fixes #20296 * style: format with oxfmt * address review: in-memory history cache, resolveAccount try/catch, include is_from_me - Wrap resolveAccount in try/catch instead of unreachable guard (it throws) - Include is_from_me messages with 'me' sender label for full conversation context - Add in-memory rolling history map (chatHistories) matching other channel patterns - API backfill only on first message per chat, not every incoming message - Remove unused buildInboundHistoryFromEntries import * chore: remove unused buildInboundHistoryFromEntries helper Dead code flagged by Greptile — mapping is done inline in monitor-processing.ts. * BlueBubbles: harden DM history backfill state handling * BlueBubbles: add bounded exponential backoff and history payload guards * BlueBubbles: evict merged history keys * Update extensions/bluebubbles/src/monitor-processing.ts Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --------- Co-authored-by: Ryan Mac Mini <ryanmacmini@ryans-mac-mini.tailf78f8b.ts.net> Co-authored-by: Vincent Koc <vincentkoc@ieee.org> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
@@ -1,17 +1,21 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||
import {
|
||||
createReplyPrefixOptions,
|
||||
evictOldHistoryKeys,
|
||||
logAckFailure,
|
||||
logInboundDrop,
|
||||
logTypingFailure,
|
||||
recordPendingHistoryEntryIfEnabled,
|
||||
resolveAckReaction,
|
||||
resolveDmGroupAccessDecision,
|
||||
resolveEffectiveAllowFromLists,
|
||||
resolveControlCommandGate,
|
||||
stripMarkdown,
|
||||
type HistoryEntry,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import { downloadBlueBubblesAttachment } from "./attachments.js";
|
||||
import { markBlueBubblesChatRead, sendBlueBubblesTyping } from "./chat.js";
|
||||
import { fetchBlueBubblesHistory } from "./history.js";
|
||||
import { sendBlueBubblesMedia } from "./media-send.js";
|
||||
import {
|
||||
buildMessagePlaceholder,
|
||||
@@ -239,6 +243,178 @@ function resolveBlueBubblesAckReaction(params: {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory rolling history map keyed by account + chat identifier.
|
||||
* Populated from incoming messages during the session.
|
||||
* API backfill is attempted until one fetch resolves (or retries are exhausted).
|
||||
*/
|
||||
const chatHistories = new Map<string, HistoryEntry[]>();
|
||||
type HistoryBackfillState = {
|
||||
attempts: number;
|
||||
firstAttemptAt: number;
|
||||
nextAttemptAt: number;
|
||||
resolved: boolean;
|
||||
};
|
||||
|
||||
const historyBackfills = new Map<string, HistoryBackfillState>();
|
||||
const HISTORY_BACKFILL_BASE_DELAY_MS = 5_000;
|
||||
const HISTORY_BACKFILL_MAX_DELAY_MS = 2 * 60 * 1000;
|
||||
const HISTORY_BACKFILL_MAX_ATTEMPTS = 6;
|
||||
const HISTORY_BACKFILL_RETRY_WINDOW_MS = 30 * 60 * 1000;
|
||||
const MAX_STORED_HISTORY_ENTRY_CHARS = 2_000;
|
||||
const MAX_INBOUND_HISTORY_ENTRY_CHARS = 1_200;
|
||||
const MAX_INBOUND_HISTORY_TOTAL_CHARS = 12_000;
|
||||
|
||||
function buildAccountScopedHistoryKey(accountId: string, historyIdentifier: string): string {
|
||||
return `${accountId}\u0000${historyIdentifier}`;
|
||||
}
|
||||
|
||||
function historyDedupKey(entry: HistoryEntry): string {
|
||||
const messageId = entry.messageId?.trim();
|
||||
if (messageId) {
|
||||
return `id:${messageId}`;
|
||||
}
|
||||
return `fallback:${entry.sender}\u0000${entry.body}\u0000${entry.timestamp ?? ""}`;
|
||||
}
|
||||
|
||||
function truncateHistoryBody(body: string, maxChars: number): string {
|
||||
const trimmed = body.trim();
|
||||
if (!trimmed) {
|
||||
return "";
|
||||
}
|
||||
if (trimmed.length <= maxChars) {
|
||||
return trimmed;
|
||||
}
|
||||
return `${trimmed.slice(0, maxChars).trimEnd()}...`;
|
||||
}
|
||||
|
||||
function mergeHistoryEntries(params: {
|
||||
apiEntries: HistoryEntry[];
|
||||
currentEntries: HistoryEntry[];
|
||||
limit: number;
|
||||
}): HistoryEntry[] {
|
||||
if (params.limit <= 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const merged: HistoryEntry[] = [];
|
||||
const seen = new Set<string>();
|
||||
const appendUnique = (entry: HistoryEntry) => {
|
||||
const key = historyDedupKey(entry);
|
||||
if (seen.has(key)) {
|
||||
return;
|
||||
}
|
||||
seen.add(key);
|
||||
merged.push(entry);
|
||||
};
|
||||
|
||||
for (const entry of params.apiEntries) {
|
||||
appendUnique(entry);
|
||||
}
|
||||
for (const entry of params.currentEntries) {
|
||||
appendUnique(entry);
|
||||
}
|
||||
|
||||
if (merged.length <= params.limit) {
|
||||
return merged;
|
||||
}
|
||||
return merged.slice(merged.length - params.limit);
|
||||
}
|
||||
|
||||
function pruneHistoryBackfillState(): void {
|
||||
for (const key of historyBackfills.keys()) {
|
||||
if (!chatHistories.has(key)) {
|
||||
historyBackfills.delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function markHistoryBackfillResolved(historyKey: string): void {
|
||||
const state = historyBackfills.get(historyKey);
|
||||
if (state) {
|
||||
state.resolved = true;
|
||||
historyBackfills.set(historyKey, state);
|
||||
return;
|
||||
}
|
||||
historyBackfills.set(historyKey, {
|
||||
attempts: 0,
|
||||
firstAttemptAt: Date.now(),
|
||||
nextAttemptAt: Number.POSITIVE_INFINITY,
|
||||
resolved: true,
|
||||
});
|
||||
}
|
||||
|
||||
function planHistoryBackfillAttempt(historyKey: string, now: number): HistoryBackfillState | null {
|
||||
const existing = historyBackfills.get(historyKey);
|
||||
if (existing?.resolved) {
|
||||
return null;
|
||||
}
|
||||
if (existing && now - existing.firstAttemptAt > HISTORY_BACKFILL_RETRY_WINDOW_MS) {
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
return null;
|
||||
}
|
||||
if (existing && existing.attempts >= HISTORY_BACKFILL_MAX_ATTEMPTS) {
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
return null;
|
||||
}
|
||||
if (existing && now < existing.nextAttemptAt) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const attempts = (existing?.attempts ?? 0) + 1;
|
||||
const firstAttemptAt = existing?.firstAttemptAt ?? now;
|
||||
const backoffDelay = Math.min(
|
||||
HISTORY_BACKFILL_BASE_DELAY_MS * 2 ** (attempts - 1),
|
||||
HISTORY_BACKFILL_MAX_DELAY_MS,
|
||||
);
|
||||
const state: HistoryBackfillState = {
|
||||
attempts,
|
||||
firstAttemptAt,
|
||||
nextAttemptAt: now + backoffDelay,
|
||||
resolved: false,
|
||||
};
|
||||
historyBackfills.set(historyKey, state);
|
||||
return state;
|
||||
}
|
||||
|
||||
function buildInboundHistorySnapshot(params: {
|
||||
entries: HistoryEntry[];
|
||||
limit: number;
|
||||
}): Array<{ sender: string; body: string; timestamp?: number }> | undefined {
|
||||
if (params.limit <= 0 || params.entries.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
const recent = params.entries.slice(-params.limit);
|
||||
const selected: Array<{ sender: string; body: string; timestamp?: number }> = [];
|
||||
let remainingChars = MAX_INBOUND_HISTORY_TOTAL_CHARS;
|
||||
|
||||
for (let i = recent.length - 1; i >= 0; i--) {
|
||||
const entry = recent[i];
|
||||
const body = truncateHistoryBody(entry.body, MAX_INBOUND_HISTORY_ENTRY_CHARS);
|
||||
if (!body) {
|
||||
continue;
|
||||
}
|
||||
if (selected.length > 0 && body.length > remainingChars) {
|
||||
break;
|
||||
}
|
||||
selected.push({
|
||||
sender: entry.sender,
|
||||
body,
|
||||
timestamp: entry.timestamp,
|
||||
});
|
||||
remainingChars -= body.length;
|
||||
if (remainingChars <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (selected.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
selected.reverse();
|
||||
return selected;
|
||||
}
|
||||
|
||||
export async function processMessage(
|
||||
message: NormalizedWebhookMessage,
|
||||
target: WebhookTarget,
|
||||
@@ -808,9 +984,118 @@ export async function processMessage(
|
||||
.trim();
|
||||
};
|
||||
|
||||
// History: in-memory rolling map with bounded API backfill retries
|
||||
const historyLimit = isGroup
|
||||
? (account.config.historyLimit ?? 0)
|
||||
: (account.config.dmHistoryLimit ?? 0);
|
||||
|
||||
const historyIdentifier =
|
||||
chatGuid ||
|
||||
chatIdentifier ||
|
||||
(chatId ? String(chatId) : null) ||
|
||||
(isGroup ? null : message.senderId) ||
|
||||
"";
|
||||
const historyKey = historyIdentifier
|
||||
? buildAccountScopedHistoryKey(account.accountId, historyIdentifier)
|
||||
: "";
|
||||
|
||||
// Record the current message into rolling history
|
||||
if (historyKey && historyLimit > 0) {
|
||||
const nowMs = Date.now();
|
||||
const senderLabel = message.fromMe ? "me" : message.senderName || message.senderId;
|
||||
const normalizedHistoryBody = truncateHistoryBody(text, MAX_STORED_HISTORY_ENTRY_CHARS);
|
||||
const currentEntries = recordPendingHistoryEntryIfEnabled({
|
||||
historyMap: chatHistories,
|
||||
limit: historyLimit,
|
||||
historyKey,
|
||||
entry: normalizedHistoryBody
|
||||
? {
|
||||
sender: senderLabel,
|
||||
body: normalizedHistoryBody,
|
||||
timestamp: message.timestamp ?? nowMs,
|
||||
messageId: message.messageId ?? undefined,
|
||||
}
|
||||
: null,
|
||||
});
|
||||
pruneHistoryBackfillState();
|
||||
|
||||
const backfillAttempt = planHistoryBackfillAttempt(historyKey, nowMs);
|
||||
if (backfillAttempt) {
|
||||
try {
|
||||
const backfillResult = await fetchBlueBubblesHistory(historyIdentifier, historyLimit, {
|
||||
cfg: config,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
if (backfillResult.resolved) {
|
||||
markHistoryBackfillResolved(historyKey);
|
||||
}
|
||||
if (backfillResult.entries.length > 0) {
|
||||
const apiEntries: HistoryEntry[] = [];
|
||||
for (const entry of backfillResult.entries) {
|
||||
const body = truncateHistoryBody(entry.body, MAX_STORED_HISTORY_ENTRY_CHARS);
|
||||
if (!body) {
|
||||
continue;
|
||||
}
|
||||
apiEntries.push({
|
||||
sender: entry.sender,
|
||||
body,
|
||||
timestamp: entry.timestamp,
|
||||
messageId: entry.messageId,
|
||||
});
|
||||
}
|
||||
const merged = mergeHistoryEntries({
|
||||
apiEntries,
|
||||
currentEntries:
|
||||
currentEntries.length > 0 ? currentEntries : (chatHistories.get(historyKey) ?? []),
|
||||
limit: historyLimit,
|
||||
});
|
||||
if (chatHistories.has(historyKey)) {
|
||||
chatHistories.delete(historyKey);
|
||||
}
|
||||
chatHistories.set(historyKey, merged);
|
||||
evictOldHistoryKeys(chatHistories);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`backfilled ${backfillResult.entries.length} history messages for ${isGroup ? "group" : "DM"}: ${historyIdentifier}`,
|
||||
);
|
||||
} else if (!backfillResult.resolved) {
|
||||
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
||||
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`history backfill unresolved for ${historyIdentifier}; retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
const remainingAttempts = HISTORY_BACKFILL_MAX_ATTEMPTS - backfillAttempt.attempts;
|
||||
const nextBackoffMs = Math.max(backfillAttempt.nextAttemptAt - nowMs, 0);
|
||||
logVerbose(
|
||||
core,
|
||||
runtime,
|
||||
`history backfill failed for ${historyIdentifier}: ${String(err)} (retries left=${Math.max(remainingAttempts, 0)} next_in_ms=${nextBackoffMs})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build inbound history from the in-memory map
|
||||
let inboundHistory: Array<{ sender: string; body: string; timestamp?: number }> | undefined;
|
||||
if (historyKey && historyLimit > 0) {
|
||||
const entries = chatHistories.get(historyKey);
|
||||
if (entries && entries.length > 0) {
|
||||
inboundHistory = buildInboundHistorySnapshot({
|
||||
entries,
|
||||
limit: historyLimit,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
||||
Body: body,
|
||||
BodyForAgent: rawBody,
|
||||
InboundHistory: inboundHistory,
|
||||
RawBody: rawBody,
|
||||
CommandBody: rawBody,
|
||||
BodyForCommands: rawBody,
|
||||
|
||||
Reference in New Issue
Block a user