mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 00:54:59 +00:00
fix(subagents): freeze completion output and ignore late run-mode announces
This commit is contained in:
@@ -33,6 +33,7 @@ const embeddedRunMock = {
|
||||
};
|
||||
const subagentRegistryMock = {
|
||||
isSubagentSessionRunActive: vi.fn(() => true),
|
||||
shouldIgnorePostCompletionAnnounceForSession: vi.fn((_sessionKey: string) => false),
|
||||
countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
||||
countPendingDescendantRuns: vi.fn((_sessionKey: string) => 0),
|
||||
countPendingDescendantRunsExcludingRun: vi.fn((_sessionKey: string, _runId: string) => 0),
|
||||
@@ -183,6 +184,9 @@ describe("subagent announce formatting", () => {
|
||||
embeddedRunMock.queueEmbeddedPiMessage.mockClear().mockReturnValue(false);
|
||||
embeddedRunMock.waitForEmbeddedPiRunEnd.mockClear().mockResolvedValue(true);
|
||||
subagentRegistryMock.isSubagentSessionRunActive.mockClear().mockReturnValue(true);
|
||||
subagentRegistryMock.shouldIgnorePostCompletionAnnounceForSession
|
||||
.mockClear()
|
||||
.mockReturnValue(false);
|
||||
subagentRegistryMock.countActiveDescendantRuns.mockClear().mockReturnValue(0);
|
||||
subagentRegistryMock.countPendingDescendantRuns
|
||||
.mockClear()
|
||||
@@ -1836,6 +1840,29 @@ describe("subagent announce formatting", () => {
|
||||
expect(msg).not.toContain("Waiting for child output...");
|
||||
});
|
||||
|
||||
it("ignores post-completion announce traffic for completed run-mode requester sessions", async () => {
|
||||
subagentRegistryMock.isSubagentSessionRunActive.mockReturnValue(false);
|
||||
subagentRegistryMock.shouldIgnorePostCompletionAnnounceForSession.mockReturnValue(true);
|
||||
sessionStore = {
|
||||
"agent:main:subagent:orchestrator": {
|
||||
sessionId: "orchestrator-session-id",
|
||||
},
|
||||
};
|
||||
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: "agent:main:subagent:leaf",
|
||||
childRunId: "run-leaf-late",
|
||||
requesterSessionKey: "agent:main:subagent:orchestrator",
|
||||
requesterDisplayKey: "agent:main:subagent:orchestrator",
|
||||
...defaultOutcomeAnnounce,
|
||||
});
|
||||
|
||||
expect(didAnnounce).toBe(true);
|
||||
expect(agentSpy).not.toHaveBeenCalled();
|
||||
expect(sendSpy).not.toHaveBeenCalled();
|
||||
expect(subagentRegistryMock.resolveRequesterForChildSession).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("bubbles child announce to parent requester when requester subagent already ended", async () => {
|
||||
subagentRegistryMock.isSubagentSessionRunActive.mockReturnValue(false);
|
||||
subagentRegistryMock.resolveRequesterForChildSession.mockReturnValue({
|
||||
|
||||
@@ -315,6 +315,19 @@ async function readLatestSubagentOutputWithRetry(params: {
|
||||
return result;
|
||||
}
|
||||
|
||||
export async function captureSubagentCompletionReply(
|
||||
sessionKey: string,
|
||||
): Promise<string | undefined> {
|
||||
const immediate = await readLatestSubagentOutput(sessionKey);
|
||||
if (immediate?.trim()) {
|
||||
return immediate;
|
||||
}
|
||||
return await readLatestSubagentOutputWithRetry({
|
||||
sessionKey,
|
||||
maxWaitMs: FAST_TEST_MODE ? 50 : 1_500,
|
||||
});
|
||||
}
|
||||
|
||||
async function waitForSubagentOutputChange(params: {
|
||||
sessionKey: string;
|
||||
baselineReply: string;
|
||||
@@ -979,6 +992,10 @@ export function buildSubagentSystemPrompt(params: {
|
||||
"Use the `subagents` tool to steer, kill, or do an on-demand status check for your spawned sub-agents.",
|
||||
"Your sub-agents will announce their results back to you automatically (not to the main agent).",
|
||||
"Default workflow: spawn work, continue orchestrating, and wait for auto-announced completions.",
|
||||
"Auto-announce is push-based. After spawning children, do NOT call sessions_list, sessions_history, exec sleep, or any polling tool.",
|
||||
"Wait for completion events to arrive as user messages.",
|
||||
"Track expected child session keys and only send your final answer after completion events for ALL expected children arrive.",
|
||||
"If a child completion event arrives AFTER you already sent your final answer, reply ONLY with NO_REPLY.",
|
||||
"Do NOT repeatedly poll `subagents list` in a loop unless you are actively debugging or intervening.",
|
||||
"Coordinate their work and synthesize results before reporting back.",
|
||||
...(acpEnabled
|
||||
@@ -1224,9 +1241,15 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
// run ended. A parent waiting for child results has no active run but should
|
||||
// still receive the announce — injecting will start a new agent turn.
|
||||
if (requesterIsSubagent) {
|
||||
const { isSubagentSessionRunActive, resolveRequesterForChildSession } =
|
||||
await loadSubagentRegistryRuntime();
|
||||
const {
|
||||
isSubagentSessionRunActive,
|
||||
resolveRequesterForChildSession,
|
||||
shouldIgnorePostCompletionAnnounceForSession,
|
||||
} = await loadSubagentRegistryRuntime();
|
||||
if (!isSubagentSessionRunActive(targetRequesterSessionKey)) {
|
||||
if (shouldIgnorePostCompletionAnnounceForSession(targetRequesterSessionKey)) {
|
||||
return true;
|
||||
}
|
||||
// Parent run has ended. Check if parent SESSION still exists.
|
||||
// If it does, the parent may be waiting for child results — inject there.
|
||||
const parentSessionEntry = loadSessionEntryByKey(targetRequesterSessionKey);
|
||||
|
||||
@@ -58,6 +58,35 @@ export function resolveRequesterForChildSessionFromRuns(
|
||||
};
|
||||
}
|
||||
|
||||
export function shouldIgnorePostCompletionAnnounceForSessionFromRuns(
|
||||
runs: Map<string, SubagentRunRecord>,
|
||||
childSessionKey: string,
|
||||
): boolean {
|
||||
const key = childSessionKey.trim();
|
||||
if (!key) {
|
||||
return false;
|
||||
}
|
||||
let latest: SubagentRunRecord | undefined;
|
||||
for (const entry of runs.values()) {
|
||||
if (entry.childSessionKey !== key) {
|
||||
continue;
|
||||
}
|
||||
if (!latest || entry.createdAt > latest.createdAt) {
|
||||
latest = entry;
|
||||
}
|
||||
}
|
||||
if (!latest) {
|
||||
return false;
|
||||
}
|
||||
// Session-mode subagents remain available for follow-up turns.
|
||||
if (latest.spawnMode === "session") {
|
||||
return false;
|
||||
}
|
||||
// Run-mode subagent sessions should not process new descendant completion
|
||||
// traffic after their own run has already ended.
|
||||
return typeof latest.endedAt === "number";
|
||||
}
|
||||
|
||||
export function countActiveRunsForSessionFromRuns(
|
||||
runs: Map<string, SubagentRunRecord>,
|
||||
requesterSessionKey: string,
|
||||
|
||||
@@ -4,4 +4,5 @@ export {
|
||||
countPendingDescendantRunsExcludingRun,
|
||||
isSubagentSessionRunActive,
|
||||
resolveRequesterForChildSession,
|
||||
shouldIgnorePostCompletionAnnounceForSession,
|
||||
} from "./subagent-registry.js";
|
||||
|
||||
@@ -36,6 +36,7 @@ const loadConfigMock = vi.fn(() => ({
|
||||
const loadRegistryMock = vi.fn(() => new Map());
|
||||
const saveRegistryMock = vi.fn(() => {});
|
||||
const announceSpy = vi.fn(async () => true);
|
||||
const captureCompletionReplySpy = vi.fn(async () => undefined as string | undefined);
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: callGatewayMock,
|
||||
@@ -51,6 +52,7 @@ vi.mock("../config/config.js", () => ({
|
||||
|
||||
vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: announceSpy,
|
||||
captureSubagentCompletionReply: captureCompletionReplySpy,
|
||||
}));
|
||||
|
||||
vi.mock("../plugins/hook-runner-global.js", () => ({
|
||||
@@ -71,10 +73,11 @@ describe("subagent registry lifecycle error grace", () => {
|
||||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
announceSpy.mockReset().mockResolvedValue(true);
|
||||
captureCompletionReplySpy.mockReset().mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
announceSpy.mockClear();
|
||||
lifecycleHandler = undefined;
|
||||
mod.resetSubagentRegistryForTests({ persist: false });
|
||||
vi.useRealTimers();
|
||||
@@ -158,4 +161,91 @@ describe("subagent registry lifecycle error grace", () => {
|
||||
expect(readFirstAnnounceOutcome()?.status).toBe("error");
|
||||
expect(readFirstAnnounceOutcome()?.error).toBe("fatal failure");
|
||||
});
|
||||
|
||||
it("freezes completion result at run termination across deferred announce retries", async () => {
|
||||
registerCompletionRun("run-freeze", "freeze", "freeze test");
|
||||
captureCompletionReplySpy.mockResolvedValueOnce("Final answer X");
|
||||
announceSpy.mockResolvedValueOnce(false).mockResolvedValueOnce(true);
|
||||
|
||||
const endedAt = Date.now();
|
||||
emitLifecycleEvent("run-freeze", { phase: "end", endedAt });
|
||||
await flushAsync();
|
||||
|
||||
expect(announceSpy).toHaveBeenCalledTimes(1);
|
||||
const firstCall = announceSpy.mock.calls[0]?.[0] as { roundOneReply?: string } | undefined;
|
||||
expect(firstCall?.roundOneReply).toBe("Final answer X");
|
||||
|
||||
await vi.waitFor(() => {
|
||||
const run = mod
|
||||
.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY)
|
||||
.find((candidate) => candidate.runId === "run-freeze");
|
||||
expect(run?.cleanupHandled).toBe(false);
|
||||
});
|
||||
|
||||
captureCompletionReplySpy.mockResolvedValueOnce("Late reply Y");
|
||||
emitLifecycleEvent("run-freeze", { phase: "end", endedAt: endedAt + 100 });
|
||||
await flushAsync();
|
||||
|
||||
expect(announceSpy).toHaveBeenCalledTimes(2);
|
||||
const secondCall = announceSpy.mock.calls[1]?.[0] as { roundOneReply?: string } | undefined;
|
||||
expect(secondCall?.roundOneReply).toBe("Final answer X");
|
||||
expect(captureCompletionReplySpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps parallel child completion results frozen even when late traffic arrives", async () => {
|
||||
registerCompletionRun("run-parallel-a", "parallel-a", "parallel a");
|
||||
registerCompletionRun("run-parallel-b", "parallel-b", "parallel b");
|
||||
captureCompletionReplySpy
|
||||
.mockResolvedValueOnce("Final answer A")
|
||||
.mockResolvedValueOnce("Final answer B");
|
||||
announceSpy
|
||||
.mockResolvedValueOnce(false)
|
||||
.mockResolvedValueOnce(false)
|
||||
.mockResolvedValueOnce(true)
|
||||
.mockResolvedValueOnce(true);
|
||||
|
||||
const parallelEndedAt = Date.now();
|
||||
emitLifecycleEvent("run-parallel-a", { phase: "end", endedAt: parallelEndedAt });
|
||||
emitLifecycleEvent("run-parallel-b", { phase: "end", endedAt: parallelEndedAt + 1 });
|
||||
await flushAsync();
|
||||
|
||||
expect(announceSpy).toHaveBeenCalledTimes(2);
|
||||
await vi.waitFor(() => {
|
||||
const runs = mod.listSubagentRunsForRequester(MAIN_REQUESTER_SESSION_KEY);
|
||||
const runA = runs.find((candidate) => candidate.runId === "run-parallel-a");
|
||||
const runB = runs.find((candidate) => candidate.runId === "run-parallel-b");
|
||||
expect(runA?.cleanupHandled).toBe(false);
|
||||
expect(runB?.cleanupHandled).toBe(false);
|
||||
});
|
||||
|
||||
captureCompletionReplySpy.mockResolvedValue("Late overwrite");
|
||||
|
||||
emitLifecycleEvent("run-parallel-a", { phase: "end", endedAt: parallelEndedAt + 100 });
|
||||
emitLifecycleEvent("run-parallel-b", { phase: "end", endedAt: parallelEndedAt + 101 });
|
||||
await flushAsync();
|
||||
|
||||
expect(announceSpy).toHaveBeenCalledTimes(4);
|
||||
|
||||
const callsByRun = new Map<string, Array<{ roundOneReply?: string }>>();
|
||||
for (const call of announceSpy.mock.calls) {
|
||||
const params = (call?.[0] ?? {}) as { childRunId?: string; roundOneReply?: string };
|
||||
const runId = params.childRunId;
|
||||
if (!runId) {
|
||||
continue;
|
||||
}
|
||||
const existing = callsByRun.get(runId) ?? [];
|
||||
existing.push({ roundOneReply: params.roundOneReply });
|
||||
callsByRun.set(runId, existing);
|
||||
}
|
||||
|
||||
expect(callsByRun.get("run-parallel-a")?.map((entry) => entry.roundOneReply)).toEqual([
|
||||
"Final answer A",
|
||||
"Final answer A",
|
||||
]);
|
||||
expect(callsByRun.get("run-parallel-b")?.map((entry) => entry.roundOneReply)).toEqual([
|
||||
"Final answer B",
|
||||
"Final answer B",
|
||||
]);
|
||||
expect(captureCompletionReplySpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,7 +12,11 @@ import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
||||
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
|
||||
import {
|
||||
captureSubagentCompletionReply,
|
||||
runSubagentAnnounceFlow,
|
||||
type SubagentRunOutcome,
|
||||
} from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_OUTCOME_KILLED,
|
||||
SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
@@ -38,6 +42,7 @@ import {
|
||||
listDescendantRunsForRequesterFromRuns,
|
||||
listRunsForRequesterFromRuns,
|
||||
resolveRequesterForChildSessionFromRuns,
|
||||
shouldIgnorePostCompletionAnnounceForSessionFromRuns,
|
||||
} from "./subagent-registry-queries.js";
|
||||
import {
|
||||
getSubagentRunsSnapshotForRead,
|
||||
@@ -322,6 +327,20 @@ async function emitSubagentEndedHookForRun(params: {
|
||||
});
|
||||
}
|
||||
|
||||
async function freezeRunResultAtCompletion(entry: SubagentRunRecord): Promise<boolean> {
|
||||
if (entry.frozenResultText !== undefined) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
const captured = await captureSubagentCompletionReply(entry.childSessionKey);
|
||||
entry.frozenResultText = captured?.trim() ? captured : null;
|
||||
} catch {
|
||||
entry.frozenResultText = null;
|
||||
}
|
||||
entry.frozenResultCapturedAt = Date.now();
|
||||
return true;
|
||||
}
|
||||
|
||||
async function completeSubagentRun(params: {
|
||||
runId: string;
|
||||
endedAt?: number;
|
||||
@@ -352,6 +371,10 @@ async function completeSubagentRun(params: {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (await freezeRunResultAtCompletion(entry)) {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
persistSubagentRuns();
|
||||
}
|
||||
@@ -400,6 +423,7 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
task: entry.task,
|
||||
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
|
||||
cleanup: entry.cleanup,
|
||||
roundOneReply: entry.frozenResultText ?? undefined,
|
||||
waitForCompletion: false,
|
||||
startedAt: entry.startedAt,
|
||||
endedAt: entry.endedAt,
|
||||
@@ -941,6 +965,8 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
endedReason: undefined,
|
||||
endedHookEmittedAt: undefined,
|
||||
outcome: undefined,
|
||||
frozenResultText: undefined,
|
||||
frozenResultCapturedAt: undefined,
|
||||
cleanupCompletedAt: undefined,
|
||||
cleanupHandled: false,
|
||||
suppressAnnounceReason: undefined,
|
||||
@@ -1151,6 +1177,13 @@ export function isSubagentSessionRunActive(childSessionKey: string): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
export function shouldIgnorePostCompletionAnnounceForSession(childSessionKey: string): boolean {
|
||||
return shouldIgnorePostCompletionAnnounceForSessionFromRuns(
|
||||
getSubagentRunsSnapshotForRead(subagentRuns),
|
||||
childSessionKey,
|
||||
);
|
||||
}
|
||||
|
||||
export function markSubagentRunTerminated(params: {
|
||||
runId?: string;
|
||||
childSessionKey?: string;
|
||||
|
||||
@@ -30,6 +30,10 @@ export type SubagentRunRecord = {
|
||||
lastAnnounceRetryAt?: number;
|
||||
/** Terminal lifecycle reason recorded when the run finishes. */
|
||||
endedReason?: SubagentLifecycleEndedReason;
|
||||
/** Frozen completion output captured when the run first transitions to ended state. */
|
||||
frozenResultText?: string | null;
|
||||
/** Timestamp when frozenResultText was captured and locked. */
|
||||
frozenResultCapturedAt?: number;
|
||||
/** Set after the subagent_ended hook has been emitted successfully once. */
|
||||
endedHookEmittedAt?: number;
|
||||
attachmentsDir?: string;
|
||||
|
||||
@@ -88,7 +88,7 @@ export type SpawnSubagentContext = {
|
||||
};
|
||||
|
||||
export const SUBAGENT_SPAWN_ACCEPTED_NOTE =
|
||||
"auto-announces on completion, do not poll/sleep. The response will be sent back as an user message.";
|
||||
"Auto-announce is push-based. After spawning children, do NOT call sessions_list, sessions_history, exec sleep, or any polling tool. Wait for completion events to arrive as user messages, track expected child session keys, and only send your final answer after ALL expected completions arrive. If a child completion event arrives AFTER your final answer, reply ONLY with NO_REPLY.";
|
||||
export const SUBAGENT_SPAWN_SESSION_ACCEPTED_NOTE =
|
||||
"thread-bound session stays active after this task; continue in-thread for follow-ups.";
|
||||
|
||||
|
||||
@@ -695,6 +695,15 @@ describe("buildSubagentSystemPrompt", () => {
|
||||
expect(prompt).toContain("Do not use `exec` (`openclaw ...`, `acpx ...`)");
|
||||
expect(prompt).toContain("Use `subagents` only for OpenClaw subagents");
|
||||
expect(prompt).toContain("Subagent results auto-announce back to you");
|
||||
expect(prompt).toContain(
|
||||
"After spawning children, do NOT call sessions_list, sessions_history, exec sleep, or any polling tool.",
|
||||
);
|
||||
expect(prompt).toContain(
|
||||
"Track expected child session keys and only send your final answer after completion events for ALL expected children arrive.",
|
||||
);
|
||||
expect(prompt).toContain(
|
||||
"If a child completion event arrives AFTER you already sent your final answer, reply ONLY with NO_REPLY.",
|
||||
);
|
||||
expect(prompt).toContain("Avoid polling loops");
|
||||
expect(prompt).toContain("spawned by the main agent");
|
||||
expect(prompt).toContain("reported to the main agent");
|
||||
|
||||
Reference in New Issue
Block a user