refactor(commands): unify repeated ACP and routing flows

This commit is contained in:
Peter Steinberger
2026-03-02 05:19:47 +00:00
parent 2d31126e6a
commit 6b78544f82
12 changed files with 333 additions and 442 deletions

View File

@@ -1,6 +1,6 @@
import type { AcpSessionUpdateTag } from "../../acp/runtime/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
import { clampPositiveInteger, resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350;
const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
@@ -36,24 +36,6 @@ export type AcpProjectionSettings = {
tagVisibility: Partial<Record<AcpSessionUpdateTag, boolean>>;
};
function clampPositiveInteger(
value: unknown,
fallback: number,
bounds: { min: number; max: number },
): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
const rounded = Math.round(value);
if (rounded < bounds.min) {
return bounds.min;
}
if (rounded > bounds.max) {
return bounds.max;
}
return rounded;
}
function clampBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}

View File

@@ -66,8 +66,8 @@ export type BlockStreamingChunking = {
flushOnParagraph?: boolean;
};
function clampPositiveInteger(
value: number | undefined,
export function clampPositiveInteger(
value: unknown,
fallback: number,
bounds: { min: number; max: number },
): number {

View File

@@ -363,30 +363,21 @@ export async function handleAcpSpawnAction(
return stopWithText(parts.join(" "));
}
export async function handleAcpCancelAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
const acpManager = getAcpSessionManager();
const token = restTokens.join(" ").trim() || undefined;
const target = await resolveAcpTargetSessionKey({
commandParams: params,
token,
});
if (!target.ok) {
return stopWithText(`⚠️ ${target.error}`);
}
const resolved = acpManager.resolveSession({
function resolveAcpSessionForCommandOrStop(params: {
acpManager: ReturnType<typeof getAcpSessionManager>;
cfg: OpenClawConfig;
sessionKey: string;
}): CommandHandlerResult | null {
const resolved = params.acpManager.resolveSession({
cfg: params.cfg,
sessionKey: target.sessionKey,
sessionKey: params.sessionKey,
});
if (resolved.kind === "none") {
return stopWithText(
collectAcpErrorText({
error: new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
`Session is not ACP-enabled: ${target.sessionKey}`,
`Session is not ACP-enabled: ${params.sessionKey}`,
),
fallbackCode: "ACP_SESSION_INIT_FAILED",
fallbackMessage: "Session is not ACP-enabled.",
@@ -402,17 +393,73 @@ export async function handleAcpCancelAction(
}),
);
}
return null;
}
return await withAcpCommandErrorBoundary({
run: async () =>
await acpManager.cancelSession({
cfg: params.cfg,
sessionKey: target.sessionKey,
reason: "manual-cancel",
async function resolveAcpTokenTargetSessionKeyOrStop(params: {
commandParams: HandleCommandsParams;
restTokens: string[];
}): Promise<string | CommandHandlerResult> {
const token = params.restTokens.join(" ").trim() || undefined;
const target = await resolveAcpTargetSessionKey({
commandParams: params.commandParams,
token,
});
if (!target.ok) {
return stopWithText(`⚠️ ${target.error}`);
}
return target.sessionKey;
}
async function withResolvedAcpSessionTarget(params: {
commandParams: HandleCommandsParams;
restTokens: string[];
run: (ctx: {
acpManager: ReturnType<typeof getAcpSessionManager>;
sessionKey: string;
}) => Promise<CommandHandlerResult>;
}): Promise<CommandHandlerResult> {
const acpManager = getAcpSessionManager();
const targetSessionKey = await resolveAcpTokenTargetSessionKeyOrStop({
commandParams: params.commandParams,
restTokens: params.restTokens,
});
if (typeof targetSessionKey !== "string") {
return targetSessionKey;
}
const guardFailure = resolveAcpSessionForCommandOrStop({
acpManager,
cfg: params.commandParams.cfg,
sessionKey: targetSessionKey,
});
if (guardFailure) {
return guardFailure;
}
return await params.run({
acpManager,
sessionKey: targetSessionKey,
});
}
export async function handleAcpCancelAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
return await withResolvedAcpSessionTarget({
commandParams: params,
restTokens,
run: async ({ acpManager, sessionKey }) =>
await withAcpCommandErrorBoundary({
run: async () =>
await acpManager.cancelSession({
cfg: params.cfg,
sessionKey,
reason: "manual-cancel",
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP cancel failed before completion.",
onSuccess: () => stopWithText(`✅ Cancel requested for ACP session ${sessionKey}.`),
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP cancel failed before completion.",
onSuccess: () => stopWithText(`✅ Cancel requested for ACP session ${target.sessionKey}.`),
});
}
@@ -478,30 +525,13 @@ export async function handleAcpSteerAction(
return stopWithText(`⚠️ ${target.error}`);
}
const resolved = acpManager.resolveSession({
const guardFailure = resolveAcpSessionForCommandOrStop({
acpManager,
cfg: params.cfg,
sessionKey: target.sessionKey,
});
if (resolved.kind === "none") {
return stopWithText(
collectAcpErrorText({
error: new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
`Session is not ACP-enabled: ${target.sessionKey}`,
),
fallbackCode: "ACP_SESSION_INIT_FAILED",
fallbackMessage: "Session is not ACP-enabled.",
}),
);
}
if (resolved.kind === "stale") {
return stopWithText(
collectAcpErrorText({
error: resolved.error,
fallbackCode: "ACP_SESSION_INIT_FAILED",
fallbackMessage: resolved.error.message,
}),
);
if (guardFailure) {
return guardFailure;
}
return await withAcpCommandErrorBoundary({
@@ -527,68 +557,38 @@ export async function handleAcpCloseAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
const acpManager = getAcpSessionManager();
const token = restTokens.join(" ").trim() || undefined;
const target = await resolveAcpTargetSessionKey({
return await withResolvedAcpSessionTarget({
commandParams: params,
token,
restTokens,
run: async ({ acpManager, sessionKey }) => {
let runtimeNotice = "";
try {
const closed = await acpManager.closeSession({
cfg: params.cfg,
sessionKey,
reason: "manual-close",
allowBackendUnavailable: true,
clearMeta: true,
});
runtimeNotice = closed.runtimeNotice ? ` (${closed.runtimeNotice})` : "";
} catch (error) {
return stopWithText(
collectAcpErrorText({
error,
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
}),
);
}
const removedBindings = await getSessionBindingService().unbind({
targetSessionKey: sessionKey,
reason: "manual",
});
return stopWithText(
`✅ Closed ACP session ${sessionKey}${runtimeNotice}. Removed ${removedBindings.length} binding${removedBindings.length === 1 ? "" : "s"}.`,
);
},
});
if (!target.ok) {
return stopWithText(`⚠️ ${target.error}`);
}
const resolved = acpManager.resolveSession({
cfg: params.cfg,
sessionKey: target.sessionKey,
});
if (resolved.kind === "none") {
return stopWithText(
collectAcpErrorText({
error: new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
`Session is not ACP-enabled: ${target.sessionKey}`,
),
fallbackCode: "ACP_SESSION_INIT_FAILED",
fallbackMessage: "Session is not ACP-enabled.",
}),
);
}
if (resolved.kind === "stale") {
return stopWithText(
collectAcpErrorText({
error: resolved.error,
fallbackCode: "ACP_SESSION_INIT_FAILED",
fallbackMessage: resolved.error.message,
}),
);
}
let runtimeNotice = "";
try {
const closed = await acpManager.closeSession({
cfg: params.cfg,
sessionKey: target.sessionKey,
reason: "manual-close",
allowBackendUnavailable: true,
clearMeta: true,
});
runtimeNotice = closed.runtimeNotice ? ` (${closed.runtimeNotice})` : "";
} catch (error) {
return stopWithText(
collectAcpErrorText({
error,
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "ACP close failed before completion.",
}),
);
}
const removedBindings = await getSessionBindingService().unbind({
targetSessionKey: target.sessionKey,
reason: "manual",
});
return stopWithText(
`✅ Closed ACP session ${target.sessionKey}${runtimeNotice}. Removed ${removedBindings.length} binding${removedBindings.length === 1 ? "" : "s"}.`,
);
}

View File

@@ -27,27 +27,43 @@ import {
} from "./shared.js";
import { resolveAcpTargetSessionKey } from "./targets.js";
export async function handleAcpStatusAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
const parsed = parseOptionalSingleTarget(restTokens, ACP_STATUS_USAGE);
async function resolveOptionalSingleTargetOrStop(params: {
commandParams: HandleCommandsParams;
restTokens: string[];
usage: string;
}): Promise<string | CommandHandlerResult> {
const parsed = parseOptionalSingleTarget(params.restTokens, params.usage);
if (!parsed.ok) {
return stopWithText(`⚠️ ${parsed.error}`);
}
const target = await resolveAcpTargetSessionKey({
commandParams: params,
commandParams: params.commandParams,
token: parsed.sessionToken,
});
if (!target.ok) {
return stopWithText(`⚠️ ${target.error}`);
}
return target.sessionKey;
}
export async function handleAcpStatusAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
const targetSessionKey = await resolveOptionalSingleTargetOrStop({
commandParams: params,
restTokens,
usage: ACP_STATUS_USAGE,
});
if (typeof targetSessionKey !== "string") {
return targetSessionKey;
}
return await withAcpCommandErrorBoundary({
run: async () =>
await getAcpSessionManager().getSessionStatus({
cfg: params.cfg,
sessionKey: target.sessionKey,
sessionKey: targetSessionKey,
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "Could not read ACP session status.",
@@ -323,26 +339,23 @@ export async function handleAcpResetOptionsAction(
params: HandleCommandsParams,
restTokens: string[],
): Promise<CommandHandlerResult> {
const parsed = parseOptionalSingleTarget(restTokens, ACP_RESET_OPTIONS_USAGE);
if (!parsed.ok) {
return stopWithText(`⚠️ ${parsed.error}`);
}
const target = await resolveAcpTargetSessionKey({
const targetSessionKey = await resolveOptionalSingleTargetOrStop({
commandParams: params,
token: parsed.sessionToken,
restTokens,
usage: ACP_RESET_OPTIONS_USAGE,
});
if (!target.ok) {
return stopWithText(`⚠️ ${target.error}`);
if (typeof targetSessionKey !== "string") {
return targetSessionKey;
}
return await withAcpCommandErrorBoundary({
run: async () =>
await getAcpSessionManager().resetSessionRuntimeOptions({
cfg: params.cfg,
sessionKey: target.sessionKey,
sessionKey: targetSessionKey,
}),
fallbackCode: "ACP_TURN_FAILED",
fallbackMessage: "Could not reset ACP runtime options.",
onSuccess: () => stopWithText(`✅ Reset ACP runtime options for ${target.sessionKey}.`),
onSuccess: () => stopWithText(`✅ Reset ACP runtime options for ${targetSessionKey}.`),
});
}