mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-02 13:47:15 +00:00
fix(subagents): deterministic descendant completion gating and child-result synthesis
This commit is contained in:
@@ -37,6 +37,7 @@ const subagentRegistryMock = {
|
|||||||
countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
||||||
countPendingDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
countPendingDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
||||||
countPendingDescendantRunsExcludingRun: vi.fn((_sessionKey: string, _runId: string) => 0),
|
countPendingDescendantRunsExcludingRun: vi.fn((_sessionKey: string, _runId: string) => 0),
|
||||||
|
listSubagentRunsForRequester: vi.fn((_sessionKey: string) => []),
|
||||||
resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null),
|
resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null),
|
||||||
};
|
};
|
||||||
const subagentDeliveryTargetHookMock = vi.fn(
|
const subagentDeliveryTargetHookMock = vi.fn(
|
||||||
@@ -198,6 +199,7 @@ describe("subagent announce formatting", () => {
|
|||||||
.mockImplementation((sessionKey: string, _runId: string) =>
|
.mockImplementation((sessionKey: string, _runId: string) =>
|
||||||
subagentRegistryMock.countPendingDescendantRuns(sessionKey),
|
subagentRegistryMock.countPendingDescendantRuns(sessionKey),
|
||||||
);
|
);
|
||||||
|
subagentRegistryMock.listSubagentRunsForRequester.mockClear().mockReturnValue([]);
|
||||||
subagentRegistryMock.resolveRequesterForChildSession.mockClear().mockReturnValue(null);
|
subagentRegistryMock.resolveRequesterForChildSession.mockClear().mockReturnValue(null);
|
||||||
hasSubagentDeliveryTargetHook = false;
|
hasSubagentDeliveryTargetHook = false;
|
||||||
hookRunnerMock.hasHooks.mockClear();
|
hookRunnerMock.hasHooks.mockClear();
|
||||||
@@ -1774,9 +1776,8 @@ describe("subagent announce formatting", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("defers nested parent announce when active descendants exist even if pending snapshot is stale", async () => {
|
it("defers nested parent announce while pending descendants still exist", async () => {
|
||||||
subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0);
|
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||||
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
|
|
||||||
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -1809,7 +1810,7 @@ describe("subagent announce formatting", () => {
|
|||||||
expect(msg).not.toContain("wait for the remaining results");
|
expect(msg).not.toContain("wait for the remaining results");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("defers announce while finished runs still have active descendants", async () => {
|
it("defers announce while finished runs still have pending descendants", async () => {
|
||||||
const cases = [
|
const cases = [
|
||||||
{
|
{
|
||||||
childRunId: "run-parent",
|
childRunId: "run-parent",
|
||||||
@@ -1824,7 +1825,7 @@ describe("subagent announce formatting", () => {
|
|||||||
for (const testCase of cases) {
|
for (const testCase of cases) {
|
||||||
agentSpy.mockClear();
|
agentSpy.mockClear();
|
||||||
sendSpy.mockClear();
|
sendSpy.mockClear();
|
||||||
subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) =>
|
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||||
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -1843,35 +1844,214 @@ describe("subagent announce formatting", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it("waits for updated synthesized output before announcing nested subagent completion", async () => {
|
it("defers completion announce when one descendant child is still pending", async () => {
|
||||||
let historyReads = 0;
|
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||||
chatHistoryMock.mockImplementation(async () => {
|
sessionKey === "agent:main:subagent:parent" ? 1 : 0,
|
||||||
historyReads += 1;
|
);
|
||||||
if (historyReads < 3) {
|
|
||||||
return {
|
|
||||||
messages: [{ role: "assistant", content: "Waiting for child output..." }],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
messages: [{ role: "assistant", content: "Final synthesized answer." }],
|
|
||||||
};
|
|
||||||
});
|
|
||||||
readLatestAssistantReplyMock.mockResolvedValue(undefined);
|
|
||||||
|
|
||||||
const didAnnounce = await runSubagentAnnounceFlow({
|
const didAnnounce = await runSubagentAnnounceFlow({
|
||||||
childSessionKey: "agent:main:subagent:parent",
|
childSessionKey: "agent:main:subagent:parent",
|
||||||
childRunId: "run-parent-synth",
|
childRunId: "run-parent-one-child-pending",
|
||||||
requesterSessionKey: "agent:main:subagent:orchestrator",
|
requesterSessionKey: "agent:main:main",
|
||||||
requesterDisplayKey: "agent:main:subagent:orchestrator",
|
requesterDisplayKey: "main",
|
||||||
...defaultOutcomeAnnounce,
|
...defaultOutcomeAnnounce,
|
||||||
timeoutMs: 100,
|
expectsCompletionMessage: true,
|
||||||
|
roundOneReply: "waiting for one child completion",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(didAnnounce).toBe(false);
|
||||||
|
expect(agentSpy).not.toHaveBeenCalled();
|
||||||
|
expect(sendSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("defers completion announce when two descendant children are still pending", async () => {
|
||||||
|
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) =>
|
||||||
|
sessionKey === "agent:main:subagent:parent" ? 2 : 0,
|
||||||
|
);
|
||||||
|
|
||||||
|
const didAnnounce = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: "agent:main:subagent:parent",
|
||||||
|
childRunId: "run-parent-two-children-pending",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
roundOneReply: "waiting for both completion events",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(didAnnounce).toBe(false);
|
||||||
|
expect(agentSpy).not.toHaveBeenCalled();
|
||||||
|
expect(sendSpy).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("announces completion immediately when no descendants are pending", async () => {
|
||||||
|
subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0);
|
||||||
|
subagentRegistryMock.countActiveDescendantRuns.mockReturnValue(0);
|
||||||
|
|
||||||
|
const didAnnounce = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: "agent:main:subagent:leaf",
|
||||||
|
childRunId: "run-leaf-no-children",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
roundOneReply: "single leaf result",
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(didAnnounce).toBe(true);
|
expect(didAnnounce).toBe(true);
|
||||||
|
expect(agentSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(sendSpy).not.toHaveBeenCalled();
|
||||||
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
||||||
const msg = call?.params?.message ?? "";
|
const msg = call?.params?.message ?? "";
|
||||||
expect(msg).toContain("Final synthesized answer.");
|
expect(msg).toContain("single leaf result");
|
||||||
expect(msg).not.toContain("Waiting for child output...");
|
});
|
||||||
|
|
||||||
|
it("announces with direct child completion outputs once all descendants are settled", async () => {
|
||||||
|
subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0);
|
||||||
|
subagentRegistryMock.listSubagentRunsForRequester.mockImplementation((sessionKey: string) =>
|
||||||
|
sessionKey === "agent:main:subagent:parent"
|
||||||
|
? [
|
||||||
|
{
|
||||||
|
runId: "run-child-a",
|
||||||
|
childSessionKey: "agent:main:subagent:parent:subagent:a",
|
||||||
|
requesterSessionKey: "agent:main:subagent:parent",
|
||||||
|
requesterDisplayKey: "parent",
|
||||||
|
task: "child task a",
|
||||||
|
label: "child-a",
|
||||||
|
cleanup: "keep",
|
||||||
|
createdAt: 10,
|
||||||
|
endedAt: 20,
|
||||||
|
cleanupCompletedAt: 21,
|
||||||
|
frozenResultText: "result from child a",
|
||||||
|
outcome: { status: "ok" },
|
||||||
|
},
|
||||||
|
{
|
||||||
|
runId: "run-child-b",
|
||||||
|
childSessionKey: "agent:main:subagent:parent:subagent:b",
|
||||||
|
requesterSessionKey: "agent:main:subagent:parent",
|
||||||
|
requesterDisplayKey: "parent",
|
||||||
|
task: "child task b",
|
||||||
|
label: "child-b",
|
||||||
|
cleanup: "keep",
|
||||||
|
createdAt: 11,
|
||||||
|
endedAt: 21,
|
||||||
|
cleanupCompletedAt: 22,
|
||||||
|
frozenResultText: "result from child b",
|
||||||
|
outcome: { status: "ok" },
|
||||||
|
},
|
||||||
|
]
|
||||||
|
: [],
|
||||||
|
);
|
||||||
|
|
||||||
|
const didAnnounce = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: "agent:main:subagent:parent",
|
||||||
|
childRunId: "run-parent-settled",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
roundOneReply: "placeholder waiting text that should be ignored",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(didAnnounce).toBe(true);
|
||||||
|
expect(agentSpy).toHaveBeenCalledTimes(1);
|
||||||
|
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
||||||
|
const msg = call?.params?.message ?? "";
|
||||||
|
expect(msg).toContain("Child completion results:");
|
||||||
|
expect(msg).toContain("result from child a");
|
||||||
|
expect(msg).toContain("result from child b");
|
||||||
|
expect(msg).not.toContain("placeholder waiting text that should be ignored");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("nested completion chains re-check child then parent deterministically", async () => {
|
||||||
|
const parentSessionKey = "agent:main:subagent:parent";
|
||||||
|
const childSessionKey = "agent:main:subagent:parent:subagent:child";
|
||||||
|
let parentPending = 1;
|
||||||
|
|
||||||
|
subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => {
|
||||||
|
if (sessionKey === parentSessionKey) {
|
||||||
|
return parentPending;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
});
|
||||||
|
subagentRegistryMock.listSubagentRunsForRequester.mockImplementation((sessionKey: string) => {
|
||||||
|
if (sessionKey === childSessionKey) {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
runId: "run-grandchild",
|
||||||
|
childSessionKey: `${childSessionKey}:subagent:grandchild`,
|
||||||
|
requesterSessionKey: childSessionKey,
|
||||||
|
requesterDisplayKey: "child",
|
||||||
|
task: "grandchild task",
|
||||||
|
label: "grandchild",
|
||||||
|
cleanup: "keep",
|
||||||
|
createdAt: 10,
|
||||||
|
endedAt: 20,
|
||||||
|
cleanupCompletedAt: 21,
|
||||||
|
frozenResultText: "grandchild final output",
|
||||||
|
outcome: { status: "ok" },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
if (sessionKey === parentSessionKey && parentPending === 0) {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
runId: "run-child",
|
||||||
|
childSessionKey,
|
||||||
|
requesterSessionKey: parentSessionKey,
|
||||||
|
requesterDisplayKey: "parent",
|
||||||
|
task: "child task",
|
||||||
|
label: "child",
|
||||||
|
cleanup: "keep",
|
||||||
|
createdAt: 11,
|
||||||
|
endedAt: 21,
|
||||||
|
cleanupCompletedAt: 22,
|
||||||
|
frozenResultText: "child synthesized output from grandchild",
|
||||||
|
outcome: { status: "ok" },
|
||||||
|
},
|
||||||
|
];
|
||||||
|
}
|
||||||
|
return [];
|
||||||
|
});
|
||||||
|
|
||||||
|
const parentDeferred = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: parentSessionKey,
|
||||||
|
childRunId: "run-parent",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
});
|
||||||
|
expect(parentDeferred).toBe(false);
|
||||||
|
expect(agentSpy).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
const childAnnounced = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey,
|
||||||
|
childRunId: "run-child",
|
||||||
|
requesterSessionKey: parentSessionKey,
|
||||||
|
requesterDisplayKey: parentSessionKey,
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
});
|
||||||
|
expect(childAnnounced).toBe(true);
|
||||||
|
|
||||||
|
parentPending = 0;
|
||||||
|
const parentAnnounced = await runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: parentSessionKey,
|
||||||
|
childRunId: "run-parent",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
...defaultOutcomeAnnounce,
|
||||||
|
expectsCompletionMessage: true,
|
||||||
|
});
|
||||||
|
expect(parentAnnounced).toBe(true);
|
||||||
|
expect(agentSpy).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
const childCall = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
|
||||||
|
expect(childCall?.params?.message ?? "").toContain("grandchild final output");
|
||||||
|
|
||||||
|
const parentCall = agentSpy.mock.calls[1]?.[0] as { params?: { message?: string } };
|
||||||
|
expect(parentCall?.params?.message ?? "").toContain("child synthesized output from grandchild");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("ignores post-completion announce traffic for completed run-mode requester sessions", async () => {
|
it("ignores post-completion announce traffic for completed run-mode requester sessions", async () => {
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ vi.mock("./pi-embedded.js", () => ({
|
|||||||
vi.mock("./subagent-registry.js", () => ({
|
vi.mock("./subagent-registry.js", () => ({
|
||||||
countActiveDescendantRuns: () => 0,
|
countActiveDescendantRuns: () => 0,
|
||||||
countPendingDescendantRuns: () => 0,
|
countPendingDescendantRuns: () => 0,
|
||||||
|
listSubagentRunsForRequester: () => [],
|
||||||
isSubagentSessionRunActive: () => true,
|
isSubagentSessionRunActive: () => true,
|
||||||
resolveRequesterForChildSession: () => null,
|
resolveRequesterForChildSession: () => null,
|
||||||
}));
|
}));
|
||||||
|
|||||||
@@ -50,7 +50,6 @@ import { isAnnounceSkip } from "./tools/sessions-send-helpers.js";
|
|||||||
|
|
||||||
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
|
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
|
||||||
const FAST_TEST_RETRY_INTERVAL_MS = 8;
|
const FAST_TEST_RETRY_INTERVAL_MS = 8;
|
||||||
const FAST_TEST_REPLY_CHANGE_WAIT_MS = 20;
|
|
||||||
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000;
|
const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000;
|
||||||
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
|
const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
|
||||||
let subagentRegistryRuntimePromise: Promise<
|
let subagentRegistryRuntimePromise: Promise<
|
||||||
@@ -328,29 +327,63 @@ export async function captureSubagentCompletionReply(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function waitForSubagentOutputChange(params: {
|
function describeSubagentOutcome(outcome?: SubagentRunOutcome): string {
|
||||||
sessionKey: string;
|
if (!outcome) {
|
||||||
baselineReply: string;
|
return "unknown";
|
||||||
maxWaitMs: number;
|
|
||||||
}): Promise<string> {
|
|
||||||
const baseline = params.baselineReply.trim();
|
|
||||||
if (!baseline) {
|
|
||||||
return params.baselineReply;
|
|
||||||
}
|
}
|
||||||
const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100;
|
if (outcome.status === "ok") {
|
||||||
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 5_000));
|
return "ok";
|
||||||
let latest = params.baselineReply;
|
}
|
||||||
while (Date.now() < deadline) {
|
if (outcome.status === "timeout") {
|
||||||
const next = await readLatestSubagentOutput(params.sessionKey);
|
return "timeout";
|
||||||
if (next?.trim()) {
|
}
|
||||||
latest = next;
|
if (outcome.status === "error") {
|
||||||
if (next.trim() !== baseline) {
|
return outcome.error?.trim() ? `error: ${outcome.error.trim()}` : "error";
|
||||||
return next;
|
}
|
||||||
}
|
return "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildChildCompletionFindings(
|
||||||
|
children: Array<{
|
||||||
|
childSessionKey: string;
|
||||||
|
task: string;
|
||||||
|
label?: string;
|
||||||
|
createdAt: number;
|
||||||
|
endedAt?: number;
|
||||||
|
frozenResultText?: string | null;
|
||||||
|
outcome?: SubagentRunOutcome;
|
||||||
|
}>,
|
||||||
|
): string | undefined {
|
||||||
|
const sorted = [...children].toSorted((a, b) => {
|
||||||
|
if (a.createdAt !== b.createdAt) {
|
||||||
|
return a.createdAt - b.createdAt;
|
||||||
}
|
}
|
||||||
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
|
const aEnded = typeof a.endedAt === "number" ? a.endedAt : Number.MAX_SAFE_INTEGER;
|
||||||
|
const bEnded = typeof b.endedAt === "number" ? b.endedAt : Number.MAX_SAFE_INTEGER;
|
||||||
|
return aEnded - bEnded;
|
||||||
|
});
|
||||||
|
|
||||||
|
const sections: string[] = [];
|
||||||
|
for (const [index, child] of sorted.entries()) {
|
||||||
|
const title =
|
||||||
|
child.label?.trim() ||
|
||||||
|
child.task.trim() ||
|
||||||
|
child.childSessionKey.trim() ||
|
||||||
|
`child ${index + 1}`;
|
||||||
|
const resultText = child.frozenResultText?.trim();
|
||||||
|
const outcome = describeSubagentOutcome(child.outcome);
|
||||||
|
sections.push(
|
||||||
|
[`${index + 1}. ${title}`, `status: ${outcome}`, "result:", resultText || "(no output)"].join(
|
||||||
|
"\n",
|
||||||
|
),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return latest;
|
|
||||||
|
if (sections.length === 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ["Child completion results:", "", ...sections].join("\n\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
function formatDurationShort(valueMs?: number) {
|
function formatDurationShort(valueMs?: number) {
|
||||||
@@ -1116,36 +1149,6 @@ export async function runSubagentAnnounceFlow(params: {
|
|||||||
outcome = { status: "timeout" };
|
outcome = { status: "timeout" };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
reply = await readLatestSubagentOutput(params.childSessionKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!reply) {
|
|
||||||
reply = await readLatestSubagentOutput(params.childSessionKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!reply?.trim()) {
|
|
||||||
reply = await readLatestSubagentOutputWithRetry({
|
|
||||||
sessionKey: params.childSessionKey,
|
|
||||||
maxWaitMs: params.timeoutMs,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
!expectsCompletionMessage &&
|
|
||||||
!reply?.trim() &&
|
|
||||||
childSessionId &&
|
|
||||||
isEmbeddedPiRunActive(childSessionId)
|
|
||||||
) {
|
|
||||||
// Avoid announcing "(no output)" while the child run is still producing output.
|
|
||||||
shouldDeleteChildSession = false;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isAnnounceSkip(reply)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (isSilentReplyText(reply, SILENT_REPLY_TOKEN)) {
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!outcome) {
|
if (!outcome) {
|
||||||
@@ -1155,30 +1158,68 @@ export async function runSubagentAnnounceFlow(params: {
|
|||||||
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
|
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
|
||||||
|
|
||||||
let pendingChildDescendantRuns = 0;
|
let pendingChildDescendantRuns = 0;
|
||||||
|
let childCompletionFindings: string | undefined;
|
||||||
try {
|
try {
|
||||||
const { countPendingDescendantRuns, countActiveDescendantRuns } =
|
const { countPendingDescendantRuns, listSubagentRunsForRequester } =
|
||||||
await loadSubagentRegistryRuntime();
|
await loadSubagentRegistryRuntime();
|
||||||
const pending = Math.max(0, countPendingDescendantRuns(params.childSessionKey));
|
pendingChildDescendantRuns = Math.max(0, countPendingDescendantRuns(params.childSessionKey));
|
||||||
const active = Math.max(0, countActiveDescendantRuns(params.childSessionKey));
|
if (pendingChildDescendantRuns > 0) {
|
||||||
pendingChildDescendantRuns = Math.max(pending, active);
|
// Deterministic nested announce policy: if this run still has unfinished
|
||||||
|
// descendants, do not announce yet. Wait for descendant cleanup retries
|
||||||
|
// to re-trigger this announce check once everything is complete.
|
||||||
|
shouldDeleteChildSession = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof listSubagentRunsForRequester === "function") {
|
||||||
|
const directChildren = listSubagentRunsForRequester(params.childSessionKey);
|
||||||
|
if (Array.isArray(directChildren) && directChildren.length > 0) {
|
||||||
|
childCompletionFindings = buildChildCompletionFindings(
|
||||||
|
directChildren.map((child) => ({
|
||||||
|
childSessionKey: child.childSessionKey,
|
||||||
|
task: child.task,
|
||||||
|
label: child.label,
|
||||||
|
createdAt: child.createdAt,
|
||||||
|
endedAt: child.endedAt,
|
||||||
|
frozenResultText: child.frozenResultText,
|
||||||
|
outcome: child.outcome,
|
||||||
|
})),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// Best-effort only; fall back to direct announce behavior when unavailable.
|
// Best-effort only; fall back to current-run reply extraction.
|
||||||
}
|
|
||||||
if (pendingChildDescendantRuns > 0) {
|
|
||||||
// The finished run still has pending descendant subagents (either active,
|
|
||||||
// or ended but still finishing their own announce and cleanup flow). Defer
|
|
||||||
// announcing this run until descendants fully settle.
|
|
||||||
shouldDeleteChildSession = false;
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requesterDepth >= 1 && reply?.trim()) {
|
if (!childCompletionFindings) {
|
||||||
const minReplyChangeWaitMs = FAST_TEST_MODE ? FAST_TEST_REPLY_CHANGE_WAIT_MS : 250;
|
if (!reply) {
|
||||||
reply = await waitForSubagentOutputChange({
|
reply = await readLatestSubagentOutput(params.childSessionKey);
|
||||||
sessionKey: params.childSessionKey,
|
}
|
||||||
baselineReply: reply,
|
|
||||||
maxWaitMs: Math.max(minReplyChangeWaitMs, Math.min(params.timeoutMs, 2_000)),
|
if (!reply?.trim()) {
|
||||||
});
|
reply = await readLatestSubagentOutputWithRetry({
|
||||||
|
sessionKey: params.childSessionKey,
|
||||||
|
maxWaitMs: params.timeoutMs,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
!expectsCompletionMessage &&
|
||||||
|
!reply?.trim() &&
|
||||||
|
childSessionId &&
|
||||||
|
isEmbeddedPiRunActive(childSessionId)
|
||||||
|
) {
|
||||||
|
// Avoid announcing "(no output)" while the child run is still producing output.
|
||||||
|
shouldDeleteChildSession = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isAnnounceSkip(reply)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (isSilentReplyText(reply, SILENT_REPLY_TOKEN)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build status label
|
// Build status label
|
||||||
@@ -1195,7 +1236,7 @@ export async function runSubagentAnnounceFlow(params: {
|
|||||||
const announceType = params.announceType ?? "subagent task";
|
const announceType = params.announceType ?? "subagent task";
|
||||||
const taskLabel = params.label || params.task || "task";
|
const taskLabel = params.label || params.task || "task";
|
||||||
const announceSessionId = childSessionId || "unknown";
|
const announceSessionId = childSessionId || "unknown";
|
||||||
const findings = reply || "(no output)";
|
const findings = childCompletionFindings || reply || "(no output)";
|
||||||
let triggerMessage = "";
|
let triggerMessage = "";
|
||||||
let steerMessage = "";
|
let steerMessage = "";
|
||||||
let internalEvents: AgentInternalEvent[] = [];
|
let internalEvents: AgentInternalEvent[] = [];
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ export {
|
|||||||
countPendingDescendantRuns,
|
countPendingDescendantRuns,
|
||||||
countPendingDescendantRunsExcludingRun,
|
countPendingDescendantRunsExcludingRun,
|
||||||
isSubagentSessionRunActive,
|
isSubagentSessionRunActive,
|
||||||
|
listSubagentRunsForRequester,
|
||||||
resolveRequesterForChildSession,
|
resolveRequesterForChildSession,
|
||||||
shouldIgnorePostCompletionAnnounceForSession,
|
shouldIgnorePostCompletionAnnounceForSession,
|
||||||
} from "./subagent-registry.js";
|
} from "./subagent-registry.js";
|
||||||
|
|||||||
Reference in New Issue
Block a user