feat: thread-bound subagents on Discord (#21805)

* docs: thread-bound subagents plan

* docs: add exact thread-bound subagent implementation touchpoints

* Docs: prioritize auto thread-bound subagent flow

* Docs: add ACP harness thread-binding extensions

* Discord: add thread-bound session routing and auto-bind spawn flow

* Subagents: add focus commands and ACP/session binding lifecycle hooks

* Tests: cover thread bindings, focus commands, and ACP unbind hooks

* Docs: add plugin-hook appendix for thread-bound subagents

* Plugins: add subagent lifecycle hook events

* Core: emit subagent lifecycle hooks and decouple Discord bindings

* Discord: handle subagent bind lifecycle via plugin hooks

* Subagents: unify completion finalizer and split registry modules

* Add subagent lifecycle events module

* Hooks: fix subagent ended context key

* Discord: share thread bindings across ESM and Jiti

* Subagents: add persistent sessions_spawn mode for thread-bound sessions

* Subagents: clarify thread intro and persistent completion copy

* test(subagents): stabilize sessions_spawn lifecycle cleanup assertions

* Discord: add thread-bound session TTL with auto-unfocus

* Subagents: fail session spawns when thread bind fails

* Subagents: cover thread session failure cleanup paths

* Session: add thread binding TTL config and /session ttl controls

* Tests: align discord reaction expectations

* Agent: persist sessionFile for keyed subagent sessions

* Discord: normalize imports after conflict resolution

* Sessions: centralize sessionFile resolve/persist helper

* Discord: harden thread-bound subagent session routing

* Rebase: resolve upstream/main conflicts

* Subagents: move thread binding into hooks and split bindings modules

* Docs: add channel-agnostic subagent routing hook plan

* Agents: decouple subagent routing from Discord

* Discord: refactor thread-bound subagent flows

* Subagents: prevent duplicate end hooks and orphaned failed sessions

* Refactor: split subagent command and provider phases

* Subagents: honor hook delivery target overrides

* Discord: add thread binding kill switches and refresh plan doc

* Discord: fix thread bind channel resolution

* Routing: centralize account id normalization

* Discord: clean up thread bindings on startup failures

* Discord: add startup cleanup regression tests

* Docs: add long-term thread-bound subagent architecture

* Docs: split session binding plan and dedupe thread-bound doc

* Subagents: add channel-agnostic session binding routing

* Subagents: stabilize announce completion routing tests

* Subagents: cover multi-bound completion routing

* Subagents: suppress lifecycle hooks on failed thread bind

* tests: fix discord provider mock typing regressions

* docs/protocol: sync slash command aliases and delete param models

* fix: add changelog entry for Discord thread-bound subagents (#21805) (thanks @onutc)

---------

Co-authored-by: Shadow <hi@shadowing.dev>
This commit is contained in:
Onur
2026-02-21 16:14:55 +01:00
committed by GitHub
parent 166068dfbe
commit 8178ea472d
114 changed files with 12214 additions and 1659 deletions

View File

@@ -79,6 +79,8 @@ describe("sessions tools", () => {
expect(schemaProp("sessions_send", "timeoutSeconds").type).toBe("number");
expect(schemaProp("sessions_spawn", "thinking").type).toBe("string");
expect(schemaProp("sessions_spawn", "runTimeoutSeconds").type).toBe("number");
expect(schemaProp("sessions_spawn", "thread").type).toBe("boolean");
expect(schemaProp("sessions_spawn", "mode").type).toBe("string");
expect(schemaProp("subagents", "recentMinutes").type).toBe("number");
});

View File

@@ -133,35 +133,6 @@ const waitFor = async (predicate: () => boolean, timeoutMs = 2000) => {
);
};
function expectSingleCompletionSend(
calls: GatewayRequest[],
expected: { sessionKey: string; channel: string; to: string; message: string },
) {
const sendCalls = calls.filter((call) => call.method === "send");
expect(sendCalls).toHaveLength(1);
const send = sendCalls[0]?.params as
| { sessionKey?: string; channel?: string; to?: string; message?: string }
| undefined;
expect(send?.sessionKey).toBe(expected.sessionKey);
expect(send?.channel).toBe(expected.channel);
expect(send?.to).toBe(expected.to);
expect(send?.message).toBe(expected.message);
}
function createDeleteCleanupHooks(setDeletedKey: (key: string | undefined) => void) {
return {
onAgentSubagentSpawn: (params: unknown) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params: unknown) => {
const rec = params as { key?: string } | undefined;
setDeletedKey(rec?.key);
},
};
}
describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
beforeEach(() => {
resetSessionsSpawnConfigOverride();
@@ -184,7 +155,6 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "whatsapp",
agentTo: "+123",
});
const result = await tool.execute("call2", {
@@ -213,7 +183,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId));
await waitFor(() => patchCalls.some((call) => call.label === "my-task"));
await waitFor(() => ctx.calls.filter((c) => c.method === "send").length >= 1);
await waitFor(() => ctx.calls.filter((c) => c.method === "agent").length >= 2);
const childWait = ctx.waitCalls.find((call) => call.runId === child.runId);
expect(childWait?.timeoutMs).toBe(1000);
@@ -222,21 +192,22 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(labelPatch?.key).toBe(child.sessionKey);
expect(labelPatch?.label).toBe("my-task");
// Subagent spawn call plus direct outbound completion send.
// Two agent calls: subagent spawn + main agent trigger
const agentCalls = ctx.calls.filter((c) => c.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
// First call: subagent spawn
const first = agentCalls[0]?.params as { lane?: string } | undefined;
expect(first?.lane).toBe("subagent");
// Direct send should route completion to the requester channel/session.
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:main",
channel: "whatsapp",
to: "+123",
message: "✅ Subagent main finished\n\ndone",
});
// Second call: main agent trigger (not "Sub-agent announce step." anymore)
const second = agentCalls[1]?.params as { sessionKey?: string; message?: string } | undefined;
expect(second?.sessionKey).toBe("agent:main:main");
expect(second?.message).toContain("subagent task");
// No direct send to external channel (main agent handles delivery)
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
});
@@ -245,15 +216,20 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
callGatewayMock.mockReset();
let deletedKey: string | undefined;
const ctx = setupSessionsSpawnGatewayMock({
...createDeleteCleanupHooks((key) => {
deletedKey = key;
}),
onAgentSubagentSpawn: (params) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params) => {
const rec = params as { key?: string } | undefined;
deletedKey = rec?.key;
},
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "discord:group:req",
agentChannel: "discord",
agentTo: "discord:dm:u123",
});
const result = await tool.execute("call1", {
@@ -287,11 +263,14 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
vi.useRealTimers();
}
await waitFor(() => ctx.calls.filter((call) => call.method === "agent").length >= 2);
await waitFor(() => Boolean(deletedKey));
const childWait = ctx.waitCalls.find((call) => call.runId === child.runId);
expect(childWait?.timeoutMs).toBe(1000);
const agentCalls = ctx.calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
const first = agentCalls[0]?.params as
| {
@@ -307,12 +286,19 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(first?.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:discord:group:req",
channel: "discord",
to: "discord:dm:u123",
message: "✅ Subagent main finished",
});
const second = agentCalls[1]?.params as
| {
sessionKey?: string;
message?: string;
deliver?: boolean;
}
| undefined;
expect(second?.sessionKey).toBe("agent:main:discord:group:req");
expect(second?.deliver).toBe(true);
expect(second?.message).toContain("subagent task");
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);
});
@@ -323,16 +309,21 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
let deletedKey: string | undefined;
const ctx = setupSessionsSpawnGatewayMock({
includeChatHistory: true,
...createDeleteCleanupHooks((key) => {
deletedKey = key;
}),
onAgentSubagentSpawn: (params) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params) => {
const rec = params as { key?: string } | undefined;
deletedKey = rec?.key;
},
agentWaitResult: { status: "ok", startedAt: 3000, endedAt: 4000 },
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "discord:group:req",
agentChannel: "discord",
agentTo: "discord:dm:u123",
});
const result = await tool.execute("call1b", {
@@ -350,27 +341,29 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
throw new Error("missing child runId");
}
await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId));
await waitFor(() => ctx.calls.filter((call) => call.method === "send").length >= 1);
await waitFor(() => ctx.calls.filter((call) => call.method === "agent").length >= 2);
await waitFor(() => Boolean(deletedKey));
const childWait = ctx.waitCalls.find((call) => call.runId === child.runId);
expect(childWait?.timeoutMs).toBe(1000);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
// One agent call for spawn, then direct completion send.
// Two agent calls: subagent spawn + main agent trigger
const agentCalls = ctx.calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
// First call: subagent spawn
const first = agentCalls[0]?.params as { lane?: string } | undefined;
expect(first?.lane).toBe("subagent");
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:discord:group:req",
channel: "discord",
to: "discord:dm:u123",
message: "✅ Subagent main finished\n\ndone",
});
// Second call: main agent trigger
const second = agentCalls[1]?.params as { sessionKey?: string; deliver?: boolean } | undefined;
expect(second?.sessionKey).toBe("agent:main:discord:group:req");
expect(second?.deliver).toBe(true);
// No direct send to external channel (main agent handles delivery)
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
// Session should be deleted
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);

View File

