fix(acp): persist spawned child session history (#40137)

Merged via squash.

Prepared head SHA: 62de5d5669
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-03-08 19:37:00 +01:00
committed by GitHub
parent 72ebaf97c3
commit 404b1527e6
7 changed files with 436 additions and 30 deletions

View File

@@ -35,6 +35,9 @@ const hoisted = vi.hoisted(() => {
const initializeSessionMock = vi.fn();
const startAcpSpawnParentStreamRelayMock = vi.fn();
const resolveAcpSpawnStreamLogPathMock = vi.fn();
const loadSessionStoreMock = vi.fn();
const resolveStorePathMock = vi.fn();
const resolveSessionTranscriptFileMock = vi.fn();
const state = {
cfg: createDefaultSpawnConfig(),
};
@@ -49,6 +52,9 @@ const hoisted = vi.hoisted(() => {
initializeSessionMock,
startAcpSpawnParentStreamRelayMock,
resolveAcpSpawnStreamLogPathMock,
loadSessionStoreMock,
resolveStorePathMock,
resolveSessionTranscriptFileMock,
state,
};
});
@@ -86,6 +92,24 @@ vi.mock("../gateway/call.js", () => ({
callGateway: (opts: unknown) => hoisted.callGatewayMock(opts),
}));
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();
return {
...actual,
loadSessionStore: (storePath: string) => hoisted.loadSessionStoreMock(storePath),
resolveStorePath: (store: unknown, opts: unknown) => hoisted.resolveStorePathMock(store, opts),
};
});
vi.mock("../config/sessions/transcript.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions/transcript.js")>();
return {
...actual,
resolveSessionTranscriptFile: (params: unknown) =>
hoisted.resolveSessionTranscriptFileMock(params),
};
});
vi.mock("../acp/control-plane/manager.js", () => {
return {
getAcpSessionManager: () => ({
@@ -263,6 +287,34 @@ describe("spawnAcpDirect", () => {
hoisted.resolveAcpSpawnStreamLogPathMock
.mockReset()
.mockReturnValue("/tmp/sess-main.acp-stream.jsonl");
hoisted.resolveStorePathMock.mockReset().mockReturnValue("/tmp/codex-sessions.json");
hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => {
const store: Record<string, { sessionId: string; updatedAt: number }> = {};
return new Proxy(store, {
get(_target, prop) {
if (typeof prop === "string" && prop.startsWith("agent:codex:acp:")) {
return { sessionId: "sess-123", updatedAt: Date.now() };
}
return undefined;
},
});
});
hoisted.resolveSessionTranscriptFileMock
.mockReset()
.mockImplementation(async (params: unknown) => {
const typed = params as { threadId?: string };
const sessionFile = typed.threadId
? `/tmp/agents/codex/sessions/sess-123-topic-${typed.threadId}.jsonl`
: "/tmp/agents/codex/sessions/sess-123.jsonl";
return {
sessionFile,
sessionEntry: {
sessionId: "sess-123",
updatedAt: Date.now(),
sessionFile,
},
};
});
});
it("spawns ACP session, binds a new thread, and dispatches initial task", async () => {
@@ -286,6 +338,13 @@ describe("spawnAcpDirect", () => {
expect(result.childSessionKey).toMatch(/^agent:codex:acp:/);
expect(result.runId).toBe("run-1");
expect(result.mode).toBe("session");
const patchCalls = hoisted.callGatewayMock.mock.calls
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
.filter((request) => request.method === "sessions.patch");
expect(patchCalls[0]?.params).toMatchObject({
key: result.childSessionKey,
spawnedBy: "agent:main:main",
});
expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith(
expect.objectContaining({
targetKind: "session",
@@ -308,6 +367,12 @@ describe("spawnAcpDirect", () => {
mode: "persistent",
}),
);
const transcriptCalls = hoisted.resolveSessionTranscriptFileMock.mock.calls.map(
(call: unknown[]) => call[0] as { threadId?: string },
);
expect(transcriptCalls).toHaveLength(2);
expect(transcriptCalls[0]?.threadId).toBeUndefined();
expect(transcriptCalls[1]?.threadId).toBe("child-thread");
});
it("does not inline delivery for fresh oneshot ACP runs", async () => {
@@ -328,6 +393,13 @@ describe("spawnAcpDirect", () => {
expect(result.status).toBe("accepted");
expect(result.mode).toBe("run");
expect(hoisted.resolveSessionTranscriptFileMock).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "sess-123",
storePath: "/tmp/codex-sessions.json",
agentId: "codex",
}),
);
const agentCall = hoisted.callGatewayMock.mock.calls
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find((request) => request.method === "agent");
@@ -337,6 +409,32 @@ describe("spawnAcpDirect", () => {
expect(agentCall?.params?.threadId).toBeUndefined();
});
it("keeps ACP spawn running when session-file persistence fails", async () => {
hoisted.resolveSessionTranscriptFileMock.mockRejectedValueOnce(new Error("disk full"));
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
mode: "run",
},
{
agentSessionKey: "agent:main:main",
agentChannel: "telegram",
agentAccountId: "default",
agentTo: "telegram:6098642967",
agentThreadId: "1",
},
);
expect(result.status).toBe("accepted");
expect(result.childSessionKey).toMatch(/^agent:codex:acp:/);
const agentCall = hoisted.callGatewayMock.mock.calls
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find((request) => request.method === "agent");
expect(agentCall?.params?.sessionKey).toBe(result.childSessionKey);
});
it("includes cwd in ACP thread intro banner when provided at spawn time", async () => {
const result = await spawnAcpDirect(
{

View File

@@ -23,6 +23,8 @@ import {
} from "../channels/thread-bindings-policy.js";
import { loadConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/config.js";
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js";
import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js";
import { callGateway } from "../gateway/call.js";
import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
import {
@@ -30,6 +32,7 @@ import {
isSessionBindingError,
type SessionBindingRecord,
} from "../infra/outbound/session-binding-service.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
@@ -38,6 +41,9 @@ import {
startAcpSpawnParentStreamRelay,
} from "./acp-spawn-parent-stream.js";
import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js";
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js";
const log = createSubsystemLogger("agents/acp-spawn");
export const ACP_SPAWN_MODES = ["run", "session"] as const;
export type SpawnAcpMode = (typeof ACP_SPAWN_MODES)[number];
@@ -162,6 +168,50 @@ function summarizeError(err: unknown): string {
return "error";
}
function resolveRequesterInternalSessionKey(params: {
cfg: OpenClawConfig;
requesterSessionKey?: string;
}): string {
const { mainKey, alias } = resolveMainSessionAlias(params.cfg);
const requesterSessionKey = params.requesterSessionKey?.trim();
return requesterSessionKey
? resolveInternalSessionKey({
key: requesterSessionKey,
alias,
mainKey,
})
: alias;
}
async function persistAcpSpawnSessionFileBestEffort(params: {
sessionId: string;
sessionKey: string;
sessionEntry: SessionEntry | undefined;
sessionStore: Record<string, SessionEntry>;
storePath: string;
agentId: string;
threadId?: string | number;
stage: "spawn" | "thread-bind";
}): Promise<SessionEntry | undefined> {
try {
const resolvedSessionFile = await resolveSessionTranscriptFile({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionEntry: params.sessionEntry,
sessionStore: params.sessionStore,
storePath: params.storePath,
agentId: params.agentId,
threadId: params.threadId,
});
return resolvedSessionFile.sessionEntry;
} catch (error) {
log.warn(
`ACP session-file persistence failed during ${params.stage} for ${params.sessionKey}: ${summarizeError(error)}`,
);
return params.sessionEntry;
}
}
function resolveConversationIdForThreadBinding(params: {
to?: string;
threadId?: string | number;
@@ -257,6 +307,10 @@ export async function spawnAcpDirect(
ctx: SpawnAcpContext,
): Promise<SpawnAcpResult> {
const cfg = loadConfig();
const requesterInternalKey = resolveRequesterInternalSessionKey({
cfg,
requesterSessionKey: ctx.agentSessionKey,
});
if (!isAcpEnabledByPolicy(cfg)) {
return {
status: "forbidden",
@@ -346,11 +400,27 @@ export async function spawnAcpDirect(
method: "sessions.patch",
params: {
key: sessionKey,
spawnedBy: requesterInternalKey,
...(params.label ? { label: params.label } : {}),
},
timeoutMs: 10_000,
});
sessionCreated = true;
const storePath = resolveStorePath(cfg.session?.store, { agentId: targetAgentId });
const sessionStore = loadSessionStore(storePath);
let sessionEntry: SessionEntry | undefined = sessionStore[sessionKey];
const sessionId = sessionEntry?.sessionId;
if (sessionId) {
sessionEntry = await persistAcpSpawnSessionFileBestEffort({
sessionId,
sessionKey,
sessionStore,
storePath,
sessionEntry,
agentId: targetAgentId,
stage: "spawn",
});
}
const initialized = await acpManager.initializeSession({
cfg,
sessionKey,
@@ -408,6 +478,21 @@ export async function spawnAcpDirect(
`Failed to create and bind a ${preparedBinding.channel} thread for this ACP session.`,
);
}
if (sessionId) {
const boundThreadId = String(binding.conversation.conversationId).trim() || undefined;
if (boundThreadId) {
sessionEntry = await persistAcpSpawnSessionFileBestEffort({
sessionId,
sessionKey,
sessionStore,
storePath,
sessionEntry,
agentId: targetAgentId,
threadId: boundThreadId,
stage: "thread-bind",
});
}
}
}
} catch (err) {
await cleanupFailedAcpSpawn({