ACP: start typing lifecycle at turn start and harden delivery

This commit is contained in:
Onur
2026-03-01 15:21:21 +01:00
committed by Onur Solmaz
parent c8b958e573
commit 43c57005a6
8 changed files with 189 additions and 11 deletions

View File

@@ -574,7 +574,7 @@ describe("createAcpReplyProjector", () => {
expect(deliveries[0]?.text).toContain("Tool Call");
});
it("inserts a newline boundary before visible text after hidden tool updates by default", async () => {
it("inserts a paragraph boundary before visible text after hidden tool updates by default", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
@@ -610,7 +610,7 @@ describe("createAcpReplyProjector", () => {
.filter((entry) => entry.kind === "block")
.map((entry) => entry.text ?? "")
.join("");
expect(combinedText).toBe("fallback.\nI don't");
expect(combinedText).toBe("fallback.\n\nI don't");
});
it("supports hiddenBoundarySeparator=space", async () => {

View File

@@ -10,7 +10,7 @@ describe("acp stream settings", () => {
it("resolves stable defaults", () => {
const settings = resolveAcpProjectionSettings(createAcpTestConfig());
expect(settings.deliveryMode).toBe("final_only");
expect(settings.hiddenBoundarySeparator).toBe("newline");
expect(settings.hiddenBoundarySeparator).toBe("paragraph");
expect(settings.repeatSuppression).toBe(true);
expect(settings.maxTurnChars).toBe(24_000);
expect(settings.maxMetaEventsPerTurn).toBe(64);

View File

@@ -6,7 +6,7 @@ const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350;
const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
const DEFAULT_ACP_REPEAT_SUPPRESSION = true;
const DEFAULT_ACP_DELIVERY_MODE = "final_only";
const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "newline";
const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "paragraph";
const DEFAULT_ACP_MAX_TURN_CHARS = 24_000;
const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320;
const DEFAULT_ACP_MAX_STATUS_CHARS = 320;

View File

@@ -0,0 +1,93 @@
import { describe, expect, it, vi } from "vitest";
import { createAcpDispatchDeliveryCoordinator } from "./dispatch-acp-delivery.js";
import type { ReplyDispatcher } from "./reply-dispatcher.js";
import { buildTestCtx } from "./test-ctx.js";
import { createAcpTestConfig } from "./test-fixtures/acp-runtime.js";
// TTS stub shared with the module mock below. It is created through vi.hoisted
// so the reference already exists when the vi.mock factory (which vitest hoists
// above the imports) is evaluated.
const ttsMocks = vi.hoisted(() => ({
// Pass-through: resolves with the `payload` field of its argument, unchanged.
maybeApplyTtsToPayload: vi.fn(async (paramsUnknown: unknown) => {
const params = paramsUnknown as { payload: unknown };
return params.payload;
}),
}));
// Replace the TTS module's maybeApplyTtsToPayload with a thin wrapper that
// forwards every call to the hoisted stub above.
vi.mock("../../tts/tts.js", () => ({
maybeApplyTtsToPayload: (params: unknown) => ttsMocks.maybeApplyTtsToPayload(params),
}));
/**
 * Builds a ReplyDispatcher test double made entirely of vitest mocks.
 *
 * Every send method reports `true`, `waitForIdle` resolves immediately,
 * `getQueuedCounts` reports zero for tool, block and final, and
 * `markComplete` is a bare spy.
 */
function createDispatcher(): ReplyDispatcher {
  const stubDispatcher: ReplyDispatcher = {
    sendToolResult: vi.fn(() => true),
    sendBlockReply: vi.fn(() => true),
    sendFinalReply: vi.fn(() => true),
    waitForIdle: vi.fn(async () => {}),
    getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })),
    markComplete: vi.fn(),
  };
  return stubDispatcher;
}
describe("createAcpDispatchDeliveryCoordinator", () => {
  /**
   * Creates a coordinator for a Discord test context together with the
   * onReplyStart spy that was handed to it, so each test can assert on the spy.
   */
  function setupCoordinator() {
    const onReplyStart = vi.fn(async () => {});
    const coordinator = createAcpDispatchDeliveryCoordinator({
      cfg: createAcpTestConfig(),
      ctx: buildTestCtx({
        Provider: "discord",
        Surface: "discord",
        SessionKey: "agent:codex-acp:session-1",
      }),
      dispatcher: createDispatcher(),
      inboundAudio: false,
      shouldRouteToOriginating: false,
      onReplyStart,
    });
    return { coordinator, onReplyStart };
  }

  it("starts reply lifecycle only once when called directly and through deliver", async () => {
    const { coordinator, onReplyStart } = setupCoordinator();

    // Alternate explicit starts with deliveries; only the first start may fire.
    await coordinator.startReplyLifecycle();
    await coordinator.deliver("final", { text: "hello" });
    await coordinator.startReplyLifecycle();
    await coordinator.deliver("block", { text: "world" });

    expect(onReplyStart).toHaveBeenCalledTimes(1);
  });

  it("starts reply lifecycle once when deliver triggers first", async () => {
    const { coordinator, onReplyStart } = setupCoordinator();

    // A non-empty delivery comes first, followed by an explicit start.
    await coordinator.deliver("final", { text: "hello" });
    await coordinator.startReplyLifecycle();

    expect(onReplyStart).toHaveBeenCalledTimes(1);
  });

  it("does not start reply lifecycle for empty payload delivery", async () => {
    const { coordinator, onReplyStart } = setupCoordinator();

    // The payload carries neither text nor media.
    await coordinator.deliver("final", {});

    expect(onReplyStart).not.toHaveBeenCalled();
  });
});

View File

@@ -30,6 +30,7 @@ type AcpDispatchDeliveryState = {
};
export type AcpDispatchDeliveryCoordinator = {
startReplyLifecycle: () => Promise<void>;
deliver: (
kind: ReplyDispatchKind,
payload: ReplyPayload,
@@ -65,7 +66,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
toolMessageByCallId: new Map(),
};
const ensureReplyLifecycleStarted = async () => {
const startReplyLifecycleOnce = async () => {
if (state.startedReplyLifecycle) {
return;
}
@@ -127,7 +128,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
}
if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
await ensureReplyLifecycleStarted();
await startReplyLifecycleOnce();
}
const ttsPayload = await maybeApplyTtsToPayload({
@@ -186,6 +187,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
};
return {
startReplyLifecycle: startReplyLifecycleOnce,
deliver,
getBlockCount: () => state.blockCount,
getAccumulatedBlockText: () => state.accumulatedBlockText,

View File

@@ -103,6 +103,20 @@ function setReadyAcpResolution() {
});
}
/**
 * Returns an ACP test config with ACP enabled and the `tool_call` and
 * `tool_call_update` stream tags both marked visible.
 */
function createAcpConfigWithVisibleToolTags(): OpenClawConfig {
  const visibleToolTags = {
    tool_call: true,
    tool_call_update: true,
  };
  return createAcpTestConfig({
    acp: {
      enabled: true,
      stream: { tagVisibility: visibleToolTags },
    },
  });
}
describe("tryDispatchAcpReply", () => {
beforeEach(() => {
managerMocks.resolveSession.mockReset();
@@ -202,7 +216,7 @@ describe("tryDispatchAcpReply", () => {
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
}),
cfg: createAcpTestConfig(),
cfg: createAcpConfigWithVisibleToolTags(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
@@ -262,7 +276,7 @@ describe("tryDispatchAcpReply", () => {
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "run tool",
}),
cfg: createAcpTestConfig(),
cfg: createAcpConfigWithVisibleToolTags(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
@@ -279,7 +293,7 @@ describe("tryDispatchAcpReply", () => {
expect(routeMocks.routeReply).toHaveBeenCalledTimes(2);
});
it("starts reply lifecycle only when visible projected output is emitted", async () => {
it("starts reply lifecycle when ACP turn starts, including hidden-only turns", async () => {
setReadyAcpResolution();
const onReplyStart = vi.fn();
const { dispatcher } = createDispatcher();
@@ -314,7 +328,7 @@ describe("tryDispatchAcpReply", () => {
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(onReplyStart).not.toHaveBeenCalled();
expect(onReplyStart).toHaveBeenCalledTimes(1);
managerMocks.runTurn.mockImplementationOnce(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
@@ -340,9 +354,70 @@ describe("tryDispatchAcpReply", () => {
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(onReplyStart).toHaveBeenCalledTimes(2);
});
// A turn that emits visible text must trigger onReplyStart exactly once.
it("starts reply lifecycle once per turn when output is delivered", async () => {
setReadyAcpResolution();
const onReplyStart = vi.fn();
// The mocked turn emits one agent_message_chunk text delta and then finishes.
managerMocks.runTurn.mockImplementationOnce(
async ({ onEvent }: { onEvent: (event: unknown) => Promise<void> }) => {
await onEvent({ type: "text_delta", text: "visible", tag: "agent_message_chunk" });
await onEvent({ type: "done" });
},
);
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
BodyForAgent: "visible",
}),
cfg: createAcpTestConfig(),
dispatcher: createDispatcher().dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: false,
shouldSendToolSummaries: true,
bypassForCommand: false,
onReplyStart,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
// Exactly one call is expected for the whole dispatch.
expect(onReplyStart).toHaveBeenCalledTimes(1);
});
// A whitespace-only prompt must neither run an ACP turn nor call onReplyStart.
it("does not start reply lifecycle for empty ACP prompt", async () => {
setReadyAcpResolution();
const onReplyStart = vi.fn();
const { dispatcher } = createDispatcher();
await tryDispatchAcpReply({
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
// Whitespace-only body: the prompt under test.
BodyForAgent: " ",
}),
cfg: createAcpTestConfig(),
dispatcher,
sessionKey: "agent:codex-acp:session-1",
inboundAudio: false,
shouldRouteToOriginating: false,
shouldSendToolSummaries: true,
bypassForCommand: false,
onReplyStart,
recordProcessed: vi.fn(),
markIdle: vi.fn(),
});
expect(managerMocks.runTurn).not.toHaveBeenCalled();
expect(onReplyStart).not.toHaveBeenCalled();
});
it("surfaces ACP policy errors as final error replies", async () => {
setReadyAcpResolution();
policyMocks.resolveAcpDispatchPolicyError.mockReturnValue(

View File

@@ -239,6 +239,14 @@ export async function tryDispatchAcpReply(params: {
throw agentPolicyError;
}
try {
await delivery.startReplyLifecycle();
} catch (error) {
logVerbose(
`dispatch-acp: start reply lifecycle failed: ${error instanceof Error ? error.message : String(error)}`,
);
}
await acpManager.runTurn({
cfg: params.cfg,
sessionKey,

View File

@@ -177,7 +177,7 @@ export const FIELD_HELP: Record<string, string> = {
"acp.stream.deliveryMode":
"ACP delivery style: live streams projected output incrementally, final_only buffers all projected ACP output until terminal turn events.",
"acp.stream.hiddenBoundarySeparator":
"Separator inserted before next visible assistant text when hidden ACP tool lifecycle events occurred (none|space|newline|paragraph).",
"Separator inserted before next visible assistant text when hidden ACP tool lifecycle events occurred (none|space|newline|paragraph). Default: paragraph.",
"acp.stream.maxTurnChars":
"Maximum assistant text characters projected per ACP turn before truncation notice is emitted.",
"acp.stream.maxToolSummaryChars":