@@ -1,4 +1,5 @@
import { getChannelDock } from "../channels/dock.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import type { OpenClawConfig } from "../config/config.js";
import { resolveChannelGroupToolsPolicy } from "../config/group-policy.js";
import { resolveThreadParentSessionKey } from "../sessions/session-key-utils.js";
@@ -83,7 +84,8 @@ function resolveSubagentDenyList(depth: number, maxSpawnDepth: number): string[]
export function resolveSubagentToolPolicy(cfg?: OpenClawConfig, depth?: number): SandboxToolPolicy {
const configured = cfg?.tools?.subagents?.tools;
const maxSpawnDepth = cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
const effectiveDepth = typeof depth === "number" && depth >= 0 ? depth : 1;
const baseDeny = resolveSubagentDenyList(effectiveDepth, maxSpawnDepth);
const deny = [...baseDeny, ...(Array.isArray(configured?.deny) ? configured.deny : [])];

View File

@@ -0,0 +1,373 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import "./test-helpers/fast-core-tools.js";
import {
getCallGatewayMock,
getSessionsSpawnTool,
setSessionsSpawnConfigOverride,
} from "./openclaw-tools.subagents.sessions-spawn.test-harness.js";
const hookRunnerMocks = vi.hoisted(() => ({
hasSubagentEndedHook: true,
runSubagentSpawning: vi.fn(async (event: unknown) => {
const input = event as {
threadRequested?: boolean;
requester?: { channel?: string };
};
if (!input.threadRequested) {
return undefined;
}
const channel = input.requester?.channel?.trim().toLowerCase();
if (channel !== "discord") {
const channelLabel = input.requester?.channel?.trim() || "unknown";
return {
status: "error" as const,
error: `thread=true is not supported for channel "${channelLabel}". Only Discord thread-bound subagent sessions are supported right now.`,
};
}
return {
status: "ok" as const,
threadBindingReady: true,
};
}),
runSubagentSpawned: vi.fn(async () => {}),
runSubagentEnded: vi.fn(async () => {}),
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: vi.fn(() => ({
hasHooks: (hookName: string) =>
hookName === "subagent_spawning" ||
hookName === "subagent_spawned" ||
(hookName === "subagent_ended" && hookRunnerMocks.hasSubagentEndedHook),
runSubagentSpawning: hookRunnerMocks.runSubagentSpawning,
runSubagentSpawned: hookRunnerMocks.runSubagentSpawned,
runSubagentEnded: hookRunnerMocks.runSubagentEnded,
})),
}));
describe("sessions_spawn subagent lifecycle hooks", () => {
beforeEach(() => {
hookRunnerMocks.hasSubagentEndedHook = true;
hookRunnerMocks.runSubagentSpawning.mockClear();
hookRunnerMocks.runSubagentSpawned.mockClear();
hookRunnerMocks.runSubagentEnded.mockClear();
const callGatewayMock = getCallGatewayMock();
callGatewayMock.mockReset();
setSessionsSpawnConfigOverride({
session: {
mainKey: "main",
scope: "per-sender",
},
});
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string };
if (request.method === "agent") {
return { runId: "run-1", status: "accepted", acceptedAt: 1 };
}
if (request.method === "agent.wait") {
return { runId: "run-1", status: "running" };
}
return {};
});
});
it("runs subagent_spawning and emits subagent_spawned with requester metadata", async () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentAccountId: "work",
agentTo: "channel:123",
agentThreadId: 456,
});
const result = await tool.execute("call", {
task: "do thing",
label: "research",
runTimeoutSeconds: 1,
thread: true,
});
expect(result.details).toMatchObject({ status: "accepted", runId: "run-1" });
expect(hookRunnerMocks.runSubagentSpawning).toHaveBeenCalledTimes(1);
expect(hookRunnerMocks.runSubagentSpawning).toHaveBeenCalledWith(
{
childSessionKey: expect.stringMatching(/^agent:main:subagent:/),
agentId: "main",
label: "research",
mode: "session",
requester: {
channel: "discord",
accountId: "work",
to: "channel:123",
threadId: 456,
},
threadRequested: true,
},
{
childSessionKey: expect.stringMatching(/^agent:main:subagent:/),
requesterSessionKey: "main",
},
);
expect(hookRunnerMocks.runSubagentSpawned).toHaveBeenCalledTimes(1);
const [event, ctx] = (hookRunnerMocks.runSubagentSpawned.mock.calls[0] ?? []) as unknown as [
Record<string, unknown>,
Record<string, unknown>,
];
expect(event).toMatchObject({
runId: "run-1",
agentId: "main",
label: "research",
mode: "session",
requester: {
channel: "discord",
accountId: "work",
to: "channel:123",
threadId: 456,
},
threadRequested: true,
});
expect(event.childSessionKey).toEqual(expect.stringMatching(/^agent:main:subagent:/));
expect(ctx).toMatchObject({
runId: "run-1",
requesterSessionKey: "main",
childSessionKey: event.childSessionKey,
});
});
it("emits subagent_spawned with threadRequested=false when not requested", async () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentTo: "channel:123",
});
const result = await tool.execute("call2", {
task: "do thing",
runTimeoutSeconds: 1,
});
expect(result.details).toMatchObject({ status: "accepted", runId: "run-1" });
expect(hookRunnerMocks.runSubagentSpawning).not.toHaveBeenCalled();
expect(hookRunnerMocks.runSubagentSpawned).toHaveBeenCalledTimes(1);
const [event] = (hookRunnerMocks.runSubagentSpawned.mock.calls[0] ?? []) as unknown as [
Record<string, unknown>,
];
expect(event).toMatchObject({
mode: "run",
threadRequested: false,
requester: {
channel: "discord",
to: "channel:123",
},
});
});
it("respects explicit mode=run when thread binding is requested", async () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentTo: "channel:123",
});
const result = await tool.execute("call3", {
task: "do thing",
runTimeoutSeconds: 1,
thread: true,
mode: "run",
});
expect(result.details).toMatchObject({ status: "accepted", runId: "run-1", mode: "run" });
expect(hookRunnerMocks.runSubagentSpawning).toHaveBeenCalledTimes(1);
const [event] = (hookRunnerMocks.runSubagentSpawned.mock.calls[0] ?? []) as unknown as [
Record<string, unknown>,
];
expect(event).toMatchObject({
mode: "run",
threadRequested: true,
});
});
it("returns error when thread binding cannot be created", async () => {
hookRunnerMocks.runSubagentSpawning.mockResolvedValueOnce({
status: "error",
error: "Unable to create or bind a Discord thread for this subagent session.",
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentAccountId: "work",
agentTo: "channel:123",
});
const result = await tool.execute("call4", {
task: "do thing",
runTimeoutSeconds: 1,
thread: true,
mode: "session",
});
expect(result.details).toMatchObject({ status: "error" });
const details = result.details as { error?: string; childSessionKey?: string };
expect(details.error).toMatch(/thread/i);
expect(hookRunnerMocks.runSubagentSpawned).not.toHaveBeenCalled();
const callGatewayMock = getCallGatewayMock();
const calledMethods = callGatewayMock.mock.calls.map((call: [unknown]) => {
const request = call[0] as { method?: string };
return request.method;
});
expect(calledMethods).toContain("sessions.delete");
expect(calledMethods).not.toContain("agent");
const deleteCall = callGatewayMock.mock.calls
.map((call: [unknown]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find(
(request: { method?: string; params?: Record<string, unknown> }) =>
request.method === "sessions.delete",
);
expect(deleteCall?.params).toMatchObject({
key: details.childSessionKey,
emitLifecycleHooks: false,
});
});
it("rejects mode=session when thread=true is not requested", async () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentTo: "channel:123",
});
const result = await tool.execute("call6", {
task: "do thing",
mode: "session",
});
expect(result.details).toMatchObject({ status: "error" });
const details = result.details as { error?: string };
expect(details.error).toMatch(/requires thread=true/i);
expect(hookRunnerMocks.runSubagentSpawning).not.toHaveBeenCalled();
expect(hookRunnerMocks.runSubagentSpawned).not.toHaveBeenCalled();
const callGatewayMock = getCallGatewayMock();
expect(callGatewayMock).not.toHaveBeenCalled();
});
it("rejects thread=true on channels without thread support", async () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "signal",
agentTo: "+123",
});
const result = await tool.execute("call5", {
task: "do thing",
thread: true,
mode: "session",
});
expect(result.details).toMatchObject({ status: "error" });
const details = result.details as { error?: string };
expect(details.error).toMatch(/only discord/i);
expect(hookRunnerMocks.runSubagentSpawning).toHaveBeenCalledTimes(1);
expect(hookRunnerMocks.runSubagentSpawned).not.toHaveBeenCalled();
const callGatewayMock = getCallGatewayMock();
const calledMethods = callGatewayMock.mock.calls.map((call: [unknown]) => {
const request = call[0] as { method?: string };
return request.method;
});
expect(calledMethods).toContain("sessions.delete");
expect(calledMethods).not.toContain("agent");
});
it("runs subagent_ended cleanup hook when agent start fails after successful bind", async () => {
const callGatewayMock = getCallGatewayMock();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string };
if (request.method === "agent") {
throw new Error("spawn failed");
}
return {};
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentAccountId: "work",
agentTo: "channel:123",
agentThreadId: "456",
});
const result = await tool.execute("call7", {
task: "do thing",
thread: true,
mode: "session",
});
expect(result.details).toMatchObject({ status: "error" });
expect(hookRunnerMocks.runSubagentEnded).toHaveBeenCalledTimes(1);
const [event] = (hookRunnerMocks.runSubagentEnded.mock.calls[0] ?? []) as unknown as [
Record<string, unknown>,
];
expect(event).toMatchObject({
targetSessionKey: expect.stringMatching(/^agent:main:subagent:/),
accountId: "work",
targetKind: "subagent",
reason: "spawn-failed",
sendFarewell: true,
outcome: "error",
error: "Session failed to start",
});
const deleteCall = callGatewayMock.mock.calls
.map((call: [unknown]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find(
(request: { method?: string; params?: Record<string, unknown> }) =>
request.method === "sessions.delete",
);
expect(deleteCall?.params).toMatchObject({
key: event.targetSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
});
});
it("falls back to sessions.delete cleanup when subagent_ended hook is unavailable", async () => {
hookRunnerMocks.hasSubagentEndedHook = false;
const callGatewayMock = getCallGatewayMock();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string };
if (request.method === "agent") {
throw new Error("spawn failed");
}
return {};
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "discord",
agentAccountId: "work",
agentTo: "channel:123",
agentThreadId: "456",
});
const result = await tool.execute("call8", {
task: "do thing",
thread: true,
mode: "session",
});
expect(result.details).toMatchObject({ status: "error" });
expect(hookRunnerMocks.runSubagentEnded).not.toHaveBeenCalled();
const methods = callGatewayMock.mock.calls.map((call: [unknown]) => {
const request = call[0] as { method?: string };
return request.method;
});
expect(methods).toContain("sessions.delete");
const deleteCall = callGatewayMock.mock.calls
.map((call: [unknown]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find(
(request: { method?: string; params?: Record<string, unknown> }) =>
request.method === "sessions.delete",
);
expect(deleteCall?.params).toMatchObject({
deleteTranscript: true,
emitLifecycleHooks: true,
});
});
});

View File

