refactor: dedupe agent and reply runtimes

This commit is contained in:
Peter Steinberger
2026-03-02 19:47:30 +00:00
parent 8768487aee
commit 9617ac9dd5
53 changed files with 1828 additions and 1176 deletions

View File

@@ -33,6 +33,107 @@ function expectToolCallSummary(delivery: Delivery | undefined) {
expect(delivery?.text).toContain("Tool Call");
}
function createFinalOnlyStatusToolHarness() {
return createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
tagVisibility: {
available_commands_update: true,
tool_call: true,
},
},
},
});
}
function createLiveToolLifecycleHarness(params?: {
coalesceIdleMs?: number;
maxChunkChars?: number;
maxSessionUpdateChars?: number;
repeatSuppression?: boolean;
}) {
return createProjectorHarness({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
...params,
tagVisibility: {
tool_call: true,
tool_call_update: true,
},
},
},
});
}
function createLiveStatusAndToolLifecycleHarness(params?: {
coalesceIdleMs?: number;
maxChunkChars?: number;
repeatSuppression?: boolean;
}) {
return createProjectorHarness({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
...params,
tagVisibility: {
available_commands_update: true,
tool_call: true,
tool_call_update: true,
},
},
},
});
}
async function runHiddenBoundaryCase(params: {
cfgOverrides?: Parameters<typeof createCfg>[0];
toolCallId: string;
includeNonTerminalUpdate?: boolean;
firstText?: string;
secondText?: string;
expectedText: string;
}) {
const { deliveries, projector } = createProjectorHarness(params.cfgOverrides);
await projector.onEvent({
type: "text_delta",
text: params.firstText ?? "fallback.",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: params.toolCallId,
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
});
if (params.includeNonTerminalUpdate) {
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: params.toolCallId,
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
});
}
await projector.onEvent({
type: "text_delta",
text: params.secondText ?? "I don't",
tag: "agent_message_chunk",
});
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe(params.expectedText);
}
describe("createAcpReplyProjector", () => {
it("coalesces text deltas into bounded block chunks", async () => {
const { deliveries, projector } = createProjectorHarness();
@@ -174,20 +275,7 @@ describe("createAcpReplyProjector", () => {
});
it("supports deliveryMode=final_only by buffering all projected output until done", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
tagVisibility: {
available_commands_update: true,
tool_call: true,
},
},
},
});
const { deliveries, projector } = createFinalOnlyStatusToolHarness();
await projector.onEvent({
type: "text_delta",
@@ -225,20 +313,7 @@ describe("createAcpReplyProjector", () => {
});
it("flushes buffered status/tool output on error in deliveryMode=final_only", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
tagVisibility: {
available_commands_update: true,
tool_call: true,
},
},
},
});
const { deliveries, projector } = createFinalOnlyStatusToolHarness();
await projector.onEvent({
type: "status",
@@ -329,18 +404,7 @@ describe("createAcpReplyProjector", () => {
});
it("dedupes repeated tool lifecycle updates when repeatSuppression is enabled", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
tagVisibility: {
tool_call: true,
tool_call_update: true,
},
},
},
});
const { deliveries, projector } = createLiveToolLifecycleHarness();
await projector.onEvent({
type: "tool_call",
@@ -381,18 +445,8 @@ describe("createAcpReplyProjector", () => {
});
it("keeps terminal tool updates even when rendered summaries are truncated", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
maxSessionUpdateChars: 48,
tagVisibility: {
tool_call: true,
tool_call_update: true,
},
},
},
const { deliveries, projector } = createLiveToolLifecycleHarness({
maxSessionUpdateChars: 48,
});
const longTitle =
@@ -420,18 +474,7 @@ describe("createAcpReplyProjector", () => {
});
it("renders fallback tool labels without leaking call ids as primary label", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
deliveryMode: "live",
tagVisibility: {
tool_call: true,
tool_call_update: true,
},
},
},
});
const { deliveries, projector } = createLiveToolLifecycleHarness();
await projector.onEvent({
type: "tool_call",
@@ -446,21 +489,10 @@ describe("createAcpReplyProjector", () => {
});
it("allows repeated status/tool summaries when repeatSuppression is disabled", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
repeatSuppression: false,
tagVisibility: {
available_commands_update: true,
tool_call: true,
tool_call_update: true,
},
},
},
const { deliveries, projector } = createLiveStatusAndToolLifecycleHarness({
coalesceIdleMs: 0,
maxChunkChars: 256,
repeatSuppression: false,
});
await projector.onEvent({
@@ -616,156 +648,96 @@ describe("createAcpReplyProjector", () => {
});
it("inserts a space boundary before visible text after hidden tool updates by default", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
},
},
});
await projector.onEvent({ type: "text_delta", text: "fallback.", tag: "agent_message_chunk" });
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_hidden_1",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
});
await projector.onEvent({ type: "text_delta", text: "I don't", tag: "agent_message_chunk" });
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe("fallback. I don't");
});
it("preserves hidden boundary across nonterminal hidden tool updates", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
tagVisibility: {
tool_call: false,
tool_call_update: false,
await runHiddenBoundaryCase({
cfgOverrides: {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
},
},
},
toolCallId: "call_hidden_1",
expectedText: "fallback. I don't",
});
});
await projector.onEvent({ type: "text_delta", text: "fallback.", tag: "agent_message_chunk" });
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
it("preserves hidden boundary across nonterminal hidden tool updates", async () => {
await runHiddenBoundaryCase({
cfgOverrides: {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
tagVisibility: {
tool_call: false,
tool_call_update: false,
},
},
},
},
toolCallId: "hidden_boundary_1",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
includeNonTerminalUpdate: true,
expectedText: "fallback. I don't",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "hidden_boundary_1",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
});
await projector.onEvent({ type: "text_delta", text: "I don't", tag: "agent_message_chunk" });
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe("fallback. I don't");
});
it("supports hiddenBoundarySeparator=space", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
hiddenBoundarySeparator: "space",
await runHiddenBoundaryCase({
cfgOverrides: {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
hiddenBoundarySeparator: "space",
},
},
},
});
await projector.onEvent({ type: "text_delta", text: "fallback.", tag: "agent_message_chunk" });
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_hidden_2",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
expectedText: "fallback. I don't",
});
await projector.onEvent({ type: "text_delta", text: "I don't", tag: "agent_message_chunk" });
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe("fallback. I don't");
});
it("supports hiddenBoundarySeparator=none", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
hiddenBoundarySeparator: "none",
await runHiddenBoundaryCase({
cfgOverrides: {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
hiddenBoundarySeparator: "none",
},
},
},
});
await projector.onEvent({ type: "text_delta", text: "fallback.", tag: "agent_message_chunk" });
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_hidden_3",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
expectedText: "fallback.I don't",
});
await projector.onEvent({ type: "text_delta", text: "I don't", tag: "agent_message_chunk" });
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe("fallback.I don't");
});
it("does not duplicate newlines when previous visible text already ends with newline", async () => {
const { deliveries, projector } = createProjectorHarness({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
await runHiddenBoundaryCase({
cfgOverrides: {
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
deliveryMode: "live",
},
},
},
});
await projector.onEvent({
type: "text_delta",
text: "fallback.\n",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_hidden_4",
status: "in_progress",
title: "Run test",
text: "Run test (in_progress)",
firstText: "fallback.\n",
expectedText: "fallback.\nI don't",
});
await projector.onEvent({ type: "text_delta", text: "I don't", tag: "agent_message_chunk" });
await projector.flush(true);
expect(combinedBlockText(deliveries)).toBe("fallback.\nI don't");
});
it("does not insert boundary separator for hidden non-tool status updates", async () => {

View File

@@ -181,6 +181,20 @@ export async function buildContextReply(params: HandleCommandsParams): Promise<R
session.totalTokens != null
? `Session tokens (cached): ${formatInt(session.totalTokens)} total / ctx=${session.contextTokens ?? "?"}`
: `Session tokens (cached): unknown / ctx=${session.contextTokens ?? "?"}`;
const sharedContextLines = [
`Workspace: ${workspaceLabel}`,
`Bootstrap max/file: ${bootstrapMaxLabel}`,
`Bootstrap max/total: ${bootstrapTotalLabel}`,
sandboxLine,
systemPromptLine,
...(bootstrapWarningLines.length ? ["", ...bootstrapWarningLines] : []),
"",
"Injected workspace files:",
...fileLines,
"",
skillsLine,
skillsNamesLine,
];
if (sub === "detail" || sub === "deep") {
const perSkill = formatListTop(
@@ -204,18 +218,7 @@ export async function buildContextReply(params: HandleCommandsParams): Promise<R
return {
text: [
"🧠 Context breakdown (detailed)",
`Workspace: ${workspaceLabel}`,
`Bootstrap max/file: ${bootstrapMaxLabel}`,
`Bootstrap max/total: ${bootstrapTotalLabel}`,
sandboxLine,
systemPromptLine,
...(bootstrapWarningLines.length ? ["", ...bootstrapWarningLines] : []),
"",
"Injected workspace files:",
...fileLines,
"",
skillsLine,
skillsNamesLine,
...sharedContextLines,
...(perSkill.lines.length ? ["Top skills (prompt entry size):", ...perSkill.lines] : []),
...(perSkill.omitted ? [`… (+${perSkill.omitted} more skills)`] : []),
"",
@@ -243,18 +246,7 @@ export async function buildContextReply(params: HandleCommandsParams): Promise<R
return {
text: [
"🧠 Context breakdown",
`Workspace: ${workspaceLabel}`,
`Bootstrap max/file: ${bootstrapMaxLabel}`,
`Bootstrap max/total: ${bootstrapTotalLabel}`,
sandboxLine,
systemPromptLine,
...(bootstrapWarningLines.length ? ["", ...bootstrapWarningLines] : []),
"",
"Injected workspace files:",
...fileLines,
"",
skillsLine,
skillsNamesLine,
...sharedContextLines,
toolListLine,
toolSchemaLine,
toolsNamesLine,

View File

@@ -20,30 +20,13 @@ import { normalizeUsageDisplay, resolveResponseUsageMode } from "../thinking.js"
import { handleAbortTrigger, handleStopCommand } from "./commands-session-abort.js";
import { persistSessionEntry } from "./commands-session-store.js";
import type { CommandHandler } from "./commands-types.js";
import { isDiscordSurface, resolveDiscordAccountId } from "./discord-context.js";
const SESSION_COMMAND_PREFIX = "/session";
const SESSION_DURATION_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]);
const SESSION_ACTION_IDLE = "idle";
const SESSION_ACTION_MAX_AGE = "max-age";
function isDiscordSurface(params: Parameters<CommandHandler>[0]): boolean {
const channel =
params.ctx.OriginatingChannel ??
params.command.channel ??
params.ctx.Surface ??
params.ctx.Provider;
return (
String(channel ?? "")
.trim()
.toLowerCase() === "discord"
);
}
function resolveDiscordAccountId(params: Parameters<CommandHandler>[0]): string {
const accountId = typeof params.ctx.AccountId === "string" ? params.ctx.AccountId.trim() : "";
return accountId || "default";
}
function resolveSessionCommandUsage() {
return "Usage: /session idle <duration|off> | /session max-age <duration|off> (example: /session idle 24h)";
}

View File

@@ -22,6 +22,7 @@ import {
truncateLine,
} from "../../../shared/subagents-format.js";
import type { CommandHandler, CommandHandlerResult } from "../commands-types.js";
import { isDiscordSurface, resolveDiscordAccountId } from "../discord-context.js";
import {
formatRunLabel,
formatRunStatus,
@@ -30,6 +31,7 @@ import {
} from "../subagents-utils.js";
export { extractAssistantText, stripToolMessages };
export { isDiscordSurface, resolveDiscordAccountId };
export const COMMAND = "/subagents";
export const COMMAND_KILL = "/kill";
@@ -267,24 +269,6 @@ export type FocusTargetResolution = {
label?: string;
};
export function isDiscordSurface(params: SubagentsCommandParams): boolean {
const channel =
params.ctx.OriginatingChannel ??
params.command.channel ??
params.ctx.Surface ??
params.ctx.Provider;
return (
String(channel ?? "")
.trim()
.toLowerCase() === "discord"
);
}
export function resolveDiscordAccountId(params: SubagentsCommandParams): string {
const accountId = typeof params.ctx.AccountId === "string" ? params.ctx.AccountId.trim() : "";
return accountId || "default";
}
export function resolveDiscordChannelIdForFocus(
params: SubagentsCommandParams,
): string | undefined {

View File

@@ -0,0 +1,35 @@
type DiscordSurfaceParams = {
ctx: {
OriginatingChannel?: string;
Surface?: string;
Provider?: string;
AccountId?: string;
};
command: {
channel?: string;
};
};
type DiscordAccountParams = {
ctx: {
AccountId?: string;
};
};
export function isDiscordSurface(params: DiscordSurfaceParams): boolean {
const channel =
params.ctx.OriginatingChannel ??
params.command.channel ??
params.ctx.Surface ??
params.ctx.Provider;
return (
String(channel ?? "")
.trim()
.toLowerCase() === "discord"
);
}
export function resolveDiscordAccountId(params: DiscordAccountParams): string {
const accountId = typeof params.ctx.AccountId === "string" ? params.ctx.AccountId.trim() : "";
return accountId || "default";
}

View File

@@ -68,6 +68,28 @@ describe("createModelSelectionState parent inheritance", () => {
});
}
async function resolveStateWithParent(params: {
cfg: OpenClawConfig;
parentKey: string;
sessionKey: string;
parentEntry: ReturnType<typeof makeEntry>;
sessionEntry?: ReturnType<typeof makeEntry>;
parentSessionKey?: string;
}) {
const sessionEntry = params.sessionEntry ?? makeEntry();
const sessionStore = {
[params.parentKey]: params.parentEntry,
[params.sessionKey]: sessionEntry,
};
return resolveState({
cfg: params.cfg,
sessionEntry,
sessionStore,
sessionKey: params.sessionKey,
parentSessionKey: params.parentSessionKey,
});
}
it("inherits parent override from explicit parentSessionKey", async () => {
const cfg = {} as OpenClawConfig;
const parentKey = "agent:main:discord:channel:c1";
@@ -76,17 +98,11 @@ describe("createModelSelectionState parent inheritance", () => {
providerOverride: "openai",
modelOverride: "gpt-4o",
});
const sessionEntry = makeEntry();
const sessionStore = {
[parentKey]: parentEntry,
[sessionKey]: sessionEntry,
};
const state = await resolveState({
const state = await resolveStateWithParent({
cfg,
sessionEntry,
sessionStore,
parentKey,
sessionKey,
parentEntry,
parentSessionKey: parentKey,
});
@@ -102,17 +118,11 @@ describe("createModelSelectionState parent inheritance", () => {
providerOverride: "openai",
modelOverride: "gpt-4o",
});
const sessionEntry = makeEntry();
const sessionStore = {
[parentKey]: parentEntry,
[sessionKey]: sessionEntry,
};
const state = await resolveState({
const state = await resolveStateWithParent({
cfg,
sessionEntry,
sessionStore,
parentKey,
sessionKey,
parentEntry,
});
expect(state.provider).toBe("openai");
@@ -131,15 +141,11 @@ describe("createModelSelectionState parent inheritance", () => {
providerOverride: "anthropic",
modelOverride: "claude-opus-4-5",
});
const sessionStore = {
[parentKey]: parentEntry,
[sessionKey]: sessionEntry,
};
const state = await resolveState({
const state = await resolveStateWithParent({
cfg,
parentKey,
parentEntry,
sessionEntry,
sessionStore,
sessionKey,
});
@@ -163,17 +169,11 @@ describe("createModelSelectionState parent inheritance", () => {
providerOverride: "anthropic",
modelOverride: "claude-opus-4-5",
});
const sessionEntry = makeEntry();
const sessionStore = {
[parentKey]: parentEntry,
[sessionKey]: sessionEntry,
};
const state = await resolveState({
const state = await resolveStateWithParent({
cfg,
sessionEntry,
sessionStore,
parentKey,
sessionKey,
parentEntry,
});
expect(state.provider).toBe(defaultProvider);