fix(imessage): prevent echo loop from leaking internal metadata and amplifying NO_REPLY into queue overflow (#33295)

* fix(imessage): prevent echo loop from leaking internal metadata and amplifying NO_REPLY into queue overflow

- Add outbound sanitization at channel boundary (sanitize-outbound.ts):
  strips thinking/reasoning tags, relevant-memories tags, model-specific
  separators (+#+#), and assistant role markers before iMessage delivery

- Add inbound reflection guard (reflection-guard.ts): detects and drops
  messages containing assistant-internal markers that indicate a reflected
  outbound message, preventing recursive echo amplification

- Harden echo cache: increase text TTL from 5s to 30s to catch delayed
  reflections that previously expired before the echo could be detected

- Add loop rate limiter (loop-rate-limiter.ts): per-conversation rapid-fire
  detection that suppresses conversations exceeding threshold within a
  time window, acting as a safety net against amplification

Closes #33281

* fix(imessage): address review — stricter reflection regex, loop-aware rate limiter

- Reflection guard: require closing > bracket on thinking/final/memory
  tag patterns to prevent false-positives on user phrases like
  '<final answer>' or '<thought experiment>' (#33295 review)

- Rate limiter: only record echo/reflection/from-me drops instead of
  all dispatches, so the limiter acts as a loop-specific escalation
  mechanism rather than a general throttle on normal conversation
  velocity (#33295 review)

* Changelog: add iMessage echo-loop hardening entry

* iMessage: restore short echo-text TTL

* iMessage: ignore reflection markers in code

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
OfflynAI
2026-03-06 16:19:57 -08:00
committed by GitHub
parent 5320ee7731
commit adb9234d03
12 changed files with 432 additions and 5 deletions

View File

@@ -103,6 +103,7 @@ Docs: https://docs.openclaw.ai
- Gateway/HTTP tools invoke media compatibility: preserve raw media payload access for direct `/tools/invoke` clients by allowing media `nodes` invoke commands only in HTTP tool context, while keeping agent-context media invoke blocking to prevent base64 prompt bloat. (#34365) Thanks @obviyus.
- Agents/Nodes media outputs: add dedicated `photos_latest` action handling, block media-returning `nodes invoke` commands, keep metadata-only `camera.list` invoke allowed, and normalize empty `photos_latest` results to a consistent response shape to prevent base64 context bloat. (#34332) Thanks @obviyus.
- TUI/session-key canonicalization: normalize `openclaw tui --session` values to lowercase so uppercase session names no longer drop real-time streaming updates due to gateway/TUI key mismatches. (#33866, #34013) thanks @lynnzc.
- iMessage/echo loop hardening: strip leaked assistant-internal scaffolding from outbound iMessage replies, drop reflected assistant-content messages before they re-enter inbound processing, extend echo-cache text retention for delayed reflections, and suppress repeated loop traffic before it amplifies into queue overflow. (#33295) Thanks @joelnishanth.
- Outbound/send config threading: pass resolved SecretRef config through outbound adapters and helper send paths so send flows do not reload unresolved runtime config. (#33987) Thanks @joshavant.
- Sessions/subagent attachments: remove `attachments[].content.maxLength` from `sessions_spawn` schema to avoid llama.cpp GBNF repetition overflow, and preflight UTF-8 byte size before buffer allocation while keeping runtime file-size enforcement unchanged. (#33648) Thanks @anisoptera.
- Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr.

View File

@@ -7,6 +7,7 @@ import type { RuntimeEnv } from "../../runtime.js";
import type { createIMessageRpcClient } from "../client.js";
import { sendMessageIMessage } from "../send.js";
import type { SentMessageCache } from "./echo-cache.js";
import { sanitizeOutboundText } from "./sanitize-outbound.js";
export async function deliverReplies(params: {
replies: ReplyPayload[];
@@ -30,7 +31,7 @@ export async function deliverReplies(params: {
const chunkMode = resolveChunkMode(cfg, "imessage", accountId);
for (const payload of replies) {
const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []);
const rawText = payload.text ?? "";
const rawText = sanitizeOutboundText(payload.text ?? "");
const text = convertMarkdownTables(rawText, tableMode);
if (!text && mediaList.length === 0) {
continue;

View File

@@ -8,7 +8,9 @@ export type SentMessageCache = {
has: (scope: string, lookup: SentMessageLookup) => boolean;
};
const SENT_MESSAGE_TEXT_TTL_MS = 5000;
// Keep the text fallback short so repeated user replies like "ok" are not
// suppressed for long; delayed reflections should match the stronger message-id key.
const SENT_MESSAGE_TEXT_TTL_MS = 5_000;
const SENT_MESSAGE_ID_TTL_MS = 60_000;
function normalizeEchoTextKey(text: string | undefined): string | null {

View File

@@ -30,6 +30,7 @@ import {
isAllowedIMessageSender,
normalizeIMessageHandle,
} from "../targets.js";
import { detectReflectedContent } from "./reflection-guard.js";
import type { MonitorIMessageOpts, IMessagePayload } from "./types.js";
type IMessageReplyContext = {
@@ -214,7 +215,7 @@ export function resolveIMessageInboundDecision(params: {
return { kind: "drop", reason: "empty body" };
}
// Echo detection: check if the received message matches a recently sent message (within 5 seconds).
// Echo detection: check if the received message matches a recently sent message.
// Scope by conversation so same text in different chats is not conflated.
const inboundMessageId = params.message.id != null ? String(params.message.id) : undefined;
if (params.echoCache && (messageText || inboundMessageId)) {
@@ -237,6 +238,17 @@ export function resolveIMessageInboundDecision(params: {
}
}
// Reflection guard: drop inbound messages that contain assistant-internal
// metadata markers. These indicate outbound content was reflected back as
// inbound, which causes recursive echo amplification.
const reflection = detectReflectedContent(messageText);
if (reflection.isReflection) {
params.logVerbose?.(
`imessage: dropping reflected assistant content (markers: ${reflection.matchedLabels.join(", ")})`,
);
return { kind: "drop", reason: "reflected assistant content" };
}
const replyContext = describeReplyContext(params.message);
const createdAt = params.message.created_at ? Date.parse(params.message.created_at) : undefined;
const historyKey = isGroup

View File

@@ -0,0 +1,50 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { createLoopRateLimiter } from "./loop-rate-limiter.js";

describe("createLoopRateLimiter", () => {
  // Fake timers make the sliding-window expiry deterministic.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it("allows messages below the threshold", () => {
    const rateLimiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 3 });
    for (let hit = 0; hit < 2; hit += 1) {
      rateLimiter.record("conv:1");
    }
    expect(rateLimiter.isRateLimited("conv:1")).toBe(false);
  });

  it("rate limits at the threshold", () => {
    const rateLimiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 3 });
    for (let hit = 0; hit < 3; hit += 1) {
      rateLimiter.record("conv:1");
    }
    expect(rateLimiter.isRateLimited("conv:1")).toBe(true);
  });

  it("does not cross-contaminate conversations", () => {
    const rateLimiter = createLoopRateLimiter({ windowMs: 10_000, maxHits: 2 });
    rateLimiter.record("conv:1");
    rateLimiter.record("conv:1");
    expect(rateLimiter.isRateLimited("conv:1")).toBe(true);
    expect(rateLimiter.isRateLimited("conv:2")).toBe(false);
  });

  it("resets after the time window expires", () => {
    const rateLimiter = createLoopRateLimiter({ windowMs: 5_000, maxHits: 2 });
    rateLimiter.record("conv:1");
    rateLimiter.record("conv:1");
    expect(rateLimiter.isRateLimited("conv:1")).toBe(true);
    // Step past the 5s window so both recorded hits fall out of scope.
    vi.advanceTimersByTime(6_000);
    expect(rateLimiter.isRateLimited("conv:1")).toBe(false);
  });

  it("returns false for unknown conversations", () => {
    const rateLimiter = createLoopRateLimiter();
    expect(rateLimiter.isRateLimited("unknown")).toBe(false);
  });
});

View File

@@ -0,0 +1,69 @@
/**
 * Per-conversation rate limiter that detects rapid-fire identical echo
 * patterns and suppresses them before they amplify into queue overflow.
 */

const DEFAULT_WINDOW_MS = 60_000;
const DEFAULT_MAX_HITS = 5;
// Minimum interval between global sweeps over all conversation entries.
const CLEANUP_INTERVAL_MS = 120_000;

type ConversationWindow = {
  // Millisecond timestamps of recorded hits, oldest first.
  timestamps: number[];
};

export type LoopRateLimiter = {
  /** Returns true if this conversation has exceeded the rate limit. */
  isRateLimited: (conversationKey: string) => boolean;
  /** Record an inbound message for a conversation. */
  record: (conversationKey: string) => void;
};

/**
 * Create a sliding-window rate limiter keyed by conversation.
 *
 * A conversation is rate limited once it accumulates `maxHits` or more hits
 * within the trailing `windowMs` window. Idle conversations are swept lazily
 * (at most once per CLEANUP_INTERVAL_MS), and per-conversation history is
 * capped so a loop storm cannot grow memory without bound.
 *
 * @param opts.windowMs - Sliding window length in milliseconds (default 60s).
 * @param opts.maxHits - Hit count within the window that trips the limit (default 5).
 */
export function createLoopRateLimiter(opts?: {
  windowMs?: number;
  maxHits?: number;
}): LoopRateLimiter {
  const windowMs = opts?.windowMs ?? DEFAULT_WINDOW_MS;
  const maxHits = opts?.maxHits ?? DEFAULT_MAX_HITS;
  const conversations = new Map<string, ConversationWindow>();
  let lastCleanup = Date.now();

  // Throttled global sweep: drops fully-expired conversations and trims
  // stale timestamps from the rest.
  function cleanup() {
    const now = Date.now();
    if (now - lastCleanup < CLEANUP_INTERVAL_MS) {
      return;
    }
    lastCleanup = now;
    for (const [key, win] of conversations.entries()) {
      const recent = win.timestamps.filter((ts) => now - ts <= windowMs);
      if (recent.length === 0) {
        conversations.delete(key);
      } else {
        win.timestamps = recent;
      }
    }
  }

  // Drop timestamps outside the window and cap retained history at the
  // `maxHits` newest entries. The cap is semantics-preserving: if >= maxHits
  // hits are inside the window, the newest maxHits are necessarily all inside
  // it, so the `>= maxHits` check below gives the same answer — while memory
  // per conversation stays bounded even during a sustained loop storm
  // (previously record() appended without bound between sweeps).
  function prune(win: ConversationWindow, now: number): void {
    let recent = win.timestamps.filter((ts) => now - ts <= windowMs);
    if (recent.length > maxHits) {
      recent = recent.slice(recent.length - maxHits);
    }
    win.timestamps = recent;
  }

  return {
    record(conversationKey: string) {
      cleanup();
      const now = Date.now();
      let win = conversations.get(conversationKey);
      if (!win) {
        win = { timestamps: [] };
        conversations.set(conversationKey, win);
      }
      win.timestamps.push(now);
      prune(win, now);
    },
    isRateLimited(conversationKey: string): boolean {
      cleanup();
      const win = conversations.get(conversationKey);
      if (!win) {
        return false;
      }
      const now = Date.now();
      prune(win, now);
      if (win.timestamps.length === 0) {
        // Fully expired: release the entry now rather than waiting for the
        // next throttled sweep.
        conversations.delete(conversationKey);
        return false;
      }
      return win.timestamps.length >= maxHits;
    },
  };
}

View File

@@ -35,7 +35,8 @@ describe("iMessage sent-message echo cache", () => {
const cache = createSentMessageCache();
cache.remember("acct:imessage:+1555", { text: "hello", messageId: "m-1" });
vi.advanceTimersByTime(6000);
// Text fallback stays short to avoid suppressing legitimate repeated user text.
vi.advanceTimersByTime(6_000);
expect(cache.has("acct:imessage:+1555", { text: "hello" })).toBe(false);
expect(cache.has("acct:imessage:+1555", { messageId: "m-1" })).toBe(true);

View File

@@ -50,6 +50,7 @@ import {
buildIMessageInboundContext,
resolveIMessageInboundDecision,
} from "./inbound-processing.js";
import { createLoopRateLimiter } from "./loop-rate-limiter.js";
import { parseIMessageNotification } from "./parse-notification.js";
import { normalizeAllowList, resolveRuntime } from "./runtime.js";
import type { IMessagePayload, MonitorIMessageOpts } from "./types.js";
@@ -98,6 +99,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
);
const groupHistories = new Map<string, HistoryEntry[]>();
const sentMessageCache = createSentMessageCache();
const loopRateLimiter = createLoopRateLimiter();
const textLimit = resolveTextChunkLimit(cfg, "imessage", accountInfo.accountId);
const allowFrom = normalizeAllowList(opts.allowFrom ?? imessageCfg.allowFrom);
const groupAllowFrom = normalizeAllowList(
@@ -253,11 +255,34 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
logVerbose,
});
// Build conversation key for rate limiting (used by both drop and dispatch paths).
const chatId = message.chat_id ?? undefined;
const senderForKey = (message.sender ?? "").trim();
const conversationKey = chatId != null ? `group:${chatId}` : `dm:${senderForKey}`;
const rateLimitKey = `${accountInfo.accountId}:${conversationKey}`;
if (decision.kind === "drop") {
// Record echo/reflection drops so the rate limiter can detect sustained loops.
// Only loop-related drop reasons feed the counter; policy/mention/empty drops
// are normal and should not escalate.
const isLoopDrop =
decision.reason === "echo" ||
decision.reason === "reflected assistant content" ||
decision.reason === "from me";
if (isLoopDrop) {
loopRateLimiter.record(rateLimitKey);
}
return;
}
// After repeated echo/reflection drops for a conversation, suppress all
// remaining messages as a safety net against amplification that slips
// through the primary guards.
if (decision.kind === "dispatch" && loopRateLimiter.isRateLimited(rateLimitKey)) {
logVerbose(`imessage: rate-limited conversation ${conversationKey} (echo loop detected)`);
return;
}
const chatId = message.chat_id ?? undefined;
if (decision.kind === "pairing") {
const sender = (message.sender ?? "").trim();
if (!sender) {

View File

@@ -0,0 +1,107 @@
import { describe, expect, it } from "vitest";

import { detectReflectedContent } from "./reflection-guard.js";

// Covers the inbound reflection guard: assistant-internal markers
// (separators, role markers, thinking/memory/final tags) must be detected
// outside code, ignored inside inline/fenced code, and unclosed tag-like
// phrases must not false-positive.
describe("detectReflectedContent", () => {
  it("returns false for empty text", () => {
    expect(detectReflectedContent("").isReflection).toBe(false);
  });

  it("returns false for normal user text", () => {
    const result = detectReflectedContent("Hey, what's the weather today?");
    expect(result.isReflection).toBe(false);
    expect(result.matchedLabels).toEqual([]);
  });

  it("detects +#+#+#+# separator pattern", () => {
    const result = detectReflectedContent("NO_REPLY +#+#+#+#+#+assistant to=final");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("internal-separator");
  });

  it("detects assistant to=final marker", () => {
    const result = detectReflectedContent("some text assistant to=final rest");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("assistant-role-marker");
  });

  it("detects <thinking> tags", () => {
    const result = detectReflectedContent("<thinking>internal reasoning</thinking>");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("thinking-tag");
  });

  it("detects <thought> tags", () => {
    const result = detectReflectedContent("<thought>secret</thought>");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("thinking-tag");
  });

  it("detects <relevant_memories> tags", () => {
    const result = detectReflectedContent("<relevant_memories>data</relevant_memories>");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("relevant-memories-tag");
  });

  it("detects <final> tags", () => {
    const result = detectReflectedContent("<final>visible</final>");
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("final-tag");
  });

  it("returns multiple matched labels for combined markers", () => {
    const text = "NO_REPLY +#+#+#+# <thinking>step</thinking> assistant to=final";
    const result = detectReflectedContent(text);
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels.length).toBeGreaterThanOrEqual(3);
  });

  // Markers quoted by a user inside backticks are discussion, not reflection.
  it("ignores reflection markers inside inline code", () => {
    const result = detectReflectedContent(
      "Please keep `<thinking>debug trace</thinking>` in the example output",
    );
    expect(result.isReflection).toBe(false);
    expect(result.matchedLabels).toEqual([]);
  });

  it("ignores reflection markers inside fenced code blocks", () => {
    const result = detectReflectedContent(
      [
        "User pasted a repro snippet:",
        "```xml",
        "<relevant_memories>cached</relevant_memories>",
        "assistant to=final",
        "```",
      ].join("\n"),
    );
    expect(result.isReflection).toBe(false);
    expect(result.matchedLabels).toEqual([]);
  });

  it("still flags markers that appear outside code blocks", () => {
    const result = detectReflectedContent(
      ["```xml", "<thinking>inside code</thinking>", "```", "", "assistant to=final"].join("\n"),
    );
    expect(result.isReflection).toBe(true);
    expect(result.matchedLabels).toContain("assistant-role-marker");
  });

  it("does not flag normal code discussion about thinking", () => {
    const result = detectReflectedContent("I was thinking about your question");
    expect(result.isReflection).toBe(false);
  });

  // The closing `>` makes this a complete tag, so it IS treated as reflection.
  it("flags '<final answer>' as reflection when it forms a complete tag", () => {
    const result = detectReflectedContent("Here is my <final answer>");
    expect(result.isReflection).toBe(true);
  });

  it("does not flag partial tag without closing bracket", () => {
    const result = detectReflectedContent("I sent a <final draft, see below");
    expect(result.isReflection).toBe(false);
  });

  it("does not flag '<thought experiment>' phrase without closing bracket", () => {
    const result = detectReflectedContent("This is a <thought experiment I ran");
    expect(result.isReflection).toBe(false);
  });
});

View File

@@ -0,0 +1,64 @@
/**
 * Detects inbound messages that are reflections of assistant-originated content.
 * These patterns indicate internal metadata leaked into a channel and then
 * bounced back as a new inbound message — creating an echo loop.
 */
import { findCodeRegions, isInsideCode } from "../../shared/text/code-regions.js";

// Model-specific internal separator runs such as "#+#+#+".
const INTERNAL_SEPARATOR_RE = /(?:#\+){2,}#?/;
// Role-routing markers such as "assistant to=final".
const ASSISTANT_ROLE_MARKER_RE = /\bassistant\s+to\s*=\s*\w+/i;
// Require a closing `>` so unclosed user phrases like "<thought experiment"
// are not flagged; complete tags (even "<final answer>") are flagged.
const THINKING_TAG_RE = /<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\b[^<>]*>/i;
const RELEVANT_MEMORIES_TAG_RE = /<\s*\/?\s*relevant[-_]memories\b[^<>]*>/i;
// Require a closing `>` so unclosed user phrases like "<final draft" are not flagged.
const FINAL_TAG_RE = /<\s*\/?\s*final\b[^<>]*>/i;

/** Return a global-flagged copy of `re` so it can drive `String.matchAll`. */
function toGlobal(re: RegExp): RegExp {
  return re.flags.includes("g") ? re : new RegExp(re.source, `${re.flags}g`);
}

// Pre-compiled once at module load; previously every detection call rebuilt
// each RegExp, which is wasted work on the hot inbound path.
const REFLECTION_PATTERNS: Array<{ re: RegExp; label: string }> = [
  { re: toGlobal(INTERNAL_SEPARATOR_RE), label: "internal-separator" },
  { re: toGlobal(ASSISTANT_ROLE_MARKER_RE), label: "assistant-role-marker" },
  { re: toGlobal(THINKING_TAG_RE), label: "thinking-tag" },
  { re: toGlobal(RELEVANT_MEMORIES_TAG_RE), label: "relevant-memories-tag" },
  { re: toGlobal(FINAL_TAG_RE), label: "final-tag" },
];

export type ReflectionDetection = {
  isReflection: boolean;
  matchedLabels: string[];
};

type CodeRegions = ReturnType<typeof findCodeRegions>;

/** True when `re` matches `text` at least once outside inline/fenced code. */
function hasMatchOutsideCode(text: string, re: RegExp, codeRegions: CodeRegions): boolean {
  for (const match of text.matchAll(re)) {
    const start = match.index ?? -1;
    if (start >= 0 && !isInsideCode(start, codeRegions)) {
      return true;
    }
  }
  return false;
}

/**
 * Check whether an inbound message appears to be a reflection of
 * assistant-originated content. Returns matched pattern labels for telemetry.
 */
export function detectReflectedContent(text: string): ReflectionDetection {
  if (!text) {
    return { isReflection: false, matchedLabels: [] };
  }
  // Code regions depend only on the text; compute them once per message
  // instead of once per pattern (previously recomputed 5x per call).
  const codeRegions = findCodeRegions(text);
  const matchedLabels: string[] = [];
  for (const { re, label } of REFLECTION_PATTERNS) {
    if (hasMatchOutsideCode(text, re, codeRegions)) {
      matchedLabels.push(label);
    }
  }
  return {
    isReflection: matchedLabels.length > 0,
    matchedLabels,
  };
}

View File

@@ -0,0 +1,64 @@
import { describe, expect, it } from "vitest";

import { sanitizeOutboundText } from "./sanitize-outbound.js";

// Covers the outbound channel-boundary sanitizer: assistant-internal
// scaffolding (thinking/memory tags, model separators, role markers) must be
// stripped before iMessage delivery while user-facing text is preserved.
describe("sanitizeOutboundText", () => {
  it("returns empty string unchanged", () => {
    expect(sanitizeOutboundText("")).toBe("");
  });

  it("preserves normal user-facing text", () => {
    const text = "Hello! How can I help you today?";
    expect(sanitizeOutboundText(text)).toBe(text);
  });

  it("strips <thinking> tags and content", () => {
    const text = "<thinking>internal reasoning</thinking>The answer is 42.";
    expect(sanitizeOutboundText(text)).toBe("The answer is 42.");
  });

  it("strips <thought> tags and content", () => {
    const text = "<thought>secret</thought>Visible reply";
    expect(sanitizeOutboundText(text)).toBe("Visible reply");
  });

  // <final> wraps the visible answer, so only the tags are removed.
  it("strips <final> tags", () => {
    const text = "<final>Hello world</final>";
    expect(sanitizeOutboundText(text)).toBe("Hello world");
  });

  it("strips <relevant_memories> tags and content", () => {
    const text = "<relevant_memories>memory data</relevant_memories>Visible";
    expect(sanitizeOutboundText(text)).toBe("Visible");
  });

  it("strips +#+#+#+# separator patterns", () => {
    const text = "NO_REPLY +#+#+#+#+#+ more internal stuff";
    expect(sanitizeOutboundText(text)).not.toContain("+#+#");
  });

  it("strips assistant to=final markers", () => {
    const text = "Some text assistant to=final more text";
    const result = sanitizeOutboundText(text);
    expect(result).not.toMatch(/assistant\s+to\s*=\s*final/i);
  });

  it("strips trailing role turn markers", () => {
    const text = "Hello\nassistant:\nuser:";
    const result = sanitizeOutboundText(text);
    expect(result).not.toMatch(/^assistant:$/m);
  });

  it("collapses excessive blank lines after stripping", () => {
    const text = "Hello\n\n\n\n\nWorld";
    expect(sanitizeOutboundText(text)).toBe("Hello\n\nWorld");
  });

  it("handles combined internal markers in one message", () => {
    const text = "<thinking>step 1</thinking>NO_REPLY +#+#+#+# assistant to=final\n\nActual reply";
    const result = sanitizeOutboundText(text);
    expect(result).not.toContain("<thinking>");
    expect(result).not.toContain("+#+#");
    expect(result).not.toMatch(/assistant to=final/i);
    expect(result).toContain("Actual reply");
  });
});

View File

@@ -0,0 +1,31 @@
import { stripAssistantInternalScaffolding } from "../../shared/text/assistant-visible-text.js";

/**
 * Patterns that indicate assistant-internal metadata leaked into text.
 * These must never reach a user-facing channel.
 */
// Model-specific separator runs such as "+#+#+#+#". Optional flanking "+"
// characters are consumed as well; the previous pattern (core "#+" pairs
// only) left stray "+" residue behind — e.g. "+#+#+#+#+#+" stripped to "++".
const INTERNAL_SEPARATOR_RE = /\+?(?:#\+){2,}#?\+?/g;
// Role-routing markers such as "assistant to=final".
const ASSISTANT_ROLE_MARKER_RE = /\bassistant\s+to\s*=\s*\w+/gi;
// Bare role labels ("assistant:", "user:", "system:") dangling at line end.
const ROLE_TURN_MARKER_RE = /\b(?:user|system|assistant)\s*:\s*$/gm;

/**
 * Strip all assistant-internal scaffolding from outbound text before delivery.
 * Applies reasoning/thinking tag removal, memory tag removal, and
 * model-specific internal separator stripping.
 *
 * @param text - Candidate outbound reply text; may be empty.
 * @returns The user-visible text with internal markers removed, trimmed, and
 *   with blank-line runs collapsed to at most one blank line.
 */
export function sanitizeOutboundText(text: string): string {
  if (!text) {
    return text;
  }
  // Shared pass first: removes thinking/reasoning and memory tags.
  let cleaned = stripAssistantInternalScaffolding(text);
  cleaned = cleaned.replace(INTERNAL_SEPARATOR_RE, "");
  cleaned = cleaned.replace(ASSISTANT_ROLE_MARKER_RE, "");
  cleaned = cleaned.replace(ROLE_TURN_MARKER_RE, "");
  // Collapse excessive blank lines left after stripping.
  cleaned = cleaned.replace(/\n{3,}/g, "\n\n").trim();
  return cleaned;
}