mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 16:41:36 +00:00
feat: ACP thread-bound agents (#23580)
* docs: add ACP thread-bound agents plan doc * docs: expand ACP implementation specification * feat(acp): route ACP sessions through core dispatch and lifecycle cleanup * feat(acp): add /acp commands and Discord spawn gate * ACP: add acpx runtime plugin backend * fix(subagents): defer transient lifecycle errors before announce * Agents: harden ACP sessions_spawn and tighten spawn guidance * Agents: require explicit ACP target for runtime spawns * docs: expand ACP control-plane implementation plan * ACP: harden metadata seeding and spawn guidance * ACP: centralize runtime control-plane manager and fail-closed dispatch * ACP: harden runtime manager and unify spawn helpers * Commands: route ACP sessions through ACP runtime in agent command * ACP: require persisted metadata for runtime spawns * Sessions: preserve ACP metadata when updating entries * Plugins: harden ACP backend registry across loaders * ACPX: make availability probe compatible with adapters * E2E: add manual Discord ACP plain-language smoke script * ACPX: preserve streamed spacing across Discord delivery * Docs: add ACP Discord streaming strategy * ACP: harden Discord stream buffering for thread replies * ACP: reuse shared block reply pipeline for projector * ACP: unify streaming config and adopt coalesceIdleMs * Docs: add temporary ACP production hardening plan * Docs: trim temporary ACP hardening plan goals * Docs: gate ACP thread controls by backend capabilities * ACP: add capability-gated runtime controls and /acp operator commands * Docs: remove temporary ACP hardening plan * ACP: fix spawn target validation and close cache cleanup * ACP: harden runtime dispatch and recovery paths * ACP: split ACP command/runtime internals and centralize policy * ACP: harden runtime lifecycle, validation, and observability * ACP: surface runtime and backend session IDs in thread bindings * docs: add temp plan for binding-service migration * ACP: migrate thread binding flows to SessionBindingService * ACP: address review feedback and preserve prompt wording * ACPX plugin: pin runtime dependency and prefer bundled CLI * Discord: complete binding-service migration cleanup and restore ACP plan * Docs: add standalone ACP agents guide * ACP: route harness intents to thread-bound ACP sessions * ACP: fix spawn thread routing and queue-owner stall * ACP: harden startup reconciliation and command bypass handling * ACP: fix dispatch bypass type narrowing * ACP: align runtime metadata to agentSessionId * ACP: normalize session identifier handling and labels * ACP: mark thread banner session ids provisional until first reply * ACP: stabilize session identity mapping and startup reconciliation * ACP: add resolved session-id notices and cwd in thread intros * Discord: prefix thread meta notices consistently * Discord: unify ACP/thread meta notices with gear prefix * Discord: split thread persona naming from meta formatting * Extensions: bump acpx plugin dependency to 0.1.9 * Agents: gate ACP prompt guidance behind acp.enabled * Docs: remove temp experiment plan docs * Docs: scope streaming plan to holy grail refactor * Docs: refactor ACP agents guide for human-first flow * Docs/Skill: add ACP feature-flag guidance and direct acpx telephone-game flow * Docs/Skill: add OpenCode and Pi to ACP harness lists * Docs/Skill: align ACP harness list with current acpx registry * Dev/Test: move ACP plain-language smoke script and mark as keep * Docs/Skill: reorder ACP harness lists with Pi first * ACP: split control-plane manager into core/types/utils modules * Docs: refresh ACP thread-bound agents plan * ACP: extract dispatch lane and split manager domains * ACP: centralize binding context and remove reverse deps * Infra: unify system message formatting * ACP: centralize error boundaries and session id rendering * ACP: enforce init concurrency cap and strict meta clear * Tests: fix ACP dispatch binding mock typing * Tests: fix Discord thread-binding mock drift and ACP request id * ACP: gate slash bypass and persist cleared overrides * ACPX: await pre-abort cancel before runTurn return * Extension: pin acpx runtime dependency to 0.1.11 * Docs: add pinned acpx install strategy for ACP extension * Extensions/acpx: enforce strict local pinned startup * Extensions/acpx: tighten acp-router install guidance * ACPX: retry runtime test temp-dir cleanup * Extensions/acpx: require proactive ACPX repair for thread spawns * Extensions/acpx: require restart offer after acpx reinstall * extensions/acpx: remove workspace protocol devDependency * extensions/acpx: bump pinned acpx to 0.1.13 * extensions/acpx: sync lockfile after dependency bump * ACPX: make runtime spawn Windows-safe * fix: align doctor-config-flow repair tests with default-account migration (#23580) (thanks @osolmaz)
This commit is contained in:
379
src/auto-reply/reply/dispatch-acp.ts
Normal file
379
src/auto-reply/reply/dispatch-acp.ts
Normal file
@@ -0,0 +1,379 @@
|
||||
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
|
||||
import { resolveAcpAgentPolicyError, resolveAcpDispatchPolicyError } from "../../acp/policy.js";
|
||||
import { formatAcpRuntimeErrorText } from "../../acp/runtime/error-text.js";
|
||||
import { toAcpRuntimeError } from "../../acp/runtime/errors.js";
|
||||
import { resolveAcpThreadSessionDetailLines } from "../../acp/runtime/session-identifiers.js";
|
||||
import {
|
||||
isSessionIdentityPending,
|
||||
resolveSessionIdentityFromMeta,
|
||||
} from "../../acp/runtime/session-identity.js";
|
||||
import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import type { TtsAutoMode } from "../../config/types.tts.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
|
||||
import { generateSecureUuid } from "../../infra/secure-random.js";
|
||||
import { prefixSystemMessage } from "../../infra/system-message.js";
|
||||
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
|
||||
import { maybeApplyTtsToPayload, resolveTtsConfig } from "../../tts/tts.js";
|
||||
import {
|
||||
isCommandEnabled,
|
||||
maybeResolveTextAlias,
|
||||
shouldHandleTextCommands,
|
||||
} from "../commands-registry.js";
|
||||
import type { FinalizedMsgContext } from "../templating.js";
|
||||
import type { ReplyPayload } from "../types.js";
|
||||
import { createAcpReplyProjector } from "./acp-projector.js";
|
||||
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
||||
import { routeReply } from "./route-reply.js";
|
||||
|
||||
type DispatchProcessedRecorder = (
|
||||
outcome: "completed" | "skipped" | "error",
|
||||
opts?: {
|
||||
reason?: string;
|
||||
error?: string;
|
||||
},
|
||||
) => void;
|
||||
|
||||
function resolveFirstContextText(
|
||||
ctx: FinalizedMsgContext,
|
||||
keys: Array<"BodyForAgent" | "BodyForCommands" | "CommandBody" | "RawBody" | "Body">,
|
||||
): string {
|
||||
for (const key of keys) {
|
||||
const value = ctx[key];
|
||||
if (typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
function resolveAcpPromptText(ctx: FinalizedMsgContext): string {
|
||||
return resolveFirstContextText(ctx, [
|
||||
"BodyForAgent",
|
||||
"BodyForCommands",
|
||||
"CommandBody",
|
||||
"RawBody",
|
||||
"Body",
|
||||
]).trim();
|
||||
}
|
||||
|
||||
function resolveCommandCandidateText(ctx: FinalizedMsgContext): string {
|
||||
return resolveFirstContextText(ctx, ["CommandBody", "BodyForCommands", "RawBody", "Body"]).trim();
|
||||
}
|
||||
|
||||
export function shouldBypassAcpDispatchForCommand(
|
||||
ctx: FinalizedMsgContext,
|
||||
cfg: OpenClawConfig,
|
||||
): boolean {
|
||||
const candidate = resolveCommandCandidateText(ctx);
|
||||
if (!candidate) {
|
||||
return false;
|
||||
}
|
||||
const allowTextCommands = shouldHandleTextCommands({
|
||||
cfg,
|
||||
surface: ctx.Surface ?? ctx.Provider ?? "",
|
||||
commandSource: ctx.CommandSource,
|
||||
});
|
||||
if (maybeResolveTextAlias(candidate, cfg) != null) {
|
||||
return allowTextCommands;
|
||||
}
|
||||
|
||||
const normalized = candidate.trim();
|
||||
if (!normalized.startsWith("!")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!ctx.CommandAuthorized) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!isCommandEnabled(cfg, "bash")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return allowTextCommands;
|
||||
}
|
||||
|
||||
function resolveAcpRequestId(ctx: FinalizedMsgContext): string {
|
||||
const id = ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
|
||||
if (typeof id === "string" && id.trim()) {
|
||||
return id.trim();
|
||||
}
|
||||
if (typeof id === "number" || typeof id === "bigint") {
|
||||
return String(id);
|
||||
}
|
||||
return generateSecureUuid();
|
||||
}
|
||||
|
||||
function hasBoundConversationForSession(params: {
|
||||
sessionKey: string;
|
||||
channelRaw: string | undefined;
|
||||
accountIdRaw: string | undefined;
|
||||
}): boolean {
|
||||
const channel = String(params.channelRaw ?? "")
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
if (!channel) {
|
||||
return false;
|
||||
}
|
||||
const accountId = String(params.accountIdRaw ?? "")
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
const normalizedAccountId = accountId || "default";
|
||||
const bindingService = getSessionBindingService();
|
||||
const bindings = bindingService.listBySession(params.sessionKey);
|
||||
return bindings.some((binding) => {
|
||||
const bindingChannel = String(binding.conversation.channel ?? "")
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
const bindingAccountId = String(binding.conversation.accountId ?? "")
|
||||
.trim()
|
||||
.toLowerCase();
|
||||
const conversationId = String(binding.conversation.conversationId ?? "").trim();
|
||||
return (
|
||||
bindingChannel === channel &&
|
||||
(bindingAccountId || "default") === normalizedAccountId &&
|
||||
conversationId.length > 0
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
export type AcpDispatchAttemptResult = {
|
||||
queuedFinal: boolean;
|
||||
counts: Record<ReplyDispatchKind, number>;
|
||||
};
|
||||
|
||||
export async function tryDispatchAcpReply(params: {
|
||||
ctx: FinalizedMsgContext;
|
||||
cfg: OpenClawConfig;
|
||||
dispatcher: ReplyDispatcher;
|
||||
sessionKey?: string;
|
||||
inboundAudio: boolean;
|
||||
sessionTtsAuto?: TtsAutoMode;
|
||||
ttsChannel?: string;
|
||||
shouldRouteToOriginating: boolean;
|
||||
originatingChannel?: string;
|
||||
originatingTo?: string;
|
||||
shouldSendToolSummaries: boolean;
|
||||
bypassForCommand: boolean;
|
||||
recordProcessed: DispatchProcessedRecorder;
|
||||
markIdle: (reason: string) => void;
|
||||
}): Promise<AcpDispatchAttemptResult | null> {
|
||||
const sessionKey = params.sessionKey?.trim();
|
||||
if (!sessionKey || params.bypassForCommand) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const acpManager = getAcpSessionManager();
|
||||
const acpResolution = acpManager.resolveSession({
|
||||
cfg: params.cfg,
|
||||
sessionKey,
|
||||
});
|
||||
if (acpResolution.kind === "none") {
|
||||
return null;
|
||||
}
|
||||
|
||||
const routedCounts: Record<ReplyDispatchKind, number> = {
|
||||
tool: 0,
|
||||
block: 0,
|
||||
final: 0,
|
||||
};
|
||||
let queuedFinal = false;
|
||||
let acpAccumulatedBlockText = "";
|
||||
let acpBlockCount = 0;
|
||||
const deliverAcpPayload = async (
|
||||
kind: ReplyDispatchKind,
|
||||
payload: ReplyPayload,
|
||||
): Promise<boolean> => {
|
||||
if (kind === "block" && payload.text?.trim()) {
|
||||
if (acpAccumulatedBlockText.length > 0) {
|
||||
acpAccumulatedBlockText += "\n";
|
||||
}
|
||||
acpAccumulatedBlockText += payload.text;
|
||||
acpBlockCount += 1;
|
||||
}
|
||||
|
||||
const ttsPayload = await maybeApplyTtsToPayload({
|
||||
payload,
|
||||
cfg: params.cfg,
|
||||
channel: params.ttsChannel,
|
||||
kind,
|
||||
inboundAudio: params.inboundAudio,
|
||||
ttsAuto: params.sessionTtsAuto,
|
||||
});
|
||||
|
||||
if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) {
|
||||
const result = await routeReply({
|
||||
payload: ttsPayload,
|
||||
channel: params.originatingChannel,
|
||||
to: params.originatingTo,
|
||||
sessionKey: params.ctx.SessionKey,
|
||||
accountId: params.ctx.AccountId,
|
||||
threadId: params.ctx.MessageThreadId,
|
||||
cfg: params.cfg,
|
||||
});
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-acp: route-reply (acp/${kind}) failed: ${result.error ?? "unknown error"}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
routedCounts[kind] += 1;
|
||||
return true;
|
||||
}
|
||||
if (kind === "tool") {
|
||||
return params.dispatcher.sendToolResult(ttsPayload);
|
||||
}
|
||||
if (kind === "block") {
|
||||
return params.dispatcher.sendBlockReply(ttsPayload);
|
||||
}
|
||||
return params.dispatcher.sendFinalReply(ttsPayload);
|
||||
};
|
||||
|
||||
const promptText = resolveAcpPromptText(params.ctx);
|
||||
if (!promptText) {
|
||||
const counts = params.dispatcher.getQueuedCounts();
|
||||
counts.tool += routedCounts.tool;
|
||||
counts.block += routedCounts.block;
|
||||
counts.final += routedCounts.final;
|
||||
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
|
||||
params.markIdle("message_completed");
|
||||
return { queuedFinal: false, counts };
|
||||
}
|
||||
|
||||
const identityPendingBeforeTurn = isSessionIdentityPending(
|
||||
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
|
||||
);
|
||||
const shouldEmitResolvedIdentityNotice =
|
||||
identityPendingBeforeTurn &&
|
||||
(Boolean(params.ctx.MessageThreadId != null && String(params.ctx.MessageThreadId).trim()) ||
|
||||
hasBoundConversationForSession({
|
||||
sessionKey,
|
||||
channelRaw: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
|
||||
accountIdRaw: params.ctx.AccountId,
|
||||
}));
|
||||
|
||||
const resolvedAcpAgent =
|
||||
acpResolution.kind === "ready"
|
||||
? (
|
||||
acpResolution.meta.agent?.trim() ||
|
||||
params.cfg.acp?.defaultAgent?.trim() ||
|
||||
resolveAgentIdFromSessionKey(sessionKey)
|
||||
).trim()
|
||||
: resolveAgentIdFromSessionKey(sessionKey);
|
||||
const projector = createAcpReplyProjector({
|
||||
cfg: params.cfg,
|
||||
shouldSendToolSummaries: params.shouldSendToolSummaries,
|
||||
deliver: deliverAcpPayload,
|
||||
provider: params.ctx.Surface ?? params.ctx.Provider,
|
||||
accountId: params.ctx.AccountId,
|
||||
});
|
||||
|
||||
const acpDispatchStartedAt = Date.now();
|
||||
try {
|
||||
const dispatchPolicyError = resolveAcpDispatchPolicyError(params.cfg);
|
||||
if (dispatchPolicyError) {
|
||||
throw dispatchPolicyError;
|
||||
}
|
||||
if (acpResolution.kind === "stale") {
|
||||
throw acpResolution.error;
|
||||
}
|
||||
const agentPolicyError = resolveAcpAgentPolicyError(params.cfg, resolvedAcpAgent);
|
||||
if (agentPolicyError) {
|
||||
throw agentPolicyError;
|
||||
}
|
||||
|
||||
await acpManager.runTurn({
|
||||
cfg: params.cfg,
|
||||
sessionKey,
|
||||
text: promptText,
|
||||
mode: "prompt",
|
||||
requestId: resolveAcpRequestId(params.ctx),
|
||||
onEvent: async (event) => await projector.onEvent(event),
|
||||
});
|
||||
|
||||
await projector.flush(true);
|
||||
const ttsMode = resolveTtsConfig(params.cfg).mode ?? "final";
|
||||
if (ttsMode === "final" && acpBlockCount > 0 && acpAccumulatedBlockText.trim()) {
|
||||
try {
|
||||
const ttsSyntheticReply = await maybeApplyTtsToPayload({
|
||||
payload: { text: acpAccumulatedBlockText },
|
||||
cfg: params.cfg,
|
||||
channel: params.ttsChannel,
|
||||
kind: "final",
|
||||
inboundAudio: params.inboundAudio,
|
||||
ttsAuto: params.sessionTtsAuto,
|
||||
});
|
||||
if (ttsSyntheticReply.mediaUrl) {
|
||||
const delivered = await deliverAcpPayload("final", {
|
||||
mediaUrl: ttsSyntheticReply.mediaUrl,
|
||||
audioAsVoice: ttsSyntheticReply.audioAsVoice,
|
||||
});
|
||||
queuedFinal = queuedFinal || delivered;
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`dispatch-acp: accumulated ACP block TTS failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldEmitResolvedIdentityNotice) {
|
||||
const currentMeta = readAcpSessionEntry({
|
||||
cfg: params.cfg,
|
||||
sessionKey,
|
||||
})?.acp;
|
||||
const identityAfterTurn = resolveSessionIdentityFromMeta(currentMeta);
|
||||
if (!isSessionIdentityPending(identityAfterTurn)) {
|
||||
const resolvedDetails = resolveAcpThreadSessionDetailLines({
|
||||
sessionKey,
|
||||
meta: currentMeta,
|
||||
});
|
||||
if (resolvedDetails.length > 0) {
|
||||
const delivered = await deliverAcpPayload("final", {
|
||||
text: prefixSystemMessage(["Session ids resolved.", ...resolvedDetails].join("\n")),
|
||||
});
|
||||
queuedFinal = queuedFinal || delivered;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const counts = params.dispatcher.getQueuedCounts();
|
||||
counts.tool += routedCounts.tool;
|
||||
counts.block += routedCounts.block;
|
||||
counts.final += routedCounts.final;
|
||||
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
|
||||
logVerbose(
|
||||
`acp-dispatch: session=${sessionKey} outcome=ok latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
|
||||
);
|
||||
params.recordProcessed("completed", { reason: "acp_dispatch" });
|
||||
params.markIdle("message_completed");
|
||||
return { queuedFinal, counts };
|
||||
} catch (err) {
|
||||
await projector.flush(true);
|
||||
const acpError = toAcpRuntimeError({
|
||||
error: err,
|
||||
fallbackCode: "ACP_TURN_FAILED",
|
||||
fallbackMessage: "ACP turn failed before completion.",
|
||||
});
|
||||
const delivered = await deliverAcpPayload("final", {
|
||||
text: formatAcpRuntimeErrorText(acpError),
|
||||
isError: true,
|
||||
});
|
||||
queuedFinal = queuedFinal || delivered;
|
||||
const counts = params.dispatcher.getQueuedCounts();
|
||||
counts.tool += routedCounts.tool;
|
||||
counts.block += routedCounts.block;
|
||||
counts.final += routedCounts.final;
|
||||
const acpStats = acpManager.getObservabilitySnapshot(params.cfg);
|
||||
logVerbose(
|
||||
`acp-dispatch: session=${sessionKey} outcome=error code=${acpError.code} latencyMs=${Date.now() - acpDispatchStartedAt} queueDepth=${acpStats.turns.queueDepth} activeRuntimes=${acpStats.runtimeCache.activeSessions}`,
|
||||
);
|
||||
params.recordProcessed("completed", {
|
||||
reason: `acp_error:${acpError.code.toLowerCase()}`,
|
||||
});
|
||||
params.markIdle("message_completed");
|
||||
return { queuedFinal, counts };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user