@@ -1,11 +1,23 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import {
__testing as sessionBindingServiceTesting,
registerSessionBindingAdapter,
} from "../infra/outbound/session-binding-service.js";
type AgentCallRequest = { method?: string; params?: Record<string, unknown> };
type RequesterResolution = {
requesterSessionKey: string;
requesterOrigin?: Record<string, unknown>;
} | null;
type SubagentDeliveryTargetResult = {
origin?: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
};
const agentSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" }));
const sendSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" }));
@@ -24,6 +36,19 @@ const subagentRegistryMock = {
countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0),
resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null),
};
const subagentDeliveryTargetHookMock = vi.fn(
async (_event?: unknown, _ctx?: unknown): Promise<SubagentDeliveryTargetResult | undefined> =>
undefined,
);
let hasSubagentDeliveryTargetHook = false;
const hookRunnerMock = {
hasHooks: vi.fn(
(hookName: string) => hookName === "subagent_delivery_target" && hasSubagentDeliveryTargetHook,
),
runSubagentDeliveryTarget: vi.fn((event: unknown, ctx: unknown) =>
subagentDeliveryTargetHookMock(event, ctx),
),
};
const chatHistoryMock = vi.fn(async (_sessionKey?: string) => ({
messages: [] as Array<unknown>,
}));
@@ -103,6 +128,9 @@ vi.mock("../config/sessions.js", () => ({
vi.mock("./pi-embedded.js", () => embeddedRunMock);
vi.mock("./subagent-registry.js", () => subagentRegistryMock);
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => hookRunnerMock,
}));
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
@@ -114,9 +142,13 @@ vi.mock("../config/config.js", async (importOriginal) => {
describe("subagent announce formatting", () => {
beforeEach(() => {
agentSpy.mockClear();
sendSpy.mockClear();
sessionsDeleteSpy.mockClear();
agentSpy
.mockReset()
.mockImplementation(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" }));
sendSpy
.mockReset()
.mockImplementation(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" }));
sessionsDeleteSpy.mockReset().mockImplementation((_req: AgentCallRequest) => undefined);
embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false);
embeddedRunMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false);
@@ -124,9 +156,14 @@ describe("subagent announce formatting", () => {
subagentRegistryMock.isSubagentSessionRunActive.mockReset().mockReturnValue(true);
subagentRegistryMock.countActiveDescendantRuns.mockReset().mockReturnValue(0);
subagentRegistryMock.resolveRequesterForChildSession.mockReset().mockReturnValue(null);
hasSubagentDeliveryTargetHook = false;
hookRunnerMock.hasHooks.mockClear();
hookRunnerMock.runSubagentDeliveryTarget.mockClear();
subagentDeliveryTargetHookMock.mockReset().mockResolvedValue(undefined);
readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply");
chatHistoryMock.mockReset().mockResolvedValue({ messages: [] });
sessionStore = {};
sessionBindingServiceTesting.resetSessionBindingAdaptersForTests();
configOverride = {
session: {
mainKey: "main",
@@ -328,6 +365,7 @@ describe("subagent announce formatting", () => {
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: 2" }] }],
});
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
@@ -353,6 +391,283 @@ describe("subagent announce formatting", () => {
expect(msg).not.toContain("Convert the result above into your normal assistant voice");
});
it("keeps completion-mode delivery coordinated when sibling runs are still active", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-coordinated",
},
"agent:main:main": {
sessionId: "requester-session-coordinated",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: 2" }] }],
});
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:main" ? 1 : 0,
);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-coordinated",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).not.toHaveBeenCalled();
expect(agentSpy).toHaveBeenCalledTimes(1);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
const rawMessage = call?.params?.message;
const msg = typeof rawMessage === "string" ? rawMessage : "";
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(msg).toContain("There are still 1 active subagent run for this session.");
expect(msg).toContain(
"If they are part of the same workflow, wait for the remaining results before sending a user update.",
);
});
it("keeps session-mode completion delivery on the bound destination when sibling runs are active", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-bound",
},
"agent:main:main": {
sessionId: "requester-session-bound",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "bound answer: 2" }] }],
});
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:main" ? 1 : 0,
);
registerSessionBindingAdapter({
channel: "discord",
accountId: "acct-1",
listBySession: (targetSessionKey: string) =>
targetSessionKey === "agent:main:subagent:test"
? [
{
bindingId: "discord:acct-1:thread-bound-1",
targetSessionKey,
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "acct-1",
conversationId: "thread-bound-1",
parentConversationId: "parent-main",
},
status: "active",
boundAt: Date.now(),
},
]
: [],
resolveByConversation: () => null,
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-session-bound-direct",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(agentSpy).not.toHaveBeenCalled();
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:thread-bound-1");
});
it("does not duplicate to main channel when two active bound sessions complete from the same requester channel", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:child-a": {
sessionId: "child-session-a",
},
"agent:main:subagent:child-b": {
sessionId: "child-session-b",
},
"agent:main:main": {
sessionId: "requester-session-main",
},
};
// Simulate active sibling runs so non-bound paths would normally coordinate via agent().
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:main" ? 2 : 0,
);
registerSessionBindingAdapter({
channel: "discord",
accountId: "acct-1",
listBySession: (targetSessionKey: string) => {
if (targetSessionKey === "agent:main:subagent:child-a") {
return [
{
bindingId: "discord:acct-1:thread-child-a",
targetSessionKey,
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "acct-1",
conversationId: "thread-child-a",
parentConversationId: "main-parent-channel",
},
status: "active",
boundAt: Date.now(),
},
];
}
if (targetSessionKey === "agent:main:subagent:child-b") {
return [
{
bindingId: "discord:acct-1:thread-child-b",
targetSessionKey,
targetKind: "subagent",
conversation: {
channel: "discord",
accountId: "acct-1",
conversationId: "thread-child-b",
parentConversationId: "main-parent-channel",
},
status: "active",
boundAt: Date.now(),
},
];
}
return [];
},
resolveByConversation: () => null,
});
await Promise.all([
runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:child-a",
childRunId: "run-child-a",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:main-parent-channel",
accountId: "acct-1",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
}),
runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:child-b",
childRunId: "run-child-b",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:main-parent-channel",
accountId: "acct-1",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
}),
]);
await expect.poll(() => sendSpy.mock.calls.length).toBe(2);
expect(agentSpy).not.toHaveBeenCalled();
const directTargets = sendSpy.mock.calls.map(
(call) => (call?.[0] as { params?: { to?: string } })?.params?.to,
);
expect(directTargets).toEqual(
expect.arrayContaining(["channel:thread-child-a", "channel:thread-child-b"]),
);
expect(directTargets).not.toContain("channel:main-parent-channel");
});
it("uses failure header for completion direct-send when subagent outcome is error", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-direct-error",
},
"agent:main:main": {
sessionId: "requester-session-error",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "boom details" }] }],
});
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-completion-error",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
outcome: { status: "error", error: "boom" },
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
const rawMessage = call?.params?.message;
const msg = typeof rawMessage === "string" ? rawMessage : "";
expect(msg).toContain("❌ Subagent main failed this task (session remains active)");
expect(msg).toContain("boom details");
expect(msg).not.toContain("✅ Subagent main");
});
it("uses timeout header for completion direct-send when subagent outcome timed out", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-direct-timeout",
},
"agent:main:main": {
sessionId: "requester-session-timeout",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "partial output" }] }],
});
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-completion-timeout",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
outcome: { status: "timeout" },
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
const rawMessage = call?.params?.message;
const msg = typeof rawMessage === "string" ? rawMessage : "";
expect(msg).toContain("⏱️ Subagent main timed out");
expect(msg).toContain("partial output");
expect(msg).not.toContain("✅ Subagent main finished");
});
it("ignores stale session thread hints for manual completion direct-send", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
@@ -427,6 +742,197 @@ describe("subagent announce formatting", () => {
expect(call?.params?.threadId).toBe("99");
});
it("uses hook-provided thread target for completion direct-send", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
hasSubagentDeliveryTargetHook = true;
subagentDeliveryTargetHookMock.mockResolvedValueOnce({
origin: {
channel: "discord",
accountId: "acct-1",
to: "channel:777",
threadId: "777",
},
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-bound",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
threadId: "777",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(subagentDeliveryTargetHookMock).toHaveBeenCalledWith(
{
childSessionKey: "agent:main:subagent:test",
requesterSessionKey: "agent:main:main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
threadId: "777",
},
childRunId: "run-direct-thread-bound",
spawnMode: "session",
expectsCompletionMessage: true,
},
{
runId: "run-direct-thread-bound",
childSessionKey: "agent:main:subagent:test",
requesterSessionKey: "agent:main:main",
},
);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:777");
expect(call?.params?.threadId).toBe("777");
const message = typeof call?.params?.message === "string" ? call.params.message : "";
expect(message).toContain("completed this task (session remains active)");
expect(message).not.toContain("finished");
});
it("uses hook-provided thread target when requester origin has no threadId", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
hasSubagentDeliveryTargetHook = true;
subagentDeliveryTargetHookMock.mockResolvedValueOnce({
origin: {
channel: "discord",
accountId: "acct-1",
to: "channel:777",
threadId: "777",
},
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-bound-single",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:777");
expect(call?.params?.threadId).toBe("777");
});
it("keeps requester origin when delivery-target hook returns no override", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
hasSubagentDeliveryTargetHook = true;
subagentDeliveryTargetHookMock.mockResolvedValueOnce(undefined);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-persisted",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(call?.params?.threadId).toBeUndefined();
});
it("keeps requester origin when delivery-target hook returns non-deliverable channel", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
hasSubagentDeliveryTargetHook = true;
subagentDeliveryTargetHookMock.mockResolvedValueOnce({
origin: {
channel: "webchat",
to: "conversation:123",
},
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-multi-no-origin",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(call?.params?.threadId).toBeUndefined();
});
it("uses hook-provided thread target when requester threadId does not match", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
hasSubagentDeliveryTargetHook = true;
subagentDeliveryTargetHookMock.mockResolvedValueOnce({
origin: {
channel: "discord",
accountId: "acct-1",
to: "channel:777",
threadId: "777",
},
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-no-match",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
threadId: "999",
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
spawnMode: "session",
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:777");
expect(call?.params?.threadId).toBe("777");
});
it("steers announcements into an active run when queue mode is steer", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -623,13 +1129,14 @@ describe("subagent announce formatting", () => {
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("assistant ignored fallback");
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-assistant-output",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
@@ -663,6 +1170,7 @@ describe("subagent announce formatting", () => {
childRunId: "run-completion-tool-output",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
@@ -674,6 +1182,36 @@ describe("subagent announce formatting", () => {
expect(msg).toContain("tool output only");
});
it("ignores user text when deriving fallback completion output", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "user",
content: [{ type: "text", text: "user prompt should not be announced" }],
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-ignore-user",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
await expect.poll(() => sendSpy.mock.calls.length).toBe(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain("✅ Subagent main finished");
expect(msg).not.toContain("user prompt should not be announced");
});
it("queues announce delivery back into requester subagent session", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -856,6 +1394,34 @@ describe("subagent announce formatting", () => {
expect(call?.params?.to).toBeUndefined();
});
it("keeps completion-mode announce internal for nested requester subagent sessions", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:orchestrator:subagent:worker",
childRunId: "run-worker-nested-completion",
requesterSessionKey: "agent:main:subagent:orchestrator",
requesterOrigin: { channel: "whatsapp", accountId: "acct-123", to: "+1555" },
requesterDisplayKey: "agent:main:subagent:orchestrator",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).not.toHaveBeenCalled();
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.sessionKey).toBe("agent:main:subagent:orchestrator");
expect(call?.params?.deliver).toBe(false);
expect(call?.params?.channel).toBeUndefined();
expect(call?.params?.to).toBeUndefined();
const message = typeof call?.params?.message === "string" ? call.params.message : "";
expect(message).toContain(
"Convert this completion into a concise internal orchestration update for your parent agent",
);
});
it("retries reading subagent output when early lifecycle completion had no text", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValueOnce(true).mockReturnValue(false);
@@ -933,6 +1499,57 @@ describe("subagent announce formatting", () => {
expect(agentSpy).not.toHaveBeenCalled();
});
it("defers completion-mode announce while the finished run still has active descendants", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:parent",
childRunId: "run-parent-completion",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(false);
expect(sendSpy).not.toHaveBeenCalled();
expect(agentSpy).not.toHaveBeenCalled();
});
it("waits for updated synthesized output before announcing nested subagent completion", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
let historyReads = 0;
chatHistoryMock.mockImplementation(async () => {
historyReads += 1;
if (historyReads < 3) {
return {
messages: [{ role: "assistant", content: "Waiting for child output..." }],
};
}
return {
messages: [{ role: "assistant", content: "Final synthesized answer." }],
};
});
readLatestAssistantReplyMock.mockResolvedValue(undefined);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:parent",
childRunId: "run-parent-synth",
requesterSessionKey: "agent:main:subagent:orchestrator",
requesterDisplayKey: "agent:main:subagent:orchestrator",
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message ?? "";
expect(msg).toContain("Final synthesized answer.");
expect(msg).not.toContain("Waiting for child output...");
});
it("bubbles child announce to parent requester when requester subagent already ended", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
subagentRegistryMock.isSubagentSessionRunActive.mockReturnValue(false);
@@ -1013,6 +1630,35 @@ describe("subagent announce formatting", () => {
expect(agentSpy).not.toHaveBeenCalled();
});
it("defers completion-mode announce when child run is still active after settle timeout", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.waitForEmbeddedPiRunEnd.mockResolvedValue(false);
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-active",
},
};
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-child-active-completion",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "completion-context-stress-test",
timeoutMs: 1000,
cleanup: "keep",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(false);
expect(agentSpy).not.toHaveBeenCalled();
});
it("prefers requesterOrigin channel over stale session lastChannel in queued announce", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -1031,7 +1677,7 @@ describe("subagent announce formatting", () => {
childSessionKey: "agent:main:subagent:test",
childRunId: "run-stale-channel",
requesterSessionKey: "main",
requesterOrigin: { channel: "bluebubbles", to: "bluebubbles:chat_guid:123" },
requesterOrigin: { channel: "telegram", to: "telegram:123" },
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
});
@@ -1041,8 +1687,8 @@ describe("subagent announce formatting", () => {
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
// The channel should match requesterOrigin, NOT the stale session entry.
expect(call?.params?.channel).toBe("bluebubbles");
expect(call?.params?.to).toBe("bluebubbles:chat_guid:123");
expect(call?.params?.channel).toBe("telegram");
expect(call?.params?.to).toBe("telegram:123");
});
it("routes to parent subagent when parent run ended but session still exists (#18037)", async () => {

View File

@@ -1,5 +1,6 @@
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
@@ -8,7 +9,10 @@ import {
resolveStorePath,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAccountId, normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
import {
@@ -30,6 +34,8 @@ import {
} from "./pi-embedded.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
import { readLatestAssistantReply } from "./tools/agent-step.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
type ToolResultMessage = {
@@ -48,10 +54,26 @@ type SubagentAnnounceDeliveryResult = {
function buildCompletionDeliveryMessage(params: {
findings: string;
subagentName: string;
spawnMode?: SpawnSubagentMode;
outcome?: SubagentRunOutcome;
}): string {
const findingsText = params.findings.trim();
const hasFindings = findingsText.length > 0 && findingsText !== "(no output)";
const header = `✅ Subagent ${params.subagentName} finished`;
const header = (() => {
if (params.outcome?.status === "error") {
return params.spawnMode === "session"
? `❌ Subagent ${params.subagentName} failed this task (session remains active)`
: `❌ Subagent ${params.subagentName} failed`;
}
if (params.outcome?.status === "timeout") {
return params.spawnMode === "session"
? `⏱️ Subagent ${params.subagentName} timed out on this task (session remains active)`
: `⏱️ Subagent ${params.subagentName} timed out`;
}
return params.spawnMode === "session"
? `✅ Subagent ${params.subagentName} completed this task (session remains active)`
: `✅ Subagent ${params.subagentName} finished`;
})();
if (!hasFindings) {
return header;
}
@@ -153,16 +175,29 @@ function extractSubagentOutputText(message: unknown): string {
if (role === "toolResult" || role === "tool") {
return extractToolResultText((message as ToolResultMessage).content);
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
if (role == null) {
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
}
}
return "";
}
async function readLatestSubagentOutput(sessionKey: string): Promise<string | undefined> {
try {
const latestAssistant = await readLatestAssistantReply({
sessionKey,
limit: 50,
});
if (latestAssistant?.trim()) {
return latestAssistant;
}
} catch {
// Best-effort: fall back to richer history parsing below.
}
const history = await callGateway<{ messages?: Array<unknown> }>({
method: "chat.history",
params: { sessionKey, limit: 50 },
@@ -195,6 +230,31 @@ async function readLatestSubagentOutputWithRetry(params: {
return result;
}
async function waitForSubagentOutputChange(params: {
sessionKey: string;
baselineReply: string;
maxWaitMs: number;
}): Promise<string> {
const baseline = params.baselineReply.trim();
if (!baseline) {
return params.baselineReply;
}
const RETRY_INTERVAL_MS = 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 5_000));
let latest = params.baselineReply;
while (Date.now() < deadline) {
const next = await readLatestSubagentOutput(params.sessionKey);
if (next?.trim()) {
latest = next;
if (next.trim() !== baseline) {
return next;
}
}
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
}
return latest;
}
function formatDurationShort(valueMs?: number) {
if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) {
return "n/a";
@@ -287,7 +347,117 @@ function resolveAnnounceOrigin(
// requesterOrigin (captured at spawn time) reflects the channel the user is
// actually on and must take priority over the session entry, which may carry
// stale lastChannel / lastTo values from a previous channel interaction.
return mergeDeliveryContext(normalizedRequester, normalizedEntry);
const entryForMerge =
normalizedRequester?.to &&
normalizedRequester.threadId == null &&
normalizedEntry?.threadId != null
? (() => {
const { threadId: _ignore, ...rest } = normalizedEntry;
return rest;
})()
: normalizedEntry;
return mergeDeliveryContext(normalizedRequester, entryForMerge);
}
async function resolveSubagentCompletionOrigin(params: {
childSessionKey: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
childRunId?: string;
spawnMode?: SpawnSubagentMode;
expectsCompletionMessage: boolean;
}): Promise<{
origin?: DeliveryContext;
routeMode: "bound" | "fallback" | "hook";
}> {
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
const requesterConversation = (() => {
const channel = requesterOrigin?.channel?.trim().toLowerCase();
const to = requesterOrigin?.to?.trim();
const accountId = normalizeAccountId(requesterOrigin?.accountId);
const threadId =
requesterOrigin?.threadId != null && requesterOrigin.threadId !== ""
? String(requesterOrigin.threadId).trim()
: undefined;
const conversationId =
threadId || (to?.startsWith("channel:") ? to.slice("channel:".length) : "");
if (!channel || !conversationId) {
return undefined;
}
const ref: ConversationRef = {
channel,
accountId,
conversationId,
};
return ref;
})();
const route = createBoundDeliveryRouter().resolveDestination({
eventKind: "task_completion",
targetSessionKey: params.childSessionKey,
requester: requesterConversation,
failClosed: false,
});
if (route.mode === "bound" && route.binding) {
const boundOrigin: DeliveryContext = {
channel: route.binding.conversation.channel,
accountId: route.binding.conversation.accountId,
to: `channel:${route.binding.conversation.conversationId}`,
threadId: route.binding.conversation.conversationId,
};
return {
// Bound target is authoritative; requester hints fill only missing fields.
origin: mergeDeliveryContext(boundOrigin, requesterOrigin),
routeMode: "bound",
};
}
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("subagent_delivery_target")) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
try {
const result = await hookRunner.runSubagentDeliveryTarget(
{
childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey,
requesterOrigin,
childRunId: params.childRunId,
spawnMode: params.spawnMode,
expectsCompletionMessage: params.expectsCompletionMessage,
},
{
runId: params.childRunId,
childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey,
},
);
const hookOrigin = normalizeDeliveryContext(result?.origin);
if (!hookOrigin) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
if (hookOrigin.channel && !isDeliverableMessageChannel(hookOrigin.channel)) {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
// Hook-provided origin should override requester defaults when present.
return {
origin: mergeDeliveryContext(hookOrigin, requesterOrigin),
routeMode: "hook",
};
} catch {
return {
origin: requesterOrigin,
routeMode: "fallback",
};
}
}
async function sendAnnounce(item: AnnounceQueueItem) {
@@ -434,6 +604,8 @@ async function sendSubagentAnnounceDirectly(params: {
triggerMessage: string;
completionMessage?: string;
expectsCompletionMessage: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
@@ -464,28 +636,52 @@ async function sendSubagentAnnounceDirectly(params: {
hasCompletionDirectTarget &&
params.completionMessage?.trim()
) {
const completionThreadId =
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
? String(completionDirectOrigin.threadId)
: undefined;
await callGateway({
method: "send",
params: {
channel: completionChannel,
to: completionTo,
accountId: completionDirectOrigin?.accountId,
threadId: completionThreadId,
sessionKey: canonicalRequesterSessionKey,
message: params.completionMessage,
idempotencyKey: params.directIdempotencyKey,
},
timeoutMs: 15_000,
});
const forceBoundSessionDirectDelivery =
params.spawnMode === "session" &&
(params.completionRouteMode === "bound" || params.completionRouteMode === "hook");
let shouldSendCompletionDirectly = true;
if (!forceBoundSessionDirectDelivery) {
let activeDescendantRuns = 0;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
activeDescendantRuns = Math.max(
0,
countActiveDescendantRuns(canonicalRequesterSessionKey),
);
} catch {
// Best-effort only; when unavailable keep historical direct-send behavior.
}
// Keep non-bound completion announcements coordinated via requester
// session routing while sibling/descendant runs are still active.
if (activeDescendantRuns > 0) {
shouldSendCompletionDirectly = false;
}
}
return {
delivered: true,
path: "direct",
};
if (shouldSendCompletionDirectly) {
const completionThreadId =
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
? String(completionDirectOrigin.threadId)
: undefined;
await callGateway({
method: "send",
params: {
channel: completionChannel,
to: completionTo,
accountId: completionDirectOrigin?.accountId,
threadId: completionThreadId,
sessionKey: canonicalRequesterSessionKey,
message: params.completionMessage,
idempotencyKey: params.directIdempotencyKey,
},
timeoutMs: 15_000,
});
return {
delivered: true,
path: "direct",
};
}
}
const directOrigin = normalizeDeliveryContext(params.directOrigin);
@@ -534,6 +730,8 @@ async function deliverSubagentAnnouncement(params: {
targetRequesterSessionKey: string;
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
completionRouteMode?: "bound" | "fallback" | "hook";
spawnMode?: SpawnSubagentMode;
directIdempotencyKey: string;
}): Promise<SubagentAnnounceDeliveryResult> {
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
@@ -560,6 +758,8 @@ async function deliverSubagentAnnouncement(params: {
completionMessage: params.completionMessage,
directIdempotencyKey: params.directIdempotencyKey,
completionDirectOrigin: params.completionDirectOrigin,
completionRouteMode: params.completionRouteMode,
spawnMode: params.spawnMode,
directOrigin: params.directOrigin,
requesterIsSubagent: params.requesterIsSubagent,
expectsCompletionMessage: params.expectsCompletionMessage,
@@ -608,7 +808,10 @@ export function buildSubagentSystemPrompt(params: {
? params.task.replace(/\s+/g, " ").trim()
: "{{TASK_DESCRIPTION}}";
const childDepth = typeof params.childDepth === "number" ? params.childDepth : 1;
const maxSpawnDepth = typeof params.maxSpawnDepth === "number" ? params.maxSpawnDepth : 1;
const maxSpawnDepth =
typeof params.maxSpawnDepth === "number"
? params.maxSpawnDepth
: DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
const canSpawn = childDepth < maxSpawnDepth;
const parentLabel = childDepth >= 2 ? "parent orchestrator" : "main agent";
@@ -694,9 +897,6 @@ function buildAnnounceReplyInstruction(params: {
announceType: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
}): string {
if (params.expectsCompletionMessage) {
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
}
if (params.remainingActiveSubagentRuns > 0) {
const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs";
return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`;
@@ -704,6 +904,9 @@ function buildAnnounceReplyInstruction(params: {
if (params.requesterIsSubagent) {
return `Convert this completion into a concise internal orchestration update for your parent agent in your own words. Keep this internal context private (don't mention system/log/stats/session details or announce type). If this result is duplicate or no update is needed, reply ONLY: ${SILENT_REPLY_TOKEN}.`;
}
if (params.expectsCompletionMessage) {
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
}
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type), and do not copy the system message verbatim. Reply ONLY: ${SILENT_REPLY_TOKEN} if this exact result was already delivered to the user in this same turn.`;
}
@@ -724,6 +927,7 @@ export async function runSubagentAnnounceFlow(params: {
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
spawnMode?: SpawnSubagentMode;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -742,7 +946,7 @@ export async function runSubagentAnnounceFlow(params: {
let outcome: SubagentRunOutcome | undefined = params.outcome;
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
// subagent is still active, wait for the embedded run to fully settle.
if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
// The child run is still active (e.g., compaction retry still in progress).
@@ -816,6 +1020,8 @@ export async function runSubagentAnnounceFlow(params: {
outcome = { status: "unknown" };
}
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
let activeChildDescendantRuns = 0;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
@@ -823,13 +1029,21 @@ export async function runSubagentAnnounceFlow(params: {
} catch {
// Best-effort only; fall back to direct announce behavior when unavailable.
}
if (!expectsCompletionMessage && activeChildDescendantRuns > 0) {
if (activeChildDescendantRuns > 0) {
// The finished run still has active descendant subagents. Defer announcing
// this run until descendants settle so we avoid posting in-progress updates.
shouldDeleteChildSession = false;
return false;
}
if (requesterDepth >= 1 && reply?.trim()) {
reply = await waitForSubagentOutputChange({
sessionKey: params.childSessionKey,
baselineReply: reply,
maxWaitMs: Math.max(250, Math.min(params.timeoutMs, 2_000)),
});
}
// Build status label
const statusLabel =
outcome.status === "ok"
@@ -849,8 +1063,7 @@ export async function runSubagentAnnounceFlow(params: {
let completionMessage = "";
let triggerMessage = "";
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
let requesterIsSubagent = !expectsCompletionMessage && requesterDepth >= 1;
let requesterIsSubagent = requesterDepth >= 1;
// If the requester subagent has already finished, bubble the announce to its
// requester (typically main) so descendant completion is not silently lost.
// BUT: only fallback if the parent SESSION is deleted, not just if the current
@@ -913,6 +1126,8 @@ export async function runSubagentAnnounceFlow(params: {
completionMessage = buildCompletionDeliveryMessage({
findings,
subagentName,
spawnMode: params.spawnMode,
outcome,
});
const internalSummaryMessage = [
`[System Message] [sessionId: ${announceSessionId}] A ${announceType} "${taskLabel}" just ${statusLabel}.`,
@@ -935,6 +1150,21 @@ export async function runSubagentAnnounceFlow(params: {
const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey);
directOrigin = resolveAnnounceOrigin(entry, targetRequesterOrigin);
}
const completionResolution =
expectsCompletionMessage && !requesterIsSubagent
? await resolveSubagentCompletionOrigin({
childSessionKey: params.childSessionKey,
requesterSessionKey: targetRequesterSessionKey,
requesterOrigin: directOrigin,
childRunId: params.childRunId,
spawnMode: params.spawnMode,
expectsCompletionMessage,
})
: {
origin: targetRequesterOrigin,
routeMode: "fallback" as const,
};
const completionDirectOrigin = completionResolution.origin;
// Use a deterministic idempotency key so the gateway dedup cache
// catches duplicates if this announce is also queued by the gateway-
// level message queue while the main session is busy (#17122).
@@ -945,12 +1175,17 @@ export async function runSubagentAnnounceFlow(params: {
triggerMessage,
completionMessage,
summaryLine: taskLabel,
requesterOrigin: targetRequesterOrigin,
completionDirectOrigin: targetRequesterOrigin,
requesterOrigin:
expectsCompletionMessage && !requesterIsSubagent
? completionDirectOrigin
: targetRequesterOrigin,
completionDirectOrigin,
directOrigin,
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
completionRouteMode: completionResolution.routeMode,
spawnMode: params.spawnMode,
directIdempotencyKey,
});
didAnnounce = delivery.delivered;
@@ -979,7 +1214,11 @@ export async function runSubagentAnnounceFlow(params: {
try {
await callGateway({
method: "sessions.delete",
params: { key: params.childSessionKey, deleteTranscript: true },
params: {
key: params.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {

View File

@@ -0,0 +1,47 @@
export const SUBAGENT_TARGET_KIND_SUBAGENT = "subagent" as const;
export const SUBAGENT_TARGET_KIND_ACP = "acp" as const;
export type SubagentLifecycleTargetKind =
| typeof SUBAGENT_TARGET_KIND_SUBAGENT
| typeof SUBAGENT_TARGET_KIND_ACP;
export const SUBAGENT_ENDED_REASON_COMPLETE = "subagent-complete" as const;
export const SUBAGENT_ENDED_REASON_ERROR = "subagent-error" as const;
export const SUBAGENT_ENDED_REASON_KILLED = "subagent-killed" as const;
export const SUBAGENT_ENDED_REASON_SESSION_RESET = "session-reset" as const;
export const SUBAGENT_ENDED_REASON_SESSION_DELETE = "session-delete" as const;
export type SubagentLifecycleEndedReason =
| typeof SUBAGENT_ENDED_REASON_COMPLETE
| typeof SUBAGENT_ENDED_REASON_ERROR
| typeof SUBAGENT_ENDED_REASON_KILLED
| typeof SUBAGENT_ENDED_REASON_SESSION_RESET
| typeof SUBAGENT_ENDED_REASON_SESSION_DELETE;
export type SubagentSessionLifecycleEndedReason =
| typeof SUBAGENT_ENDED_REASON_SESSION_RESET
| typeof SUBAGENT_ENDED_REASON_SESSION_DELETE;
export const SUBAGENT_ENDED_OUTCOME_OK = "ok" as const;
export const SUBAGENT_ENDED_OUTCOME_ERROR = "error" as const;
export const SUBAGENT_ENDED_OUTCOME_TIMEOUT = "timeout" as const;
export const SUBAGENT_ENDED_OUTCOME_KILLED = "killed" as const;
export const SUBAGENT_ENDED_OUTCOME_RESET = "reset" as const;
export const SUBAGENT_ENDED_OUTCOME_DELETED = "deleted" as const;
export type SubagentLifecycleEndedOutcome =
| typeof SUBAGENT_ENDED_OUTCOME_OK
| typeof SUBAGENT_ENDED_OUTCOME_ERROR
| typeof SUBAGENT_ENDED_OUTCOME_TIMEOUT
| typeof SUBAGENT_ENDED_OUTCOME_KILLED
| typeof SUBAGENT_ENDED_OUTCOME_RESET
| typeof SUBAGENT_ENDED_OUTCOME_DELETED;
export function resolveSubagentSessionEndedOutcome(
reason: SubagentSessionLifecycleEndedReason,
): SubagentLifecycleEndedOutcome {
if (reason === SUBAGENT_ENDED_REASON_SESSION_RESET) {
return SUBAGENT_ENDED_OUTCOME_RESET;
}
return SUBAGENT_ENDED_OUTCOME_DELETED;
}

View File

@@ -0,0 +1,67 @@
import {
SUBAGENT_ENDED_REASON_COMPLETE,
type SubagentLifecycleEndedReason,
} from "./subagent-lifecycle-events.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
export type DeferredCleanupDecision =
| {
kind: "defer-descendants";
delayMs: number;
}
| {
kind: "give-up";
reason: "retry-limit" | "expiry";
retryCount?: number;
}
| {
kind: "retry";
retryCount: number;
resumeDelayMs?: number;
};
export function resolveCleanupCompletionReason(
entry: SubagentRunRecord,
): SubagentLifecycleEndedReason {
return entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE;
}
function resolveEndedAgoMs(entry: SubagentRunRecord, now: number): number {
return typeof entry.endedAt === "number" ? now - entry.endedAt : 0;
}
export function resolveDeferredCleanupDecision(params: {
entry: SubagentRunRecord;
now: number;
activeDescendantRuns: number;
announceExpiryMs: number;
maxAnnounceRetryCount: number;
deferDescendantDelayMs: number;
resolveAnnounceRetryDelayMs: (retryCount: number) => number;
}): DeferredCleanupDecision {
const endedAgo = resolveEndedAgoMs(params.entry, params.now);
if (params.entry.expectsCompletionMessage === true && params.activeDescendantRuns > 0) {
if (endedAgo > params.announceExpiryMs) {
return { kind: "give-up", reason: "expiry" };
}
return { kind: "defer-descendants", delayMs: params.deferDescendantDelayMs };
}
const retryCount = (params.entry.announceRetryCount ?? 0) + 1;
if (retryCount >= params.maxAnnounceRetryCount || endedAgo > params.announceExpiryMs) {
return {
kind: "give-up",
reason: retryCount >= params.maxAnnounceRetryCount ? "retry-limit" : "expiry",
retryCount,
};
}
return {
kind: "retry",
retryCount,
resumeDelayMs:
params.entry.expectsCompletionMessage === true
? params.resolveAnnounceRetryDelayMs(retryCount)
: undefined,
};
}

View File

@@ -0,0 +1,79 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { SUBAGENT_ENDED_REASON_COMPLETE } from "./subagent-lifecycle-events.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
const lifecycleMocks = vi.hoisted(() => ({
getGlobalHookRunner: vi.fn(),
runSubagentEnded: vi.fn(async () => {}),
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: () => lifecycleMocks.getGlobalHookRunner(),
}));
import { emitSubagentEndedHookOnce } from "./subagent-registry-completion.js";
function createRunEntry(): SubagentRunRecord {
return {
runId: "run-1",
childSessionKey: "agent:main:subagent:child-1",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "task",
cleanup: "keep",
createdAt: Date.now(),
};
}
describe("emitSubagentEndedHookOnce", () => {
beforeEach(() => {
lifecycleMocks.getGlobalHookRunner.mockReset();
lifecycleMocks.runSubagentEnded.mockClear();
});
it("records ended hook marker even when no subagent_ended hooks are registered", async () => {
lifecycleMocks.getGlobalHookRunner.mockReturnValue({
hasHooks: () => false,
runSubagentEnded: lifecycleMocks.runSubagentEnded,
});
const entry = createRunEntry();
const persist = vi.fn();
const emitted = await emitSubagentEndedHookOnce({
entry,
reason: SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: "acct-1",
inFlightRunIds: new Set<string>(),
persist,
});
expect(emitted).toBe(true);
expect(lifecycleMocks.runSubagentEnded).not.toHaveBeenCalled();
expect(typeof entry.endedHookEmittedAt).toBe("number");
expect(persist).toHaveBeenCalledTimes(1);
});
it("runs subagent_ended hooks when available", async () => {
lifecycleMocks.getGlobalHookRunner.mockReturnValue({
hasHooks: () => true,
runSubagentEnded: lifecycleMocks.runSubagentEnded,
});
const entry = createRunEntry();
const persist = vi.fn();
const emitted = await emitSubagentEndedHookOnce({
entry,
reason: SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: "acct-1",
inFlightRunIds: new Set<string>(),
persist,
});
expect(emitted).toBe(true);
expect(lifecycleMocks.runSubagentEnded).toHaveBeenCalledTimes(1);
expect(typeof entry.endedHookEmittedAt).toBe("number");
expect(persist).toHaveBeenCalledTimes(1);
});
});

View File

@@ -0,0 +1,96 @@
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import type { SubagentRunOutcome } from "./subagent-announce.js";
import {
SUBAGENT_ENDED_OUTCOME_ERROR,
SUBAGENT_ENDED_OUTCOME_OK,
SUBAGENT_ENDED_OUTCOME_TIMEOUT,
SUBAGENT_TARGET_KIND_SUBAGENT,
type SubagentLifecycleEndedOutcome,
type SubagentLifecycleEndedReason,
} from "./subagent-lifecycle-events.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
export function runOutcomesEqual(
a: SubagentRunOutcome | undefined,
b: SubagentRunOutcome | undefined,
): boolean {
if (!a && !b) {
return true;
}
if (!a || !b) {
return false;
}
if (a.status !== b.status) {
return false;
}
if (a.status === "error" && b.status === "error") {
return (a.error ?? "") === (b.error ?? "");
}
return true;
}
export function resolveLifecycleOutcomeFromRunOutcome(
outcome: SubagentRunOutcome | undefined,
): SubagentLifecycleEndedOutcome {
if (outcome?.status === "error") {
return SUBAGENT_ENDED_OUTCOME_ERROR;
}
if (outcome?.status === "timeout") {
return SUBAGENT_ENDED_OUTCOME_TIMEOUT;
}
return SUBAGENT_ENDED_OUTCOME_OK;
}
export async function emitSubagentEndedHookOnce(params: {
entry: SubagentRunRecord;
reason: SubagentLifecycleEndedReason;
sendFarewell?: boolean;
accountId?: string;
outcome?: SubagentLifecycleEndedOutcome;
error?: string;
inFlightRunIds: Set<string>;
persist: () => void;
}) {
const runId = params.entry.runId.trim();
if (!runId) {
return false;
}
if (params.entry.endedHookEmittedAt) {
return false;
}
if (params.inFlightRunIds.has(runId)) {
return false;
}
params.inFlightRunIds.add(runId);
try {
const hookRunner = getGlobalHookRunner();
if (hookRunner?.hasHooks("subagent_ended")) {
await hookRunner.runSubagentEnded(
{
targetSessionKey: params.entry.childSessionKey,
targetKind: SUBAGENT_TARGET_KIND_SUBAGENT,
reason: params.reason,
sendFarewell: params.sendFarewell,
accountId: params.accountId,
runId: params.entry.runId,
endedAt: params.entry.endedAt,
outcome: params.outcome,
error: params.error,
},
{
runId: params.entry.runId,
childSessionKey: params.entry.childSessionKey,
requesterSessionKey: params.entry.requesterSessionKey,
},
);
}
params.entry.endedHookEmittedAt = Date.now();
params.persist();
return true;
} catch {
return false;
} finally {
params.inFlightRunIds.delete(runId);
}
}

View File

@@ -0,0 +1,146 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
export function findRunIdsByChildSessionKeyFromRuns(
runs: Map<string, SubagentRunRecord>,
childSessionKey: string,
): string[] {
const key = childSessionKey.trim();
if (!key) {
return [];
}
const runIds: string[] = [];
for (const [runId, entry] of runs.entries()) {
if (entry.childSessionKey === key) {
runIds.push(runId);
}
}
return runIds;
}
export function listRunsForRequesterFromRuns(
runs: Map<string, SubagentRunRecord>,
requesterSessionKey: string,
): SubagentRunRecord[] {
const key = requesterSessionKey.trim();
if (!key) {
return [];
}
return [...runs.values()].filter((entry) => entry.requesterSessionKey === key);
}
export function resolveRequesterForChildSessionFromRuns(
runs: Map<string, SubagentRunRecord>,
childSessionKey: string,
): {
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
} | null {
const key = childSessionKey.trim();
if (!key) {
return null;
}
let best: SubagentRunRecord | undefined;
for (const entry of runs.values()) {
if (entry.childSessionKey !== key) {
continue;
}
if (!best || entry.createdAt > best.createdAt) {
best = entry;
}
}
if (!best) {
return null;
}
return {
requesterSessionKey: best.requesterSessionKey,
requesterOrigin: best.requesterOrigin,
};
}
export function countActiveRunsForSessionFromRuns(
runs: Map<string, SubagentRunRecord>,
requesterSessionKey: string,
): number {
const key = requesterSessionKey.trim();
if (!key) {
return 0;
}
let count = 0;
for (const entry of runs.values()) {
if (entry.requesterSessionKey !== key) {
continue;
}
if (typeof entry.endedAt === "number") {
continue;
}
count += 1;
}
return count;
}
export function countActiveDescendantRunsFromRuns(
runs: Map<string, SubagentRunRecord>,
rootSessionKey: string,
): number {
const root = rootSessionKey.trim();
if (!root) {
return 0;
}
const pending = [root];
const visited = new Set<string>([root]);
let count = 0;
while (pending.length > 0) {
const requester = pending.shift();
if (!requester) {
continue;
}
for (const entry of runs.values()) {
if (entry.requesterSessionKey !== requester) {
continue;
}
if (typeof entry.endedAt !== "number") {
count += 1;
}
const childKey = entry.childSessionKey.trim();
if (!childKey || visited.has(childKey)) {
continue;
}
visited.add(childKey);
pending.push(childKey);
}
}
return count;
}
export function listDescendantRunsForRequesterFromRuns(
runs: Map<string, SubagentRunRecord>,
rootSessionKey: string,
): SubagentRunRecord[] {
const root = rootSessionKey.trim();
if (!root) {
return [];
}
const pending = [root];
const visited = new Set<string>([root]);
const descendants: SubagentRunRecord[] = [];
while (pending.length > 0) {
const requester = pending.shift();
if (!requester) {
continue;
}
for (const entry of runs.values()) {
if (entry.requesterSessionKey !== requester) {
continue;
}
descendants.push(entry);
const childKey = entry.childSessionKey.trim();
if (!childKey || visited.has(childKey)) {
continue;
}
visited.add(childKey);
pending.push(childKey);
}
}
return descendants;
}

View File

@@ -0,0 +1,56 @@
import {
loadSubagentRegistryFromDisk,
saveSubagentRegistryToDisk,
} from "./subagent-registry.store.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
export function persistSubagentRunsToDisk(runs: Map<string, SubagentRunRecord>) {
try {
saveSubagentRegistryToDisk(runs);
} catch {
// ignore persistence failures
}
}
export function restoreSubagentRunsFromDisk(params: {
runs: Map<string, SubagentRunRecord>;
mergeOnly?: boolean;
}) {
const restored = loadSubagentRegistryFromDisk();
if (restored.size === 0) {
return 0;
}
let added = 0;
for (const [runId, entry] of restored.entries()) {
if (!runId || !entry) {
continue;
}
if (params.mergeOnly && params.runs.has(runId)) {
continue;
}
params.runs.set(runId, entry);
added += 1;
}
return added;
}
export function getSubagentRunsSnapshotForRead(
inMemoryRuns: Map<string, SubagentRunRecord>,
): Map<string, SubagentRunRecord> {
const merged = new Map<string, SubagentRunRecord>();
const shouldReadDisk = !(process.env.VITEST || process.env.NODE_ENV === "test");
if (shouldReadDisk) {
try {
// Persisted state lets other worker processes observe active runs.
for (const [runId, entry] of loadSubagentRegistryFromDisk().entries()) {
merged.set(runId, entry);
}
} catch {
// Ignore disk read failures and fall back to local memory.
}
}
for (const [runId, entry] of inMemoryRuns.entries()) {
merged.set(runId, entry);
}
return merged;
}

View File

@@ -0,0 +1,90 @@
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
const noop = () => {};
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async (request: unknown) => {
const method = (request as { method?: string }).method;
if (method === "agent.wait") {
// Keep lifecycle unsettled so register/replace assertions can inspect stored state.
return { status: "pending" };
}
return {};
}),
}));
vi.mock("../infra/agent-events.js", () => ({
onAgentEvent: vi.fn((_handler: unknown) => noop),
}));
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn(() => ({
agents: { defaults: { subagents: { archiveAfterMinutes: 60 } } },
})),
}));
vi.mock("./subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn(async () => true),
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: vi.fn(() => null),
}));
vi.mock("./subagent-registry.store.js", () => ({
loadSubagentRegistryFromDisk: vi.fn(() => new Map()),
saveSubagentRegistryToDisk: vi.fn(() => {}),
}));
describe("subagent registry archive behavior", () => {
let mod: typeof import("./subagent-registry.js");
beforeAll(async () => {
mod = await import("./subagent-registry.js");
});
afterEach(() => {
mod.resetSubagentRegistryForTests({ persist: false });
});
it("does not set archiveAtMs for persistent session-mode runs", () => {
mod.registerSubagentRun({
runId: "run-session-1",
childSessionKey: "agent:main:subagent:session-1",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "persistent-session",
cleanup: "keep",
spawnMode: "session",
});
const run = mod.listSubagentRunsForRequester("agent:main:main")[0];
expect(run?.runId).toBe("run-session-1");
expect(run?.spawnMode).toBe("session");
expect(run?.archiveAtMs).toBeUndefined();
});
it("keeps archiveAtMs unset when replacing a session-mode run after steer restart", () => {
mod.registerSubagentRun({
runId: "run-old",
childSessionKey: "agent:main:subagent:session-1",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "persistent-session",
cleanup: "keep",
spawnMode: "session",
});
const replaced = mod.replaceSubagentRunAfterSteer({
previousRunId: "run-old",
nextRunId: "run-new",
});
expect(replaced).toBe(true);
const run = mod
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-new");
expect(run?.spawnMode).toBe("session");
expect(run?.archiveAtMs).toBeUndefined();
});
});

View File

@@ -2,7 +2,17 @@ import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
const noop = () => {};
let lifecycleHandler:
| ((evt: { stream?: string; runId: string; data?: { phase?: string } }) => void)
| ((evt: {
stream?: string;
runId: string;
data?: {
phase?: string;
startedAt?: number;
endedAt?: number;
aborted?: boolean;
error?: string;
};
}) => void)
| undefined;
vi.mock("../gateway/call.js", () => ({
@@ -29,10 +39,18 @@ vi.mock("../config/config.js", () => ({
}));
const announceSpy = vi.fn(async (_params: unknown) => true);
const runSubagentEndedHookMock = vi.fn(async (_event?: unknown, _ctx?: unknown) => {});
vi.mock("./subagent-announce.js", () => ({
runSubagentAnnounceFlow: announceSpy,
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: vi.fn(() => ({
hasHooks: (hookName: string) => hookName === "subagent_ended",
runSubagentEnded: runSubagentEndedHookMock,
})),
}));
vi.mock("./subagent-registry.store.js", () => ({
loadSubagentRegistryFromDisk: vi.fn(() => new Map()),
saveSubagentRegistryToDisk: vi.fn(() => {}),
@@ -52,6 +70,7 @@ describe("subagent registry steer restarts", () => {
afterEach(async () => {
announceSpy.mockReset();
announceSpy.mockResolvedValue(true);
runSubagentEndedHookMock.mockClear();
lifecycleHandler = undefined;
mod.resetSubagentRegistryForTests({ persist: false });
});
@@ -80,6 +99,7 @@ describe("subagent registry steer restarts", () => {
await flushAnnounce();
expect(announceSpy).not.toHaveBeenCalled();
expect(runSubagentEndedHookMock).not.toHaveBeenCalled();
const replaced = mod.replaceSubagentRunAfterSteer({
previousRunId: "run-old",
@@ -100,11 +120,152 @@ describe("subagent registry steer restarts", () => {
await flushAnnounce();
expect(announceSpy).toHaveBeenCalledTimes(1);
expect(runSubagentEndedHookMock).toHaveBeenCalledTimes(1);
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-new",
}),
expect.objectContaining({
runId: "run-new",
}),
);
const announce = (announceSpy.mock.calls[0]?.[0] ?? {}) as { childRunId?: string };
expect(announce.childRunId).toBe("run-new");
});
it("defers subagent_ended hook for completion-mode runs until announce delivery resolves", async () => {
const callGateway = vi.mocked((await import("../gateway/call.js")).callGateway);
const originalCallGateway = callGateway.getMockImplementation();
callGateway.mockImplementation(async (request: unknown) => {
const typed = request as { method?: string };
if (typed.method === "agent.wait") {
return new Promise<unknown>(() => undefined);
}
if (originalCallGateway) {
return originalCallGateway(request as Parameters<typeof callGateway>[0]);
}
return {};
});
try {
let resolveAnnounce!: (value: boolean) => void;
announceSpy.mockImplementationOnce(
() =>
new Promise<boolean>((resolve) => {
resolveAnnounce = resolve;
}),
);
mod.registerSubagentRun({
runId: "run-completion-delayed",
childSessionKey: "agent:main:subagent:completion-delayed",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:123",
accountId: "work",
},
task: "completion-mode task",
cleanup: "keep",
expectsCompletionMessage: true,
});
lifecycleHandler?.({
stream: "lifecycle",
runId: "run-completion-delayed",
data: { phase: "end" },
});
await flushAnnounce();
expect(runSubagentEndedHookMock).not.toHaveBeenCalled();
resolveAnnounce(true);
await flushAnnounce();
expect(runSubagentEndedHookMock).toHaveBeenCalledTimes(1);
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
expect.objectContaining({
targetSessionKey: "agent:main:subagent:completion-delayed",
reason: "subagent-complete",
sendFarewell: true,
}),
expect.objectContaining({
runId: "run-completion-delayed",
requesterSessionKey: "agent:main:main",
}),
);
} finally {
if (originalCallGateway) {
callGateway.mockImplementation(originalCallGateway);
}
}
});
it("does not emit subagent_ended on completion for persistent session-mode runs", async () => {
const callGateway = vi.mocked((await import("../gateway/call.js")).callGateway);
const originalCallGateway = callGateway.getMockImplementation();
callGateway.mockImplementation(async (request: unknown) => {
const typed = request as { method?: string };
if (typed.method === "agent.wait") {
return new Promise<unknown>(() => undefined);
}
if (originalCallGateway) {
return originalCallGateway(request as Parameters<typeof callGateway>[0]);
}
return {};
});
try {
let resolveAnnounce!: (value: boolean) => void;
announceSpy.mockImplementationOnce(
() =>
new Promise<boolean>((resolve) => {
resolveAnnounce = resolve;
}),
);
mod.registerSubagentRun({
runId: "run-persistent-session",
childSessionKey: "agent:main:subagent:persistent-session",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:123",
accountId: "work",
},
task: "persistent session task",
cleanup: "keep",
expectsCompletionMessage: true,
spawnMode: "session",
});
lifecycleHandler?.({
stream: "lifecycle",
runId: "run-persistent-session",
data: { phase: "end" },
});
await flushAnnounce();
expect(runSubagentEndedHookMock).not.toHaveBeenCalled();
resolveAnnounce(true);
await flushAnnounce();
expect(runSubagentEndedHookMock).not.toHaveBeenCalled();
const run = mod.listSubagentRunsForRequester("agent:main:main")[0];
expect(run?.runId).toBe("run-persistent-session");
expect(run?.cleanupCompletedAt).toBeTypeOf("number");
expect(run?.endedHookEmittedAt).toBeUndefined();
} finally {
if (originalCallGateway) {
callGateway.mockImplementation(originalCallGateway);
}
}
});
it("clears announce retry state when replacing after steer restart", () => {
mod.registerSubagentRun({
runId: "run-retry-reset-old",
@@ -136,6 +297,56 @@ describe("subagent registry steer restarts", () => {
expect(runs[0].lastAnnounceRetryAt).toBeUndefined();
});
it("clears terminal lifecycle state when replacing after steer restart", async () => {
mod.registerSubagentRun({
runId: "run-terminal-state-old",
childSessionKey: "agent:main:subagent:terminal-state",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "terminal state",
cleanup: "keep",
});
const previous = mod.listSubagentRunsForRequester("agent:main:main")[0];
expect(previous?.runId).toBe("run-terminal-state-old");
if (previous) {
previous.endedHookEmittedAt = Date.now();
previous.endedReason = "subagent-complete";
previous.endedAt = Date.now();
previous.outcome = { status: "ok" };
}
const replaced = mod.replaceSubagentRunAfterSteer({
previousRunId: "run-terminal-state-old",
nextRunId: "run-terminal-state-new",
fallback: previous,
});
expect(replaced).toBe(true);
const runs = mod.listSubagentRunsForRequester("agent:main:main");
expect(runs).toHaveLength(1);
expect(runs[0].runId).toBe("run-terminal-state-new");
expect(runs[0].endedHookEmittedAt).toBeUndefined();
expect(runs[0].endedReason).toBeUndefined();
lifecycleHandler?.({
stream: "lifecycle",
runId: "run-terminal-state-new",
data: { phase: "end" },
});
await flushAnnounce();
expect(runSubagentEndedHookMock).toHaveBeenCalledTimes(1);
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
expect.objectContaining({
runId: "run-terminal-state-new",
}),
expect.objectContaining({
runId: "run-terminal-state-new",
}),
);
});
it("restores announce for a finished run when steer replacement dispatch fails", async () => {
mod.registerSubagentRun({
runId: "run-failed-restart",
@@ -189,6 +400,24 @@ describe("subagent registry steer restarts", () => {
expect(run?.outcome).toEqual({ status: "error", error: "manual kill" });
expect(run?.cleanupHandled).toBe(true);
expect(typeof run?.cleanupCompletedAt).toBe("number");
expect(runSubagentEndedHookMock).toHaveBeenCalledWith(
{
targetSessionKey: childSessionKey,
targetKind: "subagent",
reason: "subagent-killed",
sendFarewell: true,
accountId: undefined,
runId: "run-killed",
endedAt: expect.any(Number),
outcome: "killed",
error: "manual kill",
},
{
runId: "run-killed",
childSessionKey,
requesterSessionKey: "agent:main:main",
},
);
});
it("retries deferred parent cleanup after a descendant announces", async () => {
@@ -302,4 +531,48 @@ describe("subagent registry steer restarts", () => {
vi.useRealTimers();
}
});
it("emits subagent_ended when completion cleanup expires with active descendants", async () => {
announceSpy.mockResolvedValue(false);
mod.registerSubagentRun({
runId: "run-parent-expiry",
childSessionKey: "agent:main:subagent:parent-expiry",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "parent completion expiry",
cleanup: "keep",
expectsCompletionMessage: true,
});
mod.registerSubagentRun({
runId: "run-child-active",
childSessionKey: "agent:main:subagent:parent-expiry:subagent:child-active",
requesterSessionKey: "agent:main:subagent:parent-expiry",
requesterDisplayKey: "parent-expiry",
task: "child still running",
cleanup: "keep",
});
lifecycleHandler?.({
stream: "lifecycle",
runId: "run-parent-expiry",
data: {
phase: "end",
startedAt: Date.now() - 7 * 60_000,
endedAt: Date.now() - 6 * 60_000,
},
});
await flushAnnounce();
const parentHookCall = runSubagentEndedHookMock.mock.calls.find((call) => {
const event = call[0] as { runId?: string; reason?: string };
return event.runId === "run-parent-expiry" && event.reason === "subagent-complete";
});
expect(parentHookCall).toBeDefined();
const parent = mod
.listSubagentRunsForRequester("agent:main:main")
.find((entry) => entry.runId === "run-parent-expiry");
expect(parent?.cleanupCompletedAt).toBeTypeOf("number");
});
});

View File

@@ -3,7 +3,7 @@ import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import type { SubagentRunRecord } from "./subagent-registry.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
export type PersistedSubagentRegistryVersion = 1 | 2;
@@ -101,6 +101,7 @@ export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
requesterOrigin,
cleanupCompletedAt,
cleanupHandled,
spawnMode: typed.spawnMode === "session" ? "session" : "run",
});
if (isLegacy) {
migrated = true;

View File

@@ -6,36 +6,38 @@ import { type DeliveryContext, normalizeDeliveryContext } from "../utils/deliver
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
import {
loadSubagentRegistryFromDisk,
saveSubagentRegistryToDisk,
} from "./subagent-registry.store.js";
SUBAGENT_ENDED_OUTCOME_KILLED,
SUBAGENT_ENDED_REASON_COMPLETE,
SUBAGENT_ENDED_REASON_ERROR,
SUBAGENT_ENDED_REASON_KILLED,
type SubagentLifecycleEndedReason,
} from "./subagent-lifecycle-events.js";
import {
resolveCleanupCompletionReason,
resolveDeferredCleanupDecision,
} from "./subagent-registry-cleanup.js";
import {
emitSubagentEndedHookOnce,
resolveLifecycleOutcomeFromRunOutcome,
runOutcomesEqual,
} from "./subagent-registry-completion.js";
import {
countActiveDescendantRunsFromRuns,
countActiveRunsForSessionFromRuns,
findRunIdsByChildSessionKeyFromRuns,
listDescendantRunsForRequesterFromRuns,
listRunsForRequesterFromRuns,
resolveRequesterForChildSessionFromRuns,
} from "./subagent-registry-queries.js";
import {
getSubagentRunsSnapshotForRead,
persistSubagentRunsToDisk,
restoreSubagentRunsFromDisk,
} from "./subagent-registry-state.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
import { resolveAgentTimeoutMs } from "./timeout.js";
export type SubagentRunRecord = {
runId: string;
childSessionKey: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
requesterDisplayKey: string;
task: string;
cleanup: "delete" | "keep";
label?: string;
model?: string;
runTimeoutSeconds?: number;
createdAt: number;
startedAt?: number;
endedAt?: number;
outcome?: SubagentRunOutcome;
archiveAtMs?: number;
cleanupCompletedAt?: number;
cleanupHandled?: boolean;
suppressAnnounceReason?: "steer-restart" | "killed";
expectsCompletionMessage?: boolean;
/** Number of times announce delivery has been attempted and returned false (deferred). */
announceRetryCount?: number;
/** Timestamp of the last announce retry attempt (for backoff). */
lastAnnounceRetryAt?: number;
};
export type { SubagentRunRecord } from "./subagent-registry.types.js";
const subagentRuns = new Map<string, SubagentRunRecord>();
let sweeper: NodeJS.Timeout | null = null;
@@ -77,19 +79,117 @@ function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "ex
}
function persistSubagentRuns() {
try {
saveSubagentRegistryToDisk(subagentRuns);
} catch {
// ignore persistence failures
}
persistSubagentRunsToDisk(subagentRuns);
}
const resumedRuns = new Set<string>();
const endedHookInFlightRunIds = new Set<string>();
function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) {
return entry?.suppressAnnounceReason === "steer-restart";
}
function shouldKeepThreadBindingAfterRun(params: {
entry: SubagentRunRecord;
reason: SubagentLifecycleEndedReason;
}) {
if (params.reason === SUBAGENT_ENDED_REASON_KILLED) {
return false;
}
return params.entry.spawnMode === "session";
}
function shouldEmitEndedHookForRun(params: {
entry: SubagentRunRecord;
reason: SubagentLifecycleEndedReason;
}) {
return !shouldKeepThreadBindingAfterRun(params);
}
async function emitSubagentEndedHookForRun(params: {
entry: SubagentRunRecord;
reason?: SubagentLifecycleEndedReason;
sendFarewell?: boolean;
accountId?: string;
}) {
const reason = params.reason ?? params.entry.endedReason ?? SUBAGENT_ENDED_REASON_COMPLETE;
const outcome = resolveLifecycleOutcomeFromRunOutcome(params.entry.outcome);
const error = params.entry.outcome?.status === "error" ? params.entry.outcome.error : undefined;
await emitSubagentEndedHookOnce({
entry: params.entry,
reason,
sendFarewell: params.sendFarewell,
accountId: params.accountId ?? params.entry.requesterOrigin?.accountId,
outcome,
error,
inFlightRunIds: endedHookInFlightRunIds,
persist: persistSubagentRuns,
});
}
async function completeSubagentRun(params: {
runId: string;
endedAt?: number;
outcome: SubagentRunOutcome;
reason: SubagentLifecycleEndedReason;
sendFarewell?: boolean;
accountId?: string;
triggerCleanup: boolean;
}) {
const entry = subagentRuns.get(params.runId);
if (!entry) {
return;
}
let mutated = false;
const endedAt = typeof params.endedAt === "number" ? params.endedAt : Date.now();
if (entry.endedAt !== endedAt) {
entry.endedAt = endedAt;
mutated = true;
}
if (!runOutcomesEqual(entry.outcome, params.outcome)) {
entry.outcome = params.outcome;
mutated = true;
}
if (entry.endedReason !== params.reason) {
entry.endedReason = params.reason;
mutated = true;
}
if (mutated) {
persistSubagentRuns();
}
const suppressedForSteerRestart = suppressAnnounceForSteerRestart(entry);
const shouldEmitEndedHook =
!suppressedForSteerRestart &&
shouldEmitEndedHookForRun({
entry,
reason: params.reason,
});
const shouldDeferEndedHook =
shouldEmitEndedHook &&
params.triggerCleanup &&
entry.expectsCompletionMessage === true &&
!suppressedForSteerRestart;
if (!shouldDeferEndedHook && shouldEmitEndedHook) {
await emitSubagentEndedHookForRun({
entry,
reason: params.reason,
sendFarewell: params.sendFarewell,
accountId: params.accountId,
});
}
if (!params.triggerCleanup) {
return;
}
if (suppressedForSteerRestart) {
return;
}
startSubagentAnnounceCleanupFlow(params.runId, entry);
}
function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecord): boolean {
if (!beginSubagentCleanup(runId)) {
return false;
@@ -102,7 +202,6 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
expectsCompletionMessage: entry.expectsCompletionMessage,
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
cleanup: entry.cleanup,
waitForCompletion: false,
@@ -110,8 +209,10 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
endedAt: entry.endedAt,
label: entry.label,
outcome: entry.outcome,
spawnMode: entry.spawnMode,
expectsCompletionMessage: entry.expectsCompletionMessage,
}).then((didAnnounce) => {
finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
});
return true;
}
@@ -182,20 +283,13 @@ function restoreSubagentRunsOnce() {
}
restoreAttempted = true;
try {
const restored = loadSubagentRegistryFromDisk();
if (restored.size === 0) {
const restoredCount = restoreSubagentRunsFromDisk({
runs: subagentRuns,
mergeOnly: true,
});
if (restoredCount === 0) {
return;
}
for (const [runId, entry] of restored.entries()) {
if (!runId || !entry) {
continue;
}
// Keep any newer in-memory entries.
if (!subagentRuns.has(runId)) {
subagentRuns.set(runId, entry);
}
}
// Resume pending work.
ensureListener();
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
@@ -255,7 +349,11 @@ async function sweepSubagentRuns() {
try {
await callGateway({
method: "sessions.delete",
params: { key: entry.childSessionKey, deleteTranscript: true },
params: {
key: entry.childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
} catch {
@@ -276,93 +374,154 @@ function ensureListener() {
}
listenerStarted = true;
listenerStop = onAgentEvent((evt) => {
if (!evt || evt.stream !== "lifecycle") {
return;
}
const entry = subagentRuns.get(evt.runId);
if (!entry) {
return;
}
const phase = evt.data?.phase;
if (phase === "start") {
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
if (startedAt) {
entry.startedAt = startedAt;
persistSubagentRuns();
void (async () => {
if (!evt || evt.stream !== "lifecycle") {
return;
}
return;
}
if (phase !== "end" && phase !== "error") {
return;
}
const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : Date.now();
entry.endedAt = endedAt;
if (phase === "error") {
const entry = subagentRuns.get(evt.runId);
if (!entry) {
return;
}
const phase = evt.data?.phase;
if (phase === "start") {
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
if (startedAt) {
entry.startedAt = startedAt;
persistSubagentRuns();
}
return;
}
if (phase !== "end" && phase !== "error") {
return;
}
const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : Date.now();
const error = typeof evt.data?.error === "string" ? evt.data.error : undefined;
entry.outcome = { status: "error", error };
} else if (evt.data?.aborted) {
entry.outcome = { status: "timeout" };
} else {
entry.outcome = { status: "ok" };
}
persistSubagentRuns();
if (suppressAnnounceForSteerRestart(entry)) {
return;
}
if (!startSubagentAnnounceCleanupFlow(evt.runId, entry)) {
return;
}
const outcome: SubagentRunOutcome =
phase === "error"
? { status: "error", error }
: evt.data?.aborted
? { status: "timeout" }
: { status: "ok" };
await completeSubagentRun({
runId: evt.runId,
endedAt,
outcome,
reason: phase === "error" ? SUBAGENT_ENDED_REASON_ERROR : SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: entry.requesterOrigin?.accountId,
triggerCleanup: true,
});
})();
});
}
function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didAnnounce: boolean) {
async function finalizeSubagentCleanup(
runId: string,
cleanup: "delete" | "keep",
didAnnounce: boolean,
) {
const entry = subagentRuns.get(runId);
if (!entry) {
return;
}
if (!didAnnounce) {
const now = Date.now();
const retryCount = (entry.announceRetryCount ?? 0) + 1;
entry.announceRetryCount = retryCount;
if (didAnnounce) {
const completionReason = resolveCleanupCompletionReason(entry);
await emitCompletionEndedHookIfNeeded(entry, completionReason);
completeCleanupBookkeeping({
runId,
entry,
cleanup,
completedAt: Date.now(),
});
return;
}
const now = Date.now();
const deferredDecision = resolveDeferredCleanupDecision({
entry,
now,
activeDescendantRuns: Math.max(0, countActiveDescendantRuns(entry.childSessionKey)),
announceExpiryMs: ANNOUNCE_EXPIRY_MS,
maxAnnounceRetryCount: MAX_ANNOUNCE_RETRY_COUNT,
deferDescendantDelayMs: MIN_ANNOUNCE_RETRY_DELAY_MS,
resolveAnnounceRetryDelayMs,
});
if (deferredDecision.kind === "defer-descendants") {
entry.lastAnnounceRetryAt = now;
// Check if the announce has exceeded retry limits or expired (#18264).
const endedAgo = typeof entry.endedAt === "number" ? now - entry.endedAt : 0;
if (retryCount >= MAX_ANNOUNCE_RETRY_COUNT || endedAgo > ANNOUNCE_EXPIRY_MS) {
// Give up: mark as completed to break the infinite retry loop.
logAnnounceGiveUp(entry, retryCount >= MAX_ANNOUNCE_RETRY_COUNT ? "retry-limit" : "expiry");
entry.cleanupCompletedAt = now;
persistSubagentRuns();
retryDeferredCompletedAnnounces(runId);
return;
}
// Allow retry on the next wake if announce was deferred or failed.
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
if (entry.expectsCompletionMessage !== true) {
return;
}
setTimeout(
() => {
resumeSubagentRun(runId);
},
resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0),
).unref?.();
setTimeout(() => {
resumeSubagentRun(runId);
}, deferredDecision.delayMs).unref?.();
return;
}
if (cleanup === "delete") {
subagentRuns.delete(runId);
persistSubagentRuns();
retryDeferredCompletedAnnounces(runId);
if (deferredDecision.retryCount != null) {
entry.announceRetryCount = deferredDecision.retryCount;
entry.lastAnnounceRetryAt = now;
}
if (deferredDecision.kind === "give-up") {
const completionReason = resolveCleanupCompletionReason(entry);
await emitCompletionEndedHookIfNeeded(entry, completionReason);
logAnnounceGiveUp(entry, deferredDecision.reason);
completeCleanupBookkeeping({
runId,
entry,
cleanup: "keep",
completedAt: now,
});
return;
}
entry.cleanupCompletedAt = Date.now();
// Allow retry on the next wake if announce was deferred or failed.
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
retryDeferredCompletedAnnounces(runId);
if (deferredDecision.resumeDelayMs == null) {
return;
}
setTimeout(() => {
resumeSubagentRun(runId);
}, deferredDecision.resumeDelayMs).unref?.();
}
async function emitCompletionEndedHookIfNeeded(
entry: SubagentRunRecord,
reason: SubagentLifecycleEndedReason,
) {
if (
entry.expectsCompletionMessage === true &&
shouldEmitEndedHookForRun({
entry,
reason,
})
) {
await emitSubagentEndedHookForRun({
entry,
reason,
sendFarewell: true,
});
}
}
function completeCleanupBookkeeping(params: {
runId: string;
entry: SubagentRunRecord;
cleanup: "delete" | "keep";
completedAt: number;
}) {
if (params.cleanup === "delete") {
subagentRuns.delete(params.runId);
persistSubagentRuns();
retryDeferredCompletedAnnounces(params.runId);
return;
}
params.entry.cleanupCompletedAt = params.completedAt;
persistSubagentRuns();
retryDeferredCompletedAnnounces(params.runId);
}
function retryDeferredCompletedAnnounces(excludeRunId?: string) {
@@ -475,7 +634,9 @@ export function replaceSubagentRunAfterSteer(params: {
const now = Date.now();
const cfg = loadConfig();
const archiveAfterMs = resolveArchiveAfterMs(cfg);
const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined;
const spawnMode = source.spawnMode === "session" ? "session" : "run";
const archiveAtMs =
spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined;
const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0;
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
@@ -484,12 +645,15 @@ export function replaceSubagentRunAfterSteer(params: {
runId: nextRunId,
startedAt: now,
endedAt: undefined,
endedReason: undefined,
endedHookEmittedAt: undefined,
outcome: undefined,
cleanupCompletedAt: undefined,
cleanupHandled: false,
suppressAnnounceReason: undefined,
announceRetryCount: undefined,
lastAnnounceRetryAt: undefined,
spawnMode,
archiveAtMs,
runTimeoutSeconds,
};
@@ -516,11 +680,14 @@ export function registerSubagentRun(params: {
model?: string;
runTimeoutSeconds?: number;
expectsCompletionMessage?: boolean;
spawnMode?: "run" | "session";
}) {
const now = Date.now();
const cfg = loadConfig();
const archiveAfterMs = resolveArchiveAfterMs(cfg);
const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined;
const spawnMode = params.spawnMode === "session" ? "session" : "run";
const archiveAtMs =
spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined;
const runTimeoutSeconds = params.runTimeoutSeconds ?? 0;
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
@@ -533,6 +700,7 @@ export function registerSubagentRun(params: {
task: params.task,
cleanup: params.cleanup,
expectsCompletionMessage: params.expectsCompletionMessage,
spawnMode,
label: params.label,
model: params.model,
runTimeoutSeconds,
@@ -543,7 +711,7 @@ export function registerSubagentRun(params: {
});
ensureListener();
persistSubagentRuns();
if (archiveAfterMs) {
if (archiveAtMs) {
startSweeper();
}
// Wait for subagent completion via gateway RPC (cross-process).
@@ -588,22 +756,29 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
mutated = true;
}
const waitError = typeof wait.error === "string" ? wait.error : undefined;
entry.outcome =
const outcome: SubagentRunOutcome =
wait.status === "error"
? { status: "error", error: waitError }
: wait.status === "timeout"
? { status: "timeout" }
: { status: "ok" };
mutated = true;
if (!runOutcomesEqual(entry.outcome, outcome)) {
entry.outcome = outcome;
mutated = true;
}
if (mutated) {
persistSubagentRuns();
}
if (suppressAnnounceForSteerRestart(entry)) {
return;
}
if (!startSubagentAnnounceCleanupFlow(runId, entry)) {
return;
}
await completeSubagentRun({
runId,
endedAt: entry.endedAt,
outcome,
reason:
wait.status === "error" ? SUBAGENT_ENDED_REASON_ERROR : SUBAGENT_ENDED_REASON_COMPLETE,
sendFarewell: true,
accountId: entry.requesterOrigin?.accountId,
triggerCleanup: true,
});
} catch {
// ignore
}
@@ -612,6 +787,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
subagentRuns.clear();
resumedRuns.clear();
endedHookInFlightRunIds.clear();
resetAnnounceQueuesForTests();
stopSweeper();
restoreAttempted = false;
@@ -640,62 +816,23 @@ export function releaseSubagentRun(runId: string) {
}
function findRunIdsByChildSessionKey(childSessionKey: string): string[] {
const key = childSessionKey.trim();
if (!key) {
return [];
}
const runIds: string[] = [];
for (const [runId, entry] of subagentRuns.entries()) {
if (entry.childSessionKey === key) {
runIds.push(runId);
}
}
return runIds;
}
function getRunsSnapshotForRead(): Map<string, SubagentRunRecord> {
const merged = new Map<string, SubagentRunRecord>();
const shouldReadDisk = !(process.env.VITEST || process.env.NODE_ENV === "test");
if (shouldReadDisk) {
try {
// Registry state is persisted to disk so other worker processes (for
// example cron runners) can observe active children spawned elsewhere.
for (const [runId, entry] of loadSubagentRegistryFromDisk().entries()) {
merged.set(runId, entry);
}
} catch {
// Ignore disk read failures and fall back to local memory state.
}
}
for (const [runId, entry] of subagentRuns.entries()) {
merged.set(runId, entry);
}
return merged;
return findRunIdsByChildSessionKeyFromRuns(subagentRuns, childSessionKey);
}
export function resolveRequesterForChildSession(childSessionKey: string): {
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
} | null {
const key = childSessionKey.trim();
if (!key) {
return null;
}
let best: SubagentRunRecord | undefined;
for (const entry of getRunsSnapshotForRead().values()) {
if (entry.childSessionKey !== key) {
continue;
}
if (!best || entry.createdAt > best.createdAt) {
best = entry;
}
}
if (!best) {
const resolved = resolveRequesterForChildSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
childSessionKey,
);
if (!resolved) {
return null;
}
return {
requesterSessionKey: best.requesterSessionKey,
requesterOrigin: normalizeDeliveryContext(best.requesterOrigin),
requesterSessionKey: resolved.requesterSessionKey,
requesterOrigin: normalizeDeliveryContext(resolved.requesterOrigin),
};
}
@@ -734,6 +871,7 @@ export function markSubagentRunTerminated(params: {
const now = Date.now();
const reason = params.reason?.trim() || "killed";
let updated = 0;
const entriesByChildSessionKey = new Map<string, SubagentRunRecord>();
for (const runId of runIds) {
const entry = subagentRuns.get(runId);
if (!entry) {
@@ -744,103 +882,57 @@ export function markSubagentRunTerminated(params: {
}
entry.endedAt = now;
entry.outcome = { status: "error", error: reason };
entry.endedReason = SUBAGENT_ENDED_REASON_KILLED;
entry.cleanupHandled = true;
entry.cleanupCompletedAt = now;
entry.suppressAnnounceReason = "killed";
if (!entriesByChildSessionKey.has(entry.childSessionKey)) {
entriesByChildSessionKey.set(entry.childSessionKey, entry);
}
updated += 1;
}
if (updated > 0) {
persistSubagentRuns();
for (const entry of entriesByChildSessionKey.values()) {
void emitSubagentEndedHookOnce({
entry,
reason: SUBAGENT_ENDED_REASON_KILLED,
sendFarewell: true,
outcome: SUBAGENT_ENDED_OUTCOME_KILLED,
error: reason,
inFlightRunIds: endedHookInFlightRunIds,
persist: persistSubagentRuns,
}).catch(() => {
// Hook failures should not break termination flow.
});
}
}
return updated;
}
export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] {
const key = requesterSessionKey.trim();
if (!key) {
return [];
}
return [...subagentRuns.values()].filter((entry) => entry.requesterSessionKey === key);
return listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey);
}
export function countActiveRunsForSession(requesterSessionKey: string): number {
const key = requesterSessionKey.trim();
if (!key) {
return 0;
}
let count = 0;
for (const entry of getRunsSnapshotForRead().values()) {
if (entry.requesterSessionKey !== key) {
continue;
}
if (typeof entry.endedAt === "number") {
continue;
}
count += 1;
}
return count;
return countActiveRunsForSessionFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
requesterSessionKey,
);
}
export function countActiveDescendantRuns(rootSessionKey: string): number {
const root = rootSessionKey.trim();
if (!root) {
return 0;
}
const runs = getRunsSnapshotForRead();
const pending = [root];
const visited = new Set<string>([root]);
let count = 0;
while (pending.length > 0) {
const requester = pending.shift();
if (!requester) {
continue;
}
for (const entry of runs.values()) {
if (entry.requesterSessionKey !== requester) {
continue;
}
if (typeof entry.endedAt !== "number") {
count += 1;
}
const childKey = entry.childSessionKey.trim();
if (!childKey || visited.has(childKey)) {
continue;
}
visited.add(childKey);
pending.push(childKey);
}
}
return count;
return countActiveDescendantRunsFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
export function listDescendantRunsForRequester(rootSessionKey: string): SubagentRunRecord[] {
const root = rootSessionKey.trim();
if (!root) {
return [];
}
const runs = getRunsSnapshotForRead();
const pending = [root];
const visited = new Set<string>([root]);
const descendants: SubagentRunRecord[] = [];
while (pending.length > 0) {
const requester = pending.shift();
if (!requester) {
continue;
}
for (const entry of runs.values()) {
if (entry.requesterSessionKey !== requester) {
continue;
}
descendants.push(entry);
const childKey = entry.childSessionKey.trim();
if (!childKey || visited.has(childKey)) {
continue;
}
visited.add(childKey);
pending.push(childKey);
}
}
return descendants;
return listDescendantRunsForRequesterFromRuns(
getSubagentRunsSnapshotForRead(subagentRuns),
rootSessionKey,
);
}
export function initSubagentRegistry() {

View File

@@ -0,0 +1,35 @@
import type { DeliveryContext } from "../utils/delivery-context.js";
import type { SubagentRunOutcome } from "./subagent-announce.js";
import type { SubagentLifecycleEndedReason } from "./subagent-lifecycle-events.js";
import type { SpawnSubagentMode } from "./subagent-spawn.js";
export type SubagentRunRecord = {
runId: string;
childSessionKey: string;
requesterSessionKey: string;
requesterOrigin?: DeliveryContext;
requesterDisplayKey: string;
task: string;
cleanup: "delete" | "keep";
label?: string;
model?: string;
runTimeoutSeconds?: number;
spawnMode?: SpawnSubagentMode;
createdAt: number;
startedAt?: number;
endedAt?: number;
outcome?: SubagentRunOutcome;
archiveAtMs?: number;
cleanupCompletedAt?: number;
cleanupHandled?: boolean;
suppressAnnounceReason?: "steer-restart" | "killed";
expectsCompletionMessage?: boolean;
/** Number of announce delivery attempts that returned false (deferred). */
announceRetryCount?: number;
/** Timestamp of the last announce retry attempt (for backoff). */
lastAnnounceRetryAt?: number;
/** Terminal lifecycle reason recorded when the run finishes. */
endedReason?: SubagentLifecycleEndedReason;
/** Set after the subagent_ended hook has been emitted successfully once. */
endedHookEmittedAt?: number;
};

View File

@@ -1,7 +1,9 @@
import crypto from "node:crypto";
import { formatThinkingLevels, normalizeThinkLevel } from "../auto-reply/thinking.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import { normalizeAgentId, parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { resolveAgentConfig } from "./agent-scope.js";
@@ -17,6 +19,9 @@ import {
resolveMainSessionAlias,
} from "./tools/sessions-helpers.js";
export const SUBAGENT_SPAWN_MODES = ["run", "session"] as const;
export type SpawnSubagentMode = (typeof SUBAGENT_SPAWN_MODES)[number];
export type SpawnSubagentParams = {
task: string;
label?: string;
@@ -24,6 +29,8 @@ export type SpawnSubagentParams = {
model?: string;
thinking?: string;
runTimeoutSeconds?: number;
thread?: boolean;
mode?: SpawnSubagentMode;
cleanup?: "delete" | "keep";
expectsCompletionMessage?: boolean;
};
@@ -42,11 +49,14 @@ export type SpawnSubagentContext = {
export const SUBAGENT_SPAWN_ACCEPTED_NOTE =
"auto-announces on completion, do not poll/sleep. The response will be sent back as an user message.";
export const SUBAGENT_SPAWN_SESSION_ACCEPTED_NOTE =
"thread-bound session stays active after this task; continue in-thread for follow-ups.";
export type SpawnSubagentResult = {
status: "accepted" | "forbidden" | "error";
childSessionKey?: string;
runId?: string;
mode?: SpawnSubagentMode;
note?: string;
modelApplied?: boolean;
error?: string;
@@ -67,6 +77,88 @@ export function splitModelRef(ref?: string) {
return { provider: undefined, model: trimmed };
}
function resolveSpawnMode(params: {
requestedMode?: SpawnSubagentMode;
threadRequested: boolean;
}): SpawnSubagentMode {
if (params.requestedMode === "run" || params.requestedMode === "session") {
return params.requestedMode;
}
// Thread-bound spawns should default to persistent sessions.
return params.threadRequested ? "session" : "run";
}
function summarizeError(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
if (typeof err === "string") {
return err;
}
return "error";
}
async function ensureThreadBindingForSubagentSpawn(params: {
hookRunner: ReturnType<typeof getGlobalHookRunner>;
childSessionKey: string;
agentId: string;
label?: string;
mode: SpawnSubagentMode;
requesterSessionKey?: string;
requester: {
channel?: string;
accountId?: string;
to?: string;
threadId?: string | number;
};
}): Promise<{ status: "ok" } | { status: "error"; error: string }> {
const hookRunner = params.hookRunner;
if (!hookRunner?.hasHooks("subagent_spawning")) {
return {
status: "error",
error:
"thread=true is unavailable because no channel plugin registered subagent_spawning hooks.",
};
}
try {
const result = await hookRunner.runSubagentSpawning(
{
childSessionKey: params.childSessionKey,
agentId: params.agentId,
label: params.label,
mode: params.mode,
requester: params.requester,
threadRequested: true,
},
{
childSessionKey: params.childSessionKey,
requesterSessionKey: params.requesterSessionKey,
},
);
if (result?.status === "error") {
const error = result.error.trim();
return {
status: "error",
error: error || "Failed to prepare thread binding for this subagent session.",
};
}
if (result?.status !== "ok" || !result.threadBindingReady) {
return {
status: "error",
error:
"Unable to create or bind a thread for this subagent session. Session mode is unavailable for this target.",
};
}
return { status: "ok" };
} catch (err) {
return {
status: "error",
error: `Thread bind failed: ${summarizeError(err)}`,
};
}
}
export async function spawnSubagentDirect(
params: SpawnSubagentParams,
ctx: SpawnSubagentContext,
@@ -76,19 +168,37 @@ export async function spawnSubagentDirect(
const requestedAgentId = params.agentId;
const modelOverride = params.model;
const thinkingOverrideRaw = params.thinking;
const requestThreadBinding = params.thread === true;
const spawnMode = resolveSpawnMode({
requestedMode: params.mode,
threadRequested: requestThreadBinding,
});
if (spawnMode === "session" && !requestThreadBinding) {
return {
status: "error",
error: 'mode="session" requires thread=true so the subagent can stay bound to a thread.',
};
}
const cleanup =
params.cleanup === "keep" || params.cleanup === "delete" ? params.cleanup : "keep";
spawnMode === "session"
? "keep"
: params.cleanup === "keep" || params.cleanup === "delete"
? params.cleanup
: "keep";
const expectsCompletionMessage = params.expectsCompletionMessage !== false;
const requesterOrigin = normalizeDeliveryContext({
channel: ctx.agentChannel,
accountId: ctx.agentAccountId,
to: ctx.agentTo,
threadId: ctx.agentThreadId,
});
const hookRunner = getGlobalHookRunner();
const runTimeoutSeconds =
typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds)
? Math.max(0, Math.floor(params.runTimeoutSeconds))
: 0;
let modelApplied = false;
let threadBindingReady = false;
const cfg = loadConfig();
const { mainKey, alias } = resolveMainSessionAlias(cfg);
@@ -107,7 +217,8 @@ export async function spawnSubagentDirect(
});
const callerDepth = getSubagentDepthFromSessionStore(requesterInternalKey, { cfg });
const maxSpawnDepth = cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
if (callerDepth >= maxSpawnDepth) {
return {
status: "forbidden",
@@ -227,6 +338,39 @@ export async function spawnSubagentDirect(
};
}
}
if (requestThreadBinding) {
const bindResult = await ensureThreadBindingForSubagentSpawn({
hookRunner,
childSessionKey,
agentId: targetAgentId,
label: label || undefined,
mode: spawnMode,
requesterSessionKey: requesterInternalKey,
requester: {
channel: requesterOrigin?.channel,
accountId: requesterOrigin?.accountId,
to: requesterOrigin?.to,
threadId: requesterOrigin?.threadId,
},
});
if (bindResult.status === "error") {
try {
await callGateway({
method: "sessions.delete",
params: { key: childSessionKey, emitLifecycleHooks: false },
timeoutMs: 10_000,
});
} catch {
// Best-effort cleanup only.
}
return {
status: "error",
error: bindResult.error,
childSessionKey,
};
}
threadBindingReady = true;
}
const childSystemPrompt = buildSubagentSystemPrompt({
requesterSessionKey,
requesterOrigin,
@@ -238,8 +382,13 @@ export async function spawnSubagentDirect(
});
const childTaskMessage = [
`[Subagent Context] You are running as a subagent (depth ${childDepth}/${maxSpawnDepth}). Results auto-announce to your requester; do not busy-poll for status.`,
spawnMode === "session"
? "[Subagent Context] This subagent session is persistent and remains available for thread follow-up messages."
: undefined,
`[Subagent Task]: ${task}`,
].join("\n\n");
]
.filter((line): line is string => Boolean(line))
.join("\n\n");
const childIdem = crypto.randomUUID();
let childRunId: string = childIdem;
@@ -271,8 +420,50 @@ export async function spawnSubagentDirect(
childRunId = response.runId;
}
} catch (err) {
const messageText =
err instanceof Error ? err.message : typeof err === "string" ? err : "error";
if (threadBindingReady) {
const hasEndedHook = hookRunner?.hasHooks("subagent_ended") === true;
let endedHookEmitted = false;
if (hasEndedHook) {
try {
await hookRunner?.runSubagentEnded(
{
targetSessionKey: childSessionKey,
targetKind: "subagent",
reason: "spawn-failed",
sendFarewell: true,
accountId: requesterOrigin?.accountId,
runId: childRunId,
outcome: "error",
error: "Session failed to start",
},
{
runId: childRunId,
childSessionKey,
requesterSessionKey: requesterInternalKey,
},
);
endedHookEmitted = true;
} catch {
// Spawn should still return an actionable error even if cleanup hooks fail.
}
}
// Always delete the provisional child session after a failed spawn attempt.
// If we already emitted subagent_ended above, suppress a duplicate lifecycle hook.
try {
await callGateway({
method: "sessions.delete",
params: {
key: childSessionKey,
deleteTranscript: true,
emitLifecycleHooks: !endedHookEmitted,
},
timeoutMs: 10_000,
});
} catch {
// Best-effort only.
}
}
const messageText = summarizeError(err);
return {
status: "error",
error: messageText,
@@ -292,14 +483,45 @@ export async function spawnSubagentDirect(
label: label || undefined,
model: resolvedModel,
runTimeoutSeconds,
expectsCompletionMessage: params.expectsCompletionMessage === true,
expectsCompletionMessage,
spawnMode,
});
if (hookRunner?.hasHooks("subagent_spawned")) {
try {
await hookRunner.runSubagentSpawned(
{
runId: childRunId,
childSessionKey,
agentId: targetAgentId,
label: label || undefined,
requester: {
channel: requesterOrigin?.channel,
accountId: requesterOrigin?.accountId,
to: requesterOrigin?.to,
threadId: requesterOrigin?.threadId,
},
threadRequested: requestThreadBinding,
mode: spawnMode,
},
{
runId: childRunId,
childSessionKey,
requesterSessionKey: requesterInternalKey,
},
);
} catch {
// Spawn should still return accepted if spawn lifecycle hooks fail.
}
}
return {
status: "accepted",
childSessionKey,
runId: childRunId,
note: SUBAGENT_SPAWN_ACCEPTED_NOTE,
mode: spawnMode,
note:
spawnMode === "session" ? SUBAGENT_SPAWN_SESSION_ACCEPTED_NOTE : SUBAGENT_SPAWN_ACCEPTED_NOTE,
modelApplied: resolvedModel ? modelApplied : undefined,
};
}

View File

@@ -1,7 +1,7 @@
import { Type } from "@sinclair/typebox";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { optionalStringEnum } from "../schema/typebox.js";
import { spawnSubagentDirect } from "../subagent-spawn.js";
import { SUBAGENT_SPAWN_MODES, spawnSubagentDirect } from "../subagent-spawn.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readStringParam } from "./common.js";
@@ -14,6 +14,8 @@ const SessionsSpawnToolSchema = Type.Object({
runTimeoutSeconds: Type.Optional(Type.Number({ minimum: 0 })),
// Back-compat: older callers used timeoutSeconds for this tool.
timeoutSeconds: Type.Optional(Type.Number({ minimum: 0 })),
thread: Type.Optional(Type.Boolean()),
mode: optionalStringEnum(SUBAGENT_SPAWN_MODES),
cleanup: optionalStringEnum(["delete", "keep"] as const),
});
@@ -34,7 +36,7 @@ export function createSessionsSpawnTool(opts?: {
label: "Sessions",
name: "sessions_spawn",
description:
"Spawn a background sub-agent run in an isolated session and announce the result back to the requester chat.",
'Spawn a sub-agent in an isolated session (mode="run" one-shot or mode="session" persistent) and route results back to the requester chat/thread.',
parameters: SessionsSpawnToolSchema,
execute: async (_toolCallId, args) => {
const params = args as Record<string, unknown>;
@@ -43,6 +45,7 @@ export function createSessionsSpawnTool(opts?: {
const requestedAgentId = readStringParam(params, "agentId");
const modelOverride = readStringParam(params, "model");
const thinkingOverrideRaw = readStringParam(params, "thinking");
const mode = params.mode === "run" || params.mode === "session" ? params.mode : undefined;
const cleanup =
params.cleanup === "keep" || params.cleanup === "delete" ? params.cleanup : "keep";
// Back-compat: older callers used timeoutSeconds for this tool.
@@ -56,6 +59,7 @@ export function createSessionsSpawnTool(opts?: {
typeof timeoutSecondsCandidate === "number" && Number.isFinite(timeoutSecondsCandidate)
? Math.max(0, Math.floor(timeoutSecondsCandidate))
: undefined;
const thread = params.thread === true;
const result = await spawnSubagentDirect(
{
@@ -65,6 +69,8 @@ export function createSessionsSpawnTool(opts?: {
model: modelOverride,
thinking: thinkingOverrideRaw,
runTimeoutSeconds,
thread,
mode,
cleanup,
expectsCompletionMessage: true,
},

View File

@@ -7,6 +7,7 @@ import {
sortSubagentRuns,
type SubagentTargetResolution,
} from "../../auto-reply/reply/subagents-utils.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../../config/agent-limits.js";
import { loadConfig } from "../../config/config.js";
import type { SessionEntry } from "../../config/sessions.js";
import { loadSessionStore, resolveStorePath, updateSessionStore } from "../../config/sessions.js";
@@ -199,7 +200,8 @@ function resolveRequesterKey(params: {
// Check if this sub-agent can spawn children (orchestrator).
// If so, it should see its own children, not its parent's children.
const callerDepth = getSubagentDepthFromSessionStore(callerSessionKey, { cfg: params.cfg });
const maxSpawnDepth = params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
if (callerDepth < maxSpawnDepth) {
// Orchestrator sub-agent: use its own session key as requester
// so it sees children it spawned.