fix(agents): add cross-turn dedup cache for embedded assistant text

shouldSkipAssistantText() only deduplicated within the same
assistantMessageIndex. After context compaction the index resets, so
when the model re-emits the same text from the compaction summary it
bypasses the dedup check and gets delivered as a new message.

Added recentDeliveredTexts — a bounded rolling cache (max 20 entries,
1h TTL) that persists across assistant message turns within a session.
Before delivering text, normalized characters are compared against
recently delivered hashes. Matches are skipped.

Includes unit tests covering hash building, cross-turn duplicate
detection, TTL expiration, whitespace/casing normalization, cache
eviction and capacity limits.

Closes #37702
Related: #38434, #33308, #33453, #33592, #37697, #30316
This commit is contained in:
eveiljuice
2026-03-08 04:27:20 +00:00
committed by Vincent Koc
parent ed0ec57a7b
commit 6bee86e618
5 changed files with 250 additions and 16 deletions

View File

@@ -57,7 +57,10 @@ export {
isMessagingToolDuplicate,
isMessagingToolDuplicateNormalized,
normalizeTextForComparison,
isRecentlyDelivered,
recordDeliveredText,
} from "./pi-embedded-helpers/messaging-dedupe.js";
export type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
export { pickFallbackThinkingLevel } from "./pi-embedded-helpers/thinking.js";

View File

@@ -0,0 +1,110 @@
import { describe, expect, it } from "vitest";
import {
buildDeliveredTextHash,
isRecentlyDelivered,
normalizeTextForComparison,
recordDeliveredText,
type RecentDeliveredEntry,
} from "./messaging-dedupe.js";
describe("normalizeTextForComparison", () => {
it("lowercases and trims", () => {
expect(normalizeTextForComparison(" Hello World ")).toBe("hello world");
});
it("collapses whitespace", () => {
expect(normalizeTextForComparison("hello world")).toBe("hello world");
});
});
describe("cross-turn dedup", () => {
describe("buildDeliveredTextHash", () => {
it("returns normalized prefix up to 200 chars", () => {
const hash = buildDeliveredTextHash("Hello World!");
expect(hash).toBe("hello world!");
});
it("includes length and full-text hash for strings over 200 chars", () => {
const long = "a".repeat(300);
const hash = buildDeliveredTextHash(long);
expect(hash).toContain("|300|");
expect(hash.startsWith("a".repeat(200))).toBe(true);
});
it("produces different hashes for texts with same prefix but different tails", () => {
const base = "x".repeat(200);
const textA = base + " ending alpha with more content here";
const textB = base + " ending beta with different content";
expect(buildDeliveredTextHash(textA)).not.toBe(buildDeliveredTextHash(textB));
});
it("returns empty for very short text", () => {
expect(buildDeliveredTextHash("hi")).toBe("hi");
});
});
describe("isRecentlyDelivered", () => {
it("returns false for empty cache", () => {
expect(isRecentlyDelivered("Hello world test message", [])).toBe(false);
});
it("returns true when text was recently recorded", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Hello world test message", cache, now);
expect(isRecentlyDelivered("Hello world test message", cache, now + 1000)).toBe(true);
});
it("returns false after TTL expires", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Hello world test message", cache, now);
// 1 hour + 1ms later
expect(isRecentlyDelivered("Hello world test message", cache, now + 3_600_001)).toBe(false);
});
it("returns false for text shorter than MIN_DUPLICATE_TEXT_LENGTH", () => {
const cache: RecentDeliveredEntry[] = [];
recordDeliveredText("short", cache);
expect(isRecentlyDelivered("short", cache)).toBe(false);
});
it("detects duplicates with different whitespace/casing", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText(" Hello World Test Message ", cache, now);
expect(isRecentlyDelivered("hello world test message", cache, now)).toBe(true);
});
});
describe("recordDeliveredText", () => {
it("evicts expired entries on record", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("First message that is long enough", cache, now);
// Record second much later (> TTL)
recordDeliveredText("Second message that is long enough", cache, now + 3_700_000);
// First should be evicted
expect(cache.length).toBe(1);
expect(cache[0].hash).toContain("second");
});
it("caps at RECENT_DELIVERED_MAX entries", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
for (let i = 0; i < 25; i++) {
recordDeliveredText(`Unique message number ${i} with enough length`, cache, now + i);
}
expect(cache.length).toBe(20);
});
it("updates timestamp for duplicate hash instead of adding", () => {
const cache: RecentDeliveredEntry[] = [];
const now = Date.now();
recordDeliveredText("Same message repeated in session", cache, now);
recordDeliveredText("Same message repeated in session", cache, now + 5000);
expect(cache.length).toBe(1);
expect(cache[0].timestamp).toBe(now + 5000);
});
});
});

