ACP: improve live text batching readability

This commit is contained in:
Onur
2026-03-01 16:14:49 +01:00
committed by Onur Solmaz
parent dd2fcade3e
commit be73eb28b3
4 changed files with 187 additions and 26 deletions

View File

@@ -51,15 +51,15 @@ describe("createAcpReplyProjector", () => {
}); });
await projector.onEvent({ type: "text_delta", text: "A", tag: "agent_message_chunk" }); await projector.onEvent({ type: "text_delta", text: "A", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60); await vi.advanceTimersByTimeAsync(760);
await projector.flush(false); await projector.flush(false);
await projector.onEvent({ type: "text_delta", text: "B", tag: "agent_message_chunk" }); await projector.onEvent({ type: "text_delta", text: "B", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60); await vi.advanceTimersByTimeAsync(760);
await projector.flush(false); await projector.flush(false);
await projector.onEvent({ type: "text_delta", text: "C", tag: "agent_message_chunk" }); await projector.onEvent({ type: "text_delta", text: "C", tag: "agent_message_chunk" });
await vi.advanceTimersByTimeAsync(60); await vi.advanceTimersByTimeAsync(760);
await projector.flush(false); await projector.flush(false);
expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([ expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([
@@ -103,6 +103,55 @@ describe("createAcpReplyProjector", () => {
]); ]);
}); });
it("does not flush short live fragments mid-phrase on idle", async () => {
vi.useFakeTimers();
try {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
coalesceIdleMs: 100,
maxChunkChars: 256,
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "text_delta",
text: "Yes. Send me the term(s), and Ill run ",
tag: "agent_message_chunk",
});
await vi.advanceTimersByTimeAsync(1200);
expect(deliveries).toEqual([]);
await projector.onEvent({
type: "text_delta",
text: "`wd-cli` searches right away. ",
tag: "agent_message_chunk",
});
await projector.flush(false);
expect(deliveries).toEqual([
{
kind: "block",
text: "Yes. Send me the term(s), and Ill run `wd-cli` searches right away. ",
},
]);
} finally {
vi.useRealTimers();
}
});
it("supports deliveryMode=final_only by buffering all projected output until done", async () => { it("supports deliveryMode=final_only by buffering all projected output until done", async () => {
const deliveries: Array<{ kind: string; text?: string }> = []; const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({ const projector = createAcpReplyProjector({
@@ -649,7 +698,7 @@ describe("createAcpReplyProjector", () => {
expect(deliveries[0]?.text).toContain("Tool Call"); expect(deliveries[0]?.text).toContain("Tool Call");
}); });
it("inserts a paragraph boundary before visible text after hidden tool updates by default", async () => { it("inserts a space boundary before visible text after hidden tool updates by default", async () => {
const deliveries: Array<{ kind: string; text?: string }> = []; const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({ const projector = createAcpReplyProjector({
cfg: createCfg({ cfg: createCfg({
@@ -685,7 +734,7 @@ describe("createAcpReplyProjector", () => {
.filter((entry) => entry.kind === "block") .filter((entry) => entry.kind === "block")
.map((entry) => entry.text ?? "") .map((entry) => entry.text ?? "")
.join(""); .join("");
expect(combinedText).toBe("fallback.\n\nI don't"); expect(combinedText).toBe("fallback. I don't");
}); });
it("supports hiddenBoundarySeparator=space", async () => { it("supports hiddenBoundarySeparator=space", async () => {

View File

@@ -14,6 +14,10 @@ import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
import type { ReplyDispatchKind } from "./reply-dispatcher.js"; import type { ReplyDispatchKind } from "./reply-dispatcher.js";
const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000; const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000;
const ACP_LIVE_IDLE_FLUSH_FLOOR_MS = 750;
const ACP_LIVE_IDLE_MIN_CHARS = 80;
const ACP_LIVE_SOFT_FLUSH_CHARS = 220;
const ACP_LIVE_HARD_FLUSH_CHARS = 480;
const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]); const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]);
const HIDDEN_BOUNDARY_TAGS = new Set<AcpSessionUpdateTag>(["tool_call", "tool_call_update"]); const HIDDEN_BOUNDARY_TAGS = new Set<AcpSessionUpdateTag>(["tool_call", "tool_call_update"]);
@@ -99,6 +103,41 @@ function shouldInsertSeparator(params: {
return true; return true;
} }
function shouldFlushLiveBufferOnBoundary(text: string): boolean {
if (!text) {
return false;
}
if (text.length >= ACP_LIVE_HARD_FLUSH_CHARS) {
return true;
}
if (text.endsWith("\n\n")) {
return true;
}
if (/[.!?][)"'`]*\s$/.test(text)) {
return true;
}
if (text.length >= ACP_LIVE_SOFT_FLUSH_CHARS && /\s$/.test(text)) {
return true;
}
return false;
}
function shouldFlushLiveBufferOnIdle(text: string): boolean {
if (!text) {
return false;
}
if (text.length >= ACP_LIVE_IDLE_MIN_CHARS) {
return true;
}
if (/[.!?][)"'`]*$/.test(text.trimEnd())) {
return true;
}
if (text.includes("\n")) {
return true;
}
return false;
}
function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string { function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string {
const detailParts: string[] = []; const detailParts: string[] = [];
const title = event.title?.trim(); const title = event.title?.trim();
@@ -148,9 +187,10 @@ export function createAcpReplyProjector(params: {
await params.deliver("block", payload); await params.deliver("block", payload);
}, },
timeoutMs: ACP_BLOCK_REPLY_TIMEOUT_MS, timeoutMs: ACP_BLOCK_REPLY_TIMEOUT_MS,
coalescing: streaming.coalescing, coalescing: settings.deliveryMode === "live" ? undefined : streaming.coalescing,
}); });
const chunker = new EmbeddedBlockChunker(streaming.chunking); const chunker = new EmbeddedBlockChunker(streaming.chunking);
const liveIdleFlushMs = Math.max(streaming.coalescing.idleMs, ACP_LIVE_IDLE_FLUSH_FLOOR_MS);
let emittedTurnChars = 0; let emittedTurnChars = 0;
let emittedMetaEvents = 0; let emittedMetaEvents = 0;
@@ -160,10 +200,65 @@ export function createAcpReplyProjector(params: {
let lastUsageTuple: string | undefined; let lastUsageTuple: string | undefined;
let lastVisibleOutputTail: string | undefined; let lastVisibleOutputTail: string | undefined;
let pendingHiddenBoundary = false; let pendingHiddenBoundary = false;
let liveBufferText = "";
let liveIdleTimer: NodeJS.Timeout | undefined;
const pendingToolDeliveries: BufferedToolDelivery[] = []; const pendingToolDeliveries: BufferedToolDelivery[] = [];
const toolLifecycleById = new Map<string, ToolLifecycleState>(); const toolLifecycleById = new Map<string, ToolLifecycleState>();
const clearLiveIdleTimer = () => {
if (!liveIdleTimer) {
return;
}
clearTimeout(liveIdleTimer);
liveIdleTimer = undefined;
};
const drainChunker = (force: boolean) => {
if (settings.deliveryMode === "final_only" && !force) {
return;
}
chunker.drain({
force,
emit: (chunk) => {
blockReplyPipeline.enqueue({ text: chunk });
},
});
};
const flushLiveBuffer = (opts?: { force?: boolean; idle?: boolean }) => {
if (settings.deliveryMode !== "live") {
return;
}
if (!liveBufferText) {
return;
}
if (opts?.idle && !shouldFlushLiveBufferOnIdle(liveBufferText)) {
return;
}
const text = liveBufferText;
liveBufferText = "";
chunker.append(text);
drainChunker(opts?.force === true);
};
const scheduleLiveIdleFlush = () => {
if (settings.deliveryMode !== "live") {
return;
}
if (liveIdleFlushMs <= 0 || !liveBufferText) {
return;
}
clearLiveIdleTimer();
liveIdleTimer = setTimeout(() => {
flushLiveBuffer({ force: true, idle: true });
if (liveBufferText) {
scheduleLiveIdleFlush();
}
}, liveIdleFlushMs);
};
const resetTurnState = () => { const resetTurnState = () => {
clearLiveIdleTimer();
emittedTurnChars = 0; emittedTurnChars = 0;
emittedMetaEvents = 0; emittedMetaEvents = 0;
truncationNoticeEmitted = false; truncationNoticeEmitted = false;
@@ -172,23 +267,11 @@ export function createAcpReplyProjector(params: {
lastUsageTuple = undefined; lastUsageTuple = undefined;
lastVisibleOutputTail = undefined; lastVisibleOutputTail = undefined;
pendingHiddenBoundary = false; pendingHiddenBoundary = false;
liveBufferText = "";
pendingToolDeliveries.length = 0; pendingToolDeliveries.length = 0;
toolLifecycleById.clear(); toolLifecycleById.clear();
}; };
const drainChunker = (force: boolean) => {
if (settings.deliveryMode === "final_only" && !force) {
return;
}
const effectiveForce = settings.deliveryMode === "live" ? true : force;
chunker.drain({
force: effectiveForce,
emit: (chunk) => {
blockReplyPipeline.enqueue({ text: chunk });
},
});
};
const flushBufferedToolDeliveries = async (force: boolean) => { const flushBufferedToolDeliveries = async (force: boolean) => {
if (!(settings.deliveryMode === "final_only" && force)) { if (!(settings.deliveryMode === "final_only" && force)) {
return; return;
@@ -199,6 +282,10 @@ export function createAcpReplyProjector(params: {
}; };
const flush = async (force = false): Promise<void> => { const flush = async (force = false): Promise<void> => {
if (settings.deliveryMode === "live") {
clearLiveIdleTimer();
flushLiveBuffer({ force: true });
}
await flushBufferedToolDeliveries(force); await flushBufferedToolDeliveries(force);
drainChunker(force); drainChunker(force);
await blockReplyPipeline.flush({ force }); await blockReplyPipeline.flush({ force });
@@ -362,10 +449,20 @@ export function createAcpReplyProjector(params: {
const remaining = settings.maxTurnChars - emittedTurnChars; const remaining = settings.maxTurnChars - emittedTurnChars;
const accepted = remaining < text.length ? text.slice(0, remaining) : text; const accepted = remaining < text.length ? text.slice(0, remaining) : text;
if (accepted.length > 0) { if (accepted.length > 0) {
chunker.append(accepted);
emittedTurnChars += accepted.length; emittedTurnChars += accepted.length;
lastVisibleOutputTail = accepted.slice(-1); lastVisibleOutputTail = accepted.slice(-1);
drainChunker(false); if (settings.deliveryMode === "live") {
liveBufferText += accepted;
if (shouldFlushLiveBufferOnBoundary(liveBufferText)) {
clearLiveIdleTimer();
flushLiveBuffer({ force: true });
} else {
scheduleLiveIdleFlush();
}
} else {
chunker.append(accepted);
drainChunker(false);
}
} }
if (accepted.length < text.length) { if (accepted.length < text.length) {
await emitTruncationNotice(); await emitTruncationNotice();
@@ -396,7 +493,9 @@ export function createAcpReplyProjector(params: {
if (event.type === "tool_call") { if (event.type === "tool_call") {
if (!isAcpTagVisible(settings, event.tag)) { if (!isAcpTagVisible(settings, event.tag)) {
if (event.tag && HIDDEN_BOUNDARY_TAGS.has(event.tag)) { if (event.tag && HIDDEN_BOUNDARY_TAGS.has(event.tag)) {
pendingHiddenBoundary = true; const status = normalizeToolStatus(event.status);
const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false;
pendingHiddenBoundary = event.tag === "tool_call" || isTerminal;
} }
return; return;
} }

View File

@@ -54,6 +54,7 @@ describe("acp stream settings", () => {
}), }),
); );
expect(settings.deliveryMode).toBe("live"); expect(settings.deliveryMode).toBe("live");
expect(settings.hiddenBoundarySeparator).toBe("space");
}); });
it("uses default tag visibility when no override is provided", () => { it("uses default tag visibility when no override is provided", () => {

View File

@@ -7,6 +7,7 @@ const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
const DEFAULT_ACP_REPEAT_SUPPRESSION = true; const DEFAULT_ACP_REPEAT_SUPPRESSION = true;
const DEFAULT_ACP_DELIVERY_MODE = "final_only"; const DEFAULT_ACP_DELIVERY_MODE = "final_only";
const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "paragraph"; const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "paragraph";
const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR_LIVE = "space";
const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; const DEFAULT_ACP_MAX_TURN_CHARS = 24_000;
const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320;
const DEFAULT_ACP_MAX_STATUS_CHARS = 320; const DEFAULT_ACP_MAX_STATUS_CHARS = 320;
@@ -68,11 +69,14 @@ function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode {
return DEFAULT_ACP_DELIVERY_MODE; return DEFAULT_ACP_DELIVERY_MODE;
} }
function resolveAcpHiddenBoundarySeparator(value: unknown): AcpHiddenBoundarySeparator { function resolveAcpHiddenBoundarySeparator(
value: unknown,
fallback: AcpHiddenBoundarySeparator,
): AcpHiddenBoundarySeparator {
if (value === "none" || value === "space" || value === "newline" || value === "paragraph") { if (value === "none" || value === "space" || value === "newline" || value === "paragraph") {
return value; return value;
} }
return DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR; return fallback;
} }
function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number {
@@ -95,9 +99,17 @@ function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number {
export function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings { export function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings {
const stream = cfg.acp?.stream; const stream = cfg.acp?.stream;
const deliveryMode = resolveAcpDeliveryMode(stream?.deliveryMode);
const hiddenBoundaryFallback: AcpHiddenBoundarySeparator =
deliveryMode === "live"
? DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR_LIVE
: DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR;
return { return {
deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode), deliveryMode,
hiddenBoundarySeparator: resolveAcpHiddenBoundarySeparator(stream?.hiddenBoundarySeparator), hiddenBoundarySeparator: resolveAcpHiddenBoundarySeparator(
stream?.hiddenBoundarySeparator,
hiddenBoundaryFallback,
),
repeatSuppression: clampBoolean(stream?.repeatSuppression, DEFAULT_ACP_REPEAT_SUPPRESSION), repeatSuppression: clampBoolean(stream?.repeatSuppression, DEFAULT_ACP_REPEAT_SUPPRESSION),
maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, { maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, {
min: 1, min: 1,