acp: harden follow-up reliability and attachments (#41464)

Merged via squash.

Prepared head SHA: 7d167dff54
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-03-09 23:03:50 +01:00
committed by GitHub
parent 0669b0ddc2
commit 3c3474360b
7 changed files with 230 additions and 44 deletions

View File

@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
- ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky. - ACP/runtime attachments: forward normalized inbound image attachments into ACP runtime turns so ACPX sessions can preserve image prompt content on the runtime path. (#41427) Thanks @mbelinky.
- ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky. - ACP/regressions: add gateway RPC coverage for ACP lineage patching, ACPX runtime coverage for image prompt serialization, and an operator smoke-test procedure for live ACP spawn verification. (#41456) Thanks @mbelinky.
- Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) thanks @altaywtf. - Agents/billing recovery: probe single-provider billing cooldowns on the existing throttle so topping up credits can recover without a manual gateway restart. (#41422) thanks @altaywtf.
- ACP/follow-up hardening: make session restore and prompt completion degrade gracefully on transcript/update failures, enforce bounded tool-location traversal, and skip non-image ACPX turns the runtime cannot serialize. (#41464) Thanks @mbelinky.
## 2026.3.8 ## 2026.3.8

View File

@@ -0,0 +1,18 @@
import { describe, expect, it } from "vitest";
import { extractToolCallLocations } from "./event-mapper.js";
describe("extractToolCallLocations", () => {
it("enforces the global node visit cap across nested structures", () => {
const nested = Array.from({ length: 20 }, (_, outer) =>
Array.from({ length: 20 }, (_, inner) =>
inner === 19 ? { path: `/tmp/file-${outer}.txt` } : { note: `${outer}-${inner}` },
),
);
const locations = extractToolCallLocations(nested);
expect(locations).toBeDefined();
expect(locations?.length).toBeLessThan(20);
expect(locations).not.toContainEqual({ path: "/tmp/file-19.txt" });
});
});

View File

@@ -186,9 +186,10 @@ function collectLocationsFromTextMarkers(
function collectToolLocations( function collectToolLocations(
value: unknown, value: unknown,
locations: Map<string, ToolCallLocation>, locations: Map<string, ToolCallLocation>,
state: { visited: number; depth: number }, state: { visited: number },
depth: number,
): void { ): void {
if (state.visited >= TOOL_LOCATION_MAX_NODES || state.depth > TOOL_LOCATION_MAX_DEPTH) { if (state.visited >= TOOL_LOCATION_MAX_NODES || depth > TOOL_LOCATION_MAX_DEPTH) {
return; return;
} }
state.visited += 1; state.visited += 1;
@@ -202,8 +203,7 @@ function collectToolLocations(
} }
if (Array.isArray(value)) { if (Array.isArray(value)) {
for (const item of value) { for (const item of value) {
collectToolLocations(item, locations, { visited: state.visited, depth: state.depth + 1 }); collectToolLocations(item, locations, state, depth + 1);
state.visited += 1;
if (state.visited >= TOOL_LOCATION_MAX_NODES) { if (state.visited >= TOOL_LOCATION_MAX_NODES) {
return; return;
} }
@@ -230,9 +230,11 @@ function collectToolLocations(
} }
} }
for (const nested of Object.values(record)) { for (const [key, nested] of Object.entries(record)) {
collectToolLocations(nested, locations, { visited: state.visited, depth: state.depth + 1 }); if (key === "content") {
state.visited += 1; continue;
}
collectToolLocations(nested, locations, state, depth + 1);
if (state.visited >= TOOL_LOCATION_MAX_NODES) { if (state.visited >= TOOL_LOCATION_MAX_NODES) {
return; return;
} }
@@ -402,7 +404,7 @@ export function extractToolCallContent(value: unknown): ToolCallContent[] | unde
export function extractToolCallLocations(...values: unknown[]): ToolCallLocation[] | undefined { export function extractToolCallLocations(...values: unknown[]): ToolCallLocation[] | undefined {
const locations = new Map<string, ToolCallLocation>(); const locations = new Map<string, ToolCallLocation>();
for (const value of values) { for (const value of values) {
collectToolLocations(value, locations, { visited: 0, depth: 0 }); collectToolLocations(value, locations, { visited: 0 }, 0);
} }
return locations.size > 0 ? [...locations.values()] : undefined; return locations.size > 0 ? [...locations.values()] : undefined;
} }

View File

@@ -365,6 +365,63 @@ describe("acp session UX bridge behavior", () => {
sessionStore.clearAllSessionsForTest(); sessionStore.clearAllSessionsForTest();
}); });
it("falls back to an empty transcript when sessions.get fails during loadSession", async () => {
const sessionStore = createInMemorySessionStore();
const connection = createAcpConnection();
const sessionUpdate = connection.__sessionUpdateMock;
const request = vi.fn(async (method: string) => {
if (method === "sessions.list") {
return {
ts: Date.now(),
path: "/tmp/sessions.json",
count: 1,
defaults: {
modelProvider: null,
model: null,
contextTokens: null,
},
sessions: [
{
key: "agent:main:recover",
label: "recover",
displayName: "Recover session",
kind: "direct",
updatedAt: 1_710_000_000_000,
thinkingLevel: "adaptive",
modelProvider: "openai",
model: "gpt-5.4",
},
],
};
}
if (method === "sessions.get") {
throw new Error("sessions.get unavailable");
}
return { ok: true };
}) as GatewayClient["request"];
const agent = new AcpGatewayAgent(connection, createAcpGateway(request), {
sessionStore,
});
const result = await agent.loadSession(createLoadSessionRequest("agent:main:recover"));
expect(result.modes?.currentModeId).toBe("adaptive");
expect(sessionUpdate).toHaveBeenCalledWith({
sessionId: "agent:main:recover",
update: expect.objectContaining({
sessionUpdate: "available_commands_update",
}),
});
expect(sessionUpdate).not.toHaveBeenCalledWith({
sessionId: "agent:main:recover",
update: expect.objectContaining({
sessionUpdate: "user_message_chunk",
}),
});
sessionStore.clearAllSessionsForTest();
});
}); });
describe("acp setSessionMode bridge behavior", () => { describe("acp setSessionMode bridge behavior", () => {
@@ -771,6 +828,61 @@ describe("acp session metadata and usage updates", () => {
sessionStore.clearAllSessionsForTest(); sessionStore.clearAllSessionsForTest();
}); });
it("still resolves prompts when snapshot updates fail after completion", async () => {
const sessionStore = createInMemorySessionStore();
const connection = createAcpConnection();
const sessionUpdate = connection.__sessionUpdateMock;
const request = vi.fn(async (method: string) => {
if (method === "sessions.list") {
return {
ts: Date.now(),
path: "/tmp/sessions.json",
count: 1,
defaults: {
modelProvider: null,
model: null,
contextTokens: null,
},
sessions: [
{
key: "usage-session",
displayName: "Usage session",
kind: "direct",
updatedAt: 1_710_000_123_000,
thinkingLevel: "adaptive",
modelProvider: "openai",
model: "gpt-5.4",
totalTokens: 1200,
totalTokensFresh: true,
contextTokens: 4000,
},
],
};
}
if (method === "chat.send") {
return new Promise(() => {});
}
return { ok: true };
}) as GatewayClient["request"];
const agent = new AcpGatewayAgent(connection, createAcpGateway(request), {
sessionStore,
});
await agent.loadSession(createLoadSessionRequest("usage-session"));
sessionUpdate.mockClear();
sessionUpdate.mockRejectedValueOnce(new Error("session update transport failed"));
const promptPromise = agent.prompt(createPromptRequest("usage-session", "hello"));
await agent.handleGatewayEvent(createChatFinalEvent("usage-session"));
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
const session = sessionStore.getSession("usage-session");
expect(session?.activeRunId).toBeNull();
expect(session?.abortController).toBeNull();
sessionStore.clearAllSessionsForTest();
});
}); });
describe("acp prompt size hardening", () => { describe("acp prompt size hardening", () => {

View File

@@ -458,7 +458,10 @@ export class AcpGatewayAgent implements Agent {
this.log(`loadSession: ${session.sessionId} -> ${session.sessionKey}`); this.log(`loadSession: ${session.sessionId} -> ${session.sessionKey}`);
const [sessionSnapshot, transcript] = await Promise.all([ const [sessionSnapshot, transcript] = await Promise.all([
this.getSessionSnapshot(session.sessionKey), this.getSessionSnapshot(session.sessionKey),
this.getSessionTranscript(session.sessionKey), this.getSessionTranscript(session.sessionKey).catch((err) => {
this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`);
return [];
}),
]); ]);
await this.replaySessionTranscript(session.sessionId, transcript); await this.replaySessionTranscript(session.sessionId, transcript);
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
@@ -630,7 +633,6 @@ export class AcpGatewayAgent implements Agent {
if (!session) { if (!session) {
return; return;
} }
this.sessionStore.cancelActiveRun(params.sessionId); this.sessionStore.cancelActiveRun(params.sessionId);
try { try {
await this.gateway.request("chat.abort", { sessionKey: session.sessionKey }); await this.gateway.request("chat.abort", { sessionKey: session.sessionKey });
@@ -841,9 +843,13 @@ export class AcpGatewayAgent implements Agent {
this.pendingPrompts.delete(sessionId); this.pendingPrompts.delete(sessionId);
this.sessionStore.clearActiveRun(sessionId); this.sessionStore.clearActiveRun(sessionId);
const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey); const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey);
await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, { try {
includeControls: false, await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, {
}); includeControls: false,
});
} catch (err) {
this.log(`session snapshot update failed for ${sessionId}: ${String(err)}`);
}
pending.resolve({ stopReason }); pending.resolve({ stopReason });
} }

View File

@@ -362,28 +362,58 @@ describe("tryDispatchAcpReply", () => {
setReadyAcpResolution(); setReadyAcpResolution();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
const imagePath = path.join(tempDir, "inbound.png"); const imagePath = path.join(tempDir, "inbound.png");
await fs.writeFile(imagePath, "image-bytes"); try {
managerMocks.runTurn.mockResolvedValue(undefined); await fs.writeFile(imagePath, "image-bytes");
managerMocks.runTurn.mockResolvedValue(undefined);
await runDispatch({ await runDispatch({
bodyForAgent: " ", bodyForAgent: " ",
ctxOverrides: { ctxOverrides: {
MediaPath: imagePath, MediaPath: imagePath,
MediaType: "image/png", MediaType: "image/png",
}, },
}); });
expect(managerMocks.runTurn).toHaveBeenCalledWith( expect(managerMocks.runTurn).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
text: "", text: "",
attachments: [ attachments: [
{ {
mediaType: "image/png", mediaType: "image/png",
data: Buffer.from("image-bytes").toString("base64"), data: Buffer.from("image-bytes").toString("base64"),
}, },
], ],
}), }),
); );
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
});
it("skips ACP turns for non-image attachments when there is no text prompt", async () => {
setReadyAcpResolution();
const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-"));
const docPath = path.join(tempDir, "inbound.pdf");
const { dispatcher } = createDispatcher();
const onReplyStart = vi.fn();
try {
await fs.writeFile(docPath, "pdf-bytes");
await runDispatch({
bodyForAgent: " ",
dispatcher,
onReplyStart,
ctxOverrides: {
MediaPath: docPath,
MediaType: "application/pdf",
},
});
expect(managerMocks.runTurn).not.toHaveBeenCalled();
expect(onReplyStart).not.toHaveBeenCalled();
} finally {
await fs.rm(tempDir, { recursive: true, force: true });
}
}); });
it("surfaces ACP policy errors as final error replies", async () => { it("surfaces ACP policy errors as final error replies", async () => {

View File

@@ -16,6 +16,7 @@ import { logVerbose } from "../../globals.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { generateSecureUuid } from "../../infra/secure-random.js"; import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js"; import { prefixSystemMessage } from "../../infra/system-message.js";
import { applyMediaUnderstanding } from "../../media-understanding/apply.js";
import { import {
normalizeAttachmentPath, normalizeAttachmentPath,
normalizeAttachments, normalizeAttachments,
@@ -69,6 +70,10 @@ async function resolveAcpAttachments(ctx: FinalizedMsgContext): Promise<AcpTurnA
const mediaAttachments = normalizeAttachments(ctx); const mediaAttachments = normalizeAttachments(ctx);
const results: AcpTurnAttachment[] = []; const results: AcpTurnAttachment[] = [];
for (const attachment of mediaAttachments) { for (const attachment of mediaAttachments) {
const mediaType = attachment.mime ?? "application/octet-stream";
if (!mediaType.startsWith("image/")) {
continue;
}
const filePath = normalizeAttachmentPath(attachment.path); const filePath = normalizeAttachmentPath(attachment.path);
if (!filePath) { if (!filePath) {
continue; continue;
@@ -83,7 +88,7 @@ async function resolveAcpAttachments(ctx: FinalizedMsgContext): Promise<AcpTurnA
} }
const buf = await fs.readFile(filePath); const buf = await fs.readFile(filePath);
results.push({ results.push({
mediaType: attachment.mime ?? "application/octet-stream", mediaType,
data: buf.toString("base64"), data: buf.toString("base64"),
}); });
} catch { } catch {
@@ -224,16 +229,6 @@ export async function tryDispatchAcpReply(params: {
onReplyStart: params.onReplyStart, onReplyStart: params.onReplyStart,
}); });
const promptText = resolveAcpPromptText(params.ctx);
const attachments = await resolveAcpAttachments(params.ctx);
if (!promptText && attachments.length === 0) {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
params.markIdle("message_completed");
return { queuedFinal: false, counts };
}
const identityPendingBeforeTurn = isSessionIdentityPending( const identityPendingBeforeTurn = isSessionIdentityPending(
resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined), resolveSessionIdentityFromMeta(acpResolution.kind === "ready" ? acpResolution.meta : undefined),
); );
@@ -275,6 +270,28 @@ export async function tryDispatchAcpReply(params: {
if (agentPolicyError) { if (agentPolicyError) {
throw agentPolicyError; throw agentPolicyError;
} }
if (!params.ctx.MediaUnderstanding?.length) {
try {
await applyMediaUnderstanding({
ctx: params.ctx,
cfg: params.cfg,
});
} catch (err) {
logVerbose(
`dispatch-acp: media understanding failed, proceeding with raw content: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
const promptText = resolveAcpPromptText(params.ctx);
const attachments = await resolveAcpAttachments(params.ctx);
if (!promptText && attachments.length === 0) {
const counts = params.dispatcher.getQueuedCounts();
delivery.applyRoutedCounts(counts);
params.recordProcessed("completed", { reason: "acp_empty_prompt" });
params.markIdle("message_completed");
return { queuedFinal: false, counts };
}
try { try {
await delivery.startReplyLifecycle(); await delivery.startReplyLifecycle();