View File

@@ -1,5 +1,97 @@
const MIN_DUPLICATE_TEXT_LENGTH = 10;
/**
* Maximum number of recent delivered text hashes to retain for cross-turn
* deduplication. Keeps memory bounded while covering the typical window
* where context compaction may cause the model to re-emit a previous reply.
*/
const RECENT_DELIVERED_MAX = 20;
/**
* TTL for entries in the cross-turn dedup cache (1 hour).
* After this period the entry is evicted and the same text can be delivered
* again (which is desirable for intentionally repeated content).
*/
const RECENT_DELIVERED_TTL_MS = 60 * 60_000;
export type RecentDeliveredEntry = {
hash: string;
timestamp: number;
};
/**
* Build a collision-resistant hash from the full normalised text of a
* delivered assistant message. Uses a fast non-cryptographic approach:
* the first 200 normalised chars (for quick prefix screening) combined
* with the total length and a simple 53-bit numeric hash of the full
* string. This avoids false positives when two responses share the same
* opening paragraph but diverge later.
*/
export function buildDeliveredTextHash(text: string): string {
const normalized = normalizeTextForComparison(text);
if (normalized.length <= 200) {
return normalized;
}
// 53-bit FNV-1a-inspired hash (fits in a JS safe integer).
let h = 0x811c9dc5;
for (let i = 0; i < normalized.length; i++) {
h ^= normalized.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
// Combine prefix + length + full-text hash for uniqueness.
return `${normalized.slice(0, 200)}|${normalized.length}|${(h >>> 0).toString(36)}`;
}
/**
* Check whether `text` was recently delivered (cross-turn).
*/
export function isRecentlyDelivered(
text: string,
recentDelivered: RecentDeliveredEntry[],
now?: number,
): boolean {
const hash = buildDeliveredTextHash(text);
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
return false;
}
const currentTime = now ?? Date.now();
return recentDelivered.some(
(entry) => currentTime - entry.timestamp < RECENT_DELIVERED_TTL_MS && entry.hash === hash,
);
}
/**
* Record a delivered text in the rolling cache.
*/
export function recordDeliveredText(
text: string,
recentDelivered: RecentDeliveredEntry[],
now?: number,
): void {
const hash = buildDeliveredTextHash(text);
if (!hash || hash.length < MIN_DUPLICATE_TEXT_LENGTH) {
return;
}
const currentTime = now ?? Date.now();
// Evict expired entries.
for (let i = recentDelivered.length - 1; i >= 0; i--) {
if (currentTime - recentDelivered[i].timestamp >= RECENT_DELIVERED_TTL_MS) {
recentDelivered.splice(i, 1);
}
}
// Avoid duplicate entries for the same hash.
const existing = recentDelivered.findIndex((e) => e.hash === hash);
if (existing >= 0) {
recentDelivered[existing].timestamp = currentTime;
return;
}
recentDelivered.push({ hash, timestamp: currentTime });
// Trim oldest if over capacity.
while (recentDelivered.length > RECENT_DELIVERED_MAX) {
recentDelivered.shift();
}
}
/**
* Normalize text for duplicate comparison.
* - Trims whitespace

View File

@@ -4,6 +4,7 @@ import type { ReasoningLevel } from "../auto-reply/thinking.js";
import type { InlineCodeState } from "../markdown/code-spans.js";
import type { HookRunner } from "../plugins/hooks.js";
import type { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import type { RecentDeliveredEntry } from "./pi-embedded-helpers/messaging-dedupe.js";
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
import type {
BlockReplyChunking,
@@ -68,6 +69,7 @@ export type EmbeddedPiSubscribeState = {
compactionRetryPromise: Promise<void> | null;
unsubscribed: boolean;
recentDeliveredTexts: RecentDeliveredEntry[];
messagingToolSentTexts: string[];
messagingToolSentTextsNormalized: string[];
messagingToolSentTargets: MessagingToolSend[];

View File

@@ -9,8 +9,11 @@ import { buildCodeSpanIndex, createInlineCodeState } from "../markdown/code-span
import { EmbeddedBlockChunker } from "./pi-embedded-block-chunker.js";
import {
isMessagingToolDuplicateNormalized,
isRecentlyDelivered,
normalizeTextForComparison,
recordDeliveredText,
} from "./pi-embedded-helpers.js";
import type { RecentDeliveredEntry } from "./pi-embedded-helpers.js";
import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.handlers.js";
import type {
EmbeddedPiSubscribeContext,
@@ -70,6 +73,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
compactionRetryReject: undefined,
compactionRetryPromise: null,
unsubscribed: false,
recentDeliveredTexts: [] as RecentDeliveredEntry[],
messagingToolSentTexts: [],
messagingToolSentTextsNormalized: [],
messagingToolSentTargets: [],
@@ -103,12 +107,20 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const emitBlockReplySafely = (
payload: Parameters<NonNullable<SubscribeEmbeddedPiSessionParams["onBlockReply"]>>[0],
opts?: { sourceText?: string },
) => {
if (!params.onBlockReply) {
return;
}
void Promise.resolve()
.then(() => params.onBlockReply?.(payload))
.then(() => {
// Record in cross-turn dedup cache only after successful delivery.
// Recording before send would suppress retries on transient failures.
if (opts?.sourceText) {
recordDeliveredText(opts.sourceText, state.recentDeliveredTexts);
}
})
.catch((err) => {
log.warn(`block reply callback failed: ${String(err)}`);
});
@@ -149,15 +161,21 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
};
const shouldSkipAssistantText = (text: string) => {
if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) {
return false;
// Same-turn dedup (existing behaviour).
if (state.lastAssistantTextMessageIndex === state.assistantMessageIndex) {
const trimmed = text.trimEnd();
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
return true;
}
const normalized = normalizeTextForComparison(text);
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
return true;
}
}
const trimmed = text.trimEnd();
if (trimmed && trimmed === state.lastAssistantTextTrimmed) {
return true;
}
const normalized = normalizeTextForComparison(text);
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) {
// Cross-turn dedup: catch duplicates caused by context compaction replaying
// the same assistant text in a new turn. Uses a rolling hash cache with a
// 1-hour TTL so intentionally repeated content still goes through eventually.
if (isRecentlyDelivered(text, state.recentDeliveredTexts)) {
return true;
}
return false;
@@ -172,6 +190,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
}
assistantTexts.push(text);
rememberAssistantText(text);
// Record in cross-turn cache so post-compaction replays are caught.
recordDeliveredText(text, state.recentDeliveredTexts);
};
const finalizeAssistantTexts = (args: {
@@ -191,6 +211,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
text,
);
rememberAssistantText(text);
recordDeliveredText(text, state.recentDeliveredTexts);
} else {
pushAssistantText(text);
}
@@ -505,6 +526,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
assistantTexts.push(chunk);
rememberAssistantText(chunk);
if (!params.onBlockReply) {
// No block reply callback — text is accumulated for final delivery.
// Record now since there's no async send that could fail.
recordDeliveredText(chunk, state.recentDeliveredTexts);
return;
}
const splitResult = replyDirectiveAccumulator.consume(chunk);
@@ -523,14 +547,17 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0) && !audioAsVoice) {
return;
}
emitBlockReplySafely({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
});
emitBlockReplySafely(
{
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
audioAsVoice,
replyToId,
replyToTag,
replyToCurrent,
},
{ sourceText: chunk },
);
};
const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>