From abbd6b4d94b000a9781a9b3ee50bfab81fa1137d Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Wed, 4 Mar 2026 14:26:48 -0800 Subject: [PATCH] fix(subagents): deliver announces immediately and guard parallel descendants --- .../subagent-announce.format.e2e.test.ts | 76 ++++++++++++++----- src/agents/subagent-announce.ts | 55 ++------------ .../subagent-registry.nested.e2e.test.ts | 76 +++++++++++++++++++ 3 files changed, 136 insertions(+), 71 deletions(-) diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 8d554307482..cdea7e3d876 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -438,7 +438,7 @@ describe("subagent announce formatting", () => { expect(msg).not.toContain("✅ Subagent"); }); - it("keeps direct completion announce delivery when only the announcing run itself is pending", async () => { + it("keeps direct completion announce delivery immediate even when sibling counters are non-zero", async () => { sessionStore = { "agent:main:subagent:test": { sessionId: "child-session-self-pending", @@ -451,11 +451,11 @@ describe("subagent announce formatting", () => { messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: done" }] }], }); subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => - sessionKey === "agent:main:main" ? 1 : 0, + sessionKey === "agent:main:main" ? 2 : 0, ); subagentRegistryMock.countPendingDescendantRunsExcludingRun.mockImplementation( (sessionKey: string, runId: string) => - sessionKey === "agent:main:main" && runId === "run-direct-self-pending" ? 0 : 1, + sessionKey === "agent:main:main" && runId === "run-direct-self-pending" ? 1 : 2, ); const didAnnounce = await runSubagentAnnounceFlow({ @@ -469,12 +469,12 @@ describe("subagent announce formatting", () => { }); expect(didAnnounce).toBe(true); - expect(subagentRegistryMock.countPendingDescendantRunsExcludingRun).toHaveBeenCalledWith( - "agent:main:main", - "run-direct-self-pending", - ); expect(sendSpy).not.toHaveBeenCalled(); expect(agentSpy).toHaveBeenCalledTimes(1); + const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; + expect(call?.params?.deliver).toBe(true); + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("channel:12345"); }); it("suppresses completion delivery when subagent reply is ANNOUNCE_SKIP", async () => { @@ -590,7 +590,7 @@ describe("subagent announce formatting", () => { expect(sendSpy).not.toHaveBeenCalled(); }); - it("keeps completion-mode delivery coordinated when sibling runs are still active", async () => { + it("delivers completion-mode announces immediately even when sibling runs are still active", async () => { sessionStore = { "agent:main:subagent:test": { sessionId: "child-session-coordinated", @@ -622,13 +622,11 @@ describe("subagent announce formatting", () => { const call = agentSpy.mock.calls[0]?.[0] as { params?: Record }; const rawMessage = call?.params?.message; const msg = typeof rawMessage === "string" ? rawMessage : ""; - expect(call?.params?.deliver).toBe(false); - expect(call?.params?.channel).toBeUndefined(); - expect(call?.params?.to).toBeUndefined(); - expect(msg).toContain("There are still 1 active subagent run for this session."); - expect(msg).toContain( - "If they are part of the same workflow, wait for the remaining results before sending a user update.", - ); + expect(call?.params?.deliver).toBe(true); + expect(call?.params?.channel).toBe("discord"); + expect(call?.params?.to).toBe("channel:12345"); + expect(msg).not.toContain("There are still"); + expect(msg).not.toContain("wait for the remaining results"); }); it("keeps session-mode completion delivery on the bound destination when sibling runs are active", async () => { @@ -1660,7 +1658,7 @@ describe("subagent announce formatting", () => { expect(call?.expectFinal).toBe(true); }); - it("injects direct announce into requester subagent session instead of chat channel", async () => { + it("injects direct announce into requester subagent session as a user-turn agent call", async () => { embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false); embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false); @@ -1679,6 +1677,7 @@ describe("subagent announce formatting", () => { expect(call?.params?.deliver).toBe(false); expect(call?.params?.channel).toBeUndefined(); expect(call?.params?.to).toBeUndefined(); + expect((call?.params as { role?: unknown } | undefined)?.role).toBeUndefined(); expect(call?.params?.inputProvenance).toMatchObject({ kind: "inter_session", sourceSessionKey: "agent:main:subagent:worker", @@ -1753,7 +1752,7 @@ describe("subagent announce formatting", () => { expect(call?.params?.message).not.toContain("(no output)"); }); - it("uses advisory guidance when sibling subagents are still active", async () => { + it("does not include batching guidance when sibling subagents are still active", async () => { subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => sessionKey === "agent:main:main" ? 2 : 0, ); @@ -1768,11 +1767,46 @@ describe("subagent announce formatting", () => { const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; const msg = call?.params?.message as string; - expect(msg).toContain("There are still 2 active subagent runs for this session."); - expect(msg).toContain( - "If they are part of the same workflow, wait for the remaining results before sending a user update.", + expect(msg).not.toContain("There are still"); + expect(msg).not.toContain("wait for the remaining results"); + expect(msg).not.toContain( + "If they are unrelated, respond normally using only the result above.", ); - expect(msg).toContain("If they are unrelated, respond normally using only the result above."); + }); + + it("defers nested parent announce when active descendants exist even if pending snapshot is stale", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0); + subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" ? 1 : 0, + ); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent-active-descendant", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + + expect(didAnnounce).toBe(false); + expect(agentSpy).not.toHaveBeenCalled(); + expect(sendSpy).not.toHaveBeenCalled(); + }); + + it("keeps single subagent announces self contained without batching hints", async () => { + await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-self-contained", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + }); + + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + const msg = call?.params?.message as string; + expect(msg).not.toContain("There are still"); + expect(msg).not.toContain("wait for the remaining results"); }); it("defers announce while finished runs still have active descendants", async () => { diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 6e4f3cb997c..2db5c8ccd5e 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -770,38 +770,6 @@ async function sendSubagentAnnounceDirectly(params: { !params.requesterIsSubagent && (!params.expectsCompletionMessage || hasDeliverableDirectTarget); - if (params.expectsCompletionMessage && hasDeliverableDirectTarget) { - const forceBoundSessionDirectDelivery = - params.spawnMode === "session" && - (params.completionRouteMode === "bound" || params.completionRouteMode === "hook"); - if (!forceBoundSessionDirectDelivery) { - let pendingDescendantRuns = 0; - try { - const { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun } = - await loadSubagentRegistryRuntime(); - if (params.currentRunId) { - pendingDescendantRuns = Math.max( - 0, - countPendingDescendantRunsExcludingRun( - canonicalRequesterSessionKey, - params.currentRunId, - ), - ); - } else { - pendingDescendantRuns = Math.max( - 0, - countPendingDescendantRuns(canonicalRequesterSessionKey), - ); - } - } catch { - // Best-effort only; default to immediate delivery when registry runtime is unavailable. - } - if (pendingDescendantRuns > 0) { - shouldDeliverExternally = false; - } - } - } - const threadId = effectiveDirectOrigin?.threadId != null && effectiveDirectOrigin.threadId !== "" ? String(effectiveDirectOrigin.threadId) @@ -1044,15 +1012,10 @@ export type SubagentRunOutcome = { export type SubagentAnnounceType = "subagent task" | "cron job"; function buildAnnounceReplyInstruction(params: { - remainingActiveSubagentRuns: number; requesterIsSubagent: boolean; announceType: SubagentAnnounceType; expectsCompletionMessage?: boolean; }): string { - if (params.remainingActiveSubagentRuns > 0) { - const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs"; - return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`; - } if (params.requesterIsSubagent) { return `Convert this completion into a concise internal orchestration update for your parent agent in your own words. Keep this internal context private (don't mention system/log/stats/session details or announce type). If this result is duplicate or no update is needed, reply ONLY: ${SILENT_REPLY_TOKEN}.`; } @@ -1193,8 +1156,11 @@ export async function runSubagentAnnounceFlow(params: { let pendingChildDescendantRuns = 0; try { - const { countPendingDescendantRuns } = await loadSubagentRegistryRuntime(); - pendingChildDescendantRuns = Math.max(0, countPendingDescendantRuns(params.childSessionKey)); + const { countPendingDescendantRuns, countActiveDescendantRuns } = + await loadSubagentRegistryRuntime(); + const pending = Math.max(0, countPendingDescendantRuns(params.childSessionKey)); + const active = Math.max(0, countActiveDescendantRuns(params.childSessionKey)); + pendingChildDescendantRuns = Math.max(pending, active); } catch { // Best-effort only; fall back to direct announce behavior when unavailable. } @@ -1279,18 +1245,7 @@ export async function runSubagentAnnounceFlow(params: { } } - let remainingActiveSubagentRuns = 0; - try { - const { countActiveDescendantRuns } = await loadSubagentRegistryRuntime(); - remainingActiveSubagentRuns = Math.max( - 0, - countActiveDescendantRuns(targetRequesterSessionKey), - ); - } catch { - // Best-effort only; fall back to default announce instructions when unavailable. - } const replyInstruction = buildAnnounceReplyInstruction({ - remainingActiveSubagentRuns, requesterIsSubagent, announceType, expectsCompletionMessage, diff --git a/src/agents/subagent-registry.nested.e2e.test.ts b/src/agents/subagent-registry.nested.e2e.test.ts index 7da5d951999..30e447149c2 100644 --- a/src/agents/subagent-registry.nested.e2e.test.ts +++ b/src/agents/subagent-registry.nested.e2e.test.ts @@ -212,6 +212,82 @@ describe("subagent registry nested agent tracking", () => { expect(countPendingDescendantRuns("agent:main:subagent:orch-pending")).toBe(1); }); + it("keeps parent pending for parallel children until both descendants complete cleanup", async () => { + const { addSubagentRunForTests, countPendingDescendantRuns } = subagentRegistry; + const parentSessionKey = "agent:main:subagent:orch-parallel"; + + addSubagentRunForTests({ + runId: "run-parent-parallel", + childSessionKey: parentSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "parallel orchestrator", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: false, + cleanupCompletedAt: undefined, + }); + addSubagentRunForTests({ + runId: "run-leaf-a", + childSessionKey: `${parentSessionKey}:subagent:leaf-a`, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: "orch-parallel", + task: "leaf a", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: true, + cleanupCompletedAt: undefined, + }); + addSubagentRunForTests({ + runId: "run-leaf-b", + childSessionKey: `${parentSessionKey}:subagent:leaf-b`, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: "orch-parallel", + task: "leaf b", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + cleanupHandled: false, + cleanupCompletedAt: undefined, + }); + + expect(countPendingDescendantRuns(parentSessionKey)).toBe(2); + + addSubagentRunForTests({ + runId: "run-leaf-a", + childSessionKey: `${parentSessionKey}:subagent:leaf-a`, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: "orch-parallel", + task: "leaf a", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + cleanupHandled: true, + cleanupCompletedAt: 3, + }); + expect(countPendingDescendantRuns(parentSessionKey)).toBe(1); + + addSubagentRunForTests({ + runId: "run-leaf-b", + childSessionKey: `${parentSessionKey}:subagent:leaf-b`, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: "orch-parallel", + task: "leaf b", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 4, + cleanupHandled: true, + cleanupCompletedAt: 5, + }); + expect(countPendingDescendantRuns(parentSessionKey)).toBe(0); + }); + it("countPendingDescendantRunsExcludingRun ignores only the active announce run", async () => { const { addSubagentRunForTests, countPendingDescendantRunsExcludingRun } = subagentRegistry;