mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 07:21:23 +00:00
refactor(agents): unify subagent announce delivery pipeline
Co-authored-by: Smith Labs <SmithLabsLLC@users.noreply.github.com> Co-authored-by: Do Cao Hieu <docaohieu2808@users.noreply.github.com>
This commit is contained in:
156
src/agents/subagent-announce-dispatch.test.ts
Normal file
156
src/agents/subagent-announce-dispatch.test.ts
Normal file
@@ -0,0 +1,156 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
mapQueueOutcomeToDeliveryResult,
|
||||
runSubagentAnnounceDispatch,
|
||||
} from "./subagent-announce-dispatch.js";
|
||||
|
||||
describe("mapQueueOutcomeToDeliveryResult", () => {
|
||||
it("maps steered to delivered", () => {
|
||||
expect(mapQueueOutcomeToDeliveryResult("steered")).toEqual({
|
||||
delivered: true,
|
||||
path: "steered",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps queued to delivered", () => {
|
||||
expect(mapQueueOutcomeToDeliveryResult("queued")).toEqual({
|
||||
delivered: true,
|
||||
path: "queued",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps none to not-delivered", () => {
|
||||
expect(mapQueueOutcomeToDeliveryResult("none")).toEqual({
|
||||
delivered: false,
|
||||
path: "none",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("runSubagentAnnounceDispatch", () => {
|
||||
it("uses queue-first ordering for non-completion mode", async () => {
|
||||
const queue = vi.fn(async () => "none" as const);
|
||||
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: false,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(queue).toHaveBeenCalledTimes(1);
|
||||
expect(direct).toHaveBeenCalledTimes(1);
|
||||
expect(result.delivered).toBe(true);
|
||||
expect(result.path).toBe("direct");
|
||||
expect(result.phases).toEqual([
|
||||
{ phase: "queue-primary", delivered: false, path: "none", error: undefined },
|
||||
{ phase: "direct-primary", delivered: true, path: "direct", error: undefined },
|
||||
]);
|
||||
});
|
||||
|
||||
it("short-circuits direct send when non-completion queue delivers", async () => {
|
||||
const queue = vi.fn(async () => "queued" as const);
|
||||
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: false,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(queue).toHaveBeenCalledTimes(1);
|
||||
expect(direct).not.toHaveBeenCalled();
|
||||
expect(result.path).toBe("queued");
|
||||
expect(result.phases).toEqual([
|
||||
{ phase: "queue-primary", delivered: true, path: "queued", error: undefined },
|
||||
]);
|
||||
});
|
||||
|
||||
it("uses direct-first ordering for completion mode", async () => {
|
||||
const queue = vi.fn(async () => "queued" as const);
|
||||
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: true,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(direct).toHaveBeenCalledTimes(1);
|
||||
expect(queue).not.toHaveBeenCalled();
|
||||
expect(result.path).toBe("direct");
|
||||
expect(result.phases).toEqual([
|
||||
{ phase: "direct-primary", delivered: true, path: "direct", error: undefined },
|
||||
]);
|
||||
});
|
||||
|
||||
it("falls back to queue when completion direct send fails", async () => {
|
||||
const queue = vi.fn(async () => "steered" as const);
|
||||
const direct = vi.fn(async () => ({
|
||||
delivered: false,
|
||||
path: "direct" as const,
|
||||
error: "network",
|
||||
}));
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: true,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(direct).toHaveBeenCalledTimes(1);
|
||||
expect(queue).toHaveBeenCalledTimes(1);
|
||||
expect(result.path).toBe("steered");
|
||||
expect(result.phases).toEqual([
|
||||
{ phase: "direct-primary", delivered: false, path: "direct", error: "network" },
|
||||
{ phase: "queue-fallback", delivered: true, path: "steered", error: undefined },
|
||||
]);
|
||||
});
|
||||
|
||||
it("returns direct failure when completion fallback queue cannot deliver", async () => {
|
||||
const queue = vi.fn(async () => "none" as const);
|
||||
const direct = vi.fn(async () => ({
|
||||
delivered: false,
|
||||
path: "direct" as const,
|
||||
error: "failed",
|
||||
}));
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: true,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
delivered: false,
|
||||
path: "direct",
|
||||
error: "failed",
|
||||
});
|
||||
expect(result.phases).toEqual([
|
||||
{ phase: "direct-primary", delivered: false, path: "direct", error: "failed" },
|
||||
{ phase: "queue-fallback", delivered: false, path: "none", error: undefined },
|
||||
]);
|
||||
});
|
||||
|
||||
it("returns none immediately when signal is already aborted", async () => {
|
||||
const queue = vi.fn(async () => "none" as const);
|
||||
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
|
||||
const controller = new AbortController();
|
||||
controller.abort();
|
||||
|
||||
const result = await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: true,
|
||||
signal: controller.signal,
|
||||
queue,
|
||||
direct,
|
||||
});
|
||||
|
||||
expect(queue).not.toHaveBeenCalled();
|
||||
expect(direct).not.toHaveBeenCalled();
|
||||
expect(result).toEqual({
|
||||
delivered: false,
|
||||
path: "none",
|
||||
phases: [],
|
||||
});
|
||||
});
|
||||
});
|
||||
104
src/agents/subagent-announce-dispatch.ts
Normal file
104
src/agents/subagent-announce-dispatch.ts
Normal file
@@ -0,0 +1,104 @@
|
||||
export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
|
||||
|
||||
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none";
|
||||
|
||||
export type SubagentAnnounceDeliveryResult = {
|
||||
delivered: boolean;
|
||||
path: SubagentDeliveryPath;
|
||||
error?: string;
|
||||
phases?: SubagentAnnounceDispatchPhaseResult[];
|
||||
};
|
||||
|
||||
export type SubagentAnnounceDispatchPhase = "queue-primary" | "direct-primary" | "queue-fallback";
|
||||
|
||||
export type SubagentAnnounceDispatchPhaseResult = {
|
||||
phase: SubagentAnnounceDispatchPhase;
|
||||
delivered: boolean;
|
||||
path: SubagentDeliveryPath;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export function mapQueueOutcomeToDeliveryResult(
|
||||
outcome: SubagentAnnounceQueueOutcome,
|
||||
): SubagentAnnounceDeliveryResult {
|
||||
if (outcome === "steered") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "steered",
|
||||
};
|
||||
}
|
||||
if (outcome === "queued") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "queued",
|
||||
};
|
||||
}
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
|
||||
export async function runSubagentAnnounceDispatch(params: {
|
||||
expectsCompletionMessage: boolean;
|
||||
signal?: AbortSignal;
|
||||
queue: () => Promise<SubagentAnnounceQueueOutcome>;
|
||||
direct: () => Promise<SubagentAnnounceDeliveryResult>;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
const phases: SubagentAnnounceDispatchPhaseResult[] = [];
|
||||
const appendPhase = (
|
||||
phase: SubagentAnnounceDispatchPhase,
|
||||
result: SubagentAnnounceDeliveryResult,
|
||||
) => {
|
||||
phases.push({
|
||||
phase,
|
||||
delivered: result.delivered,
|
||||
path: result.path,
|
||||
error: result.error,
|
||||
});
|
||||
};
|
||||
const withPhases = (result: SubagentAnnounceDeliveryResult): SubagentAnnounceDeliveryResult => ({
|
||||
...result,
|
||||
phases,
|
||||
});
|
||||
|
||||
if (params.signal?.aborted) {
|
||||
return withPhases({
|
||||
delivered: false,
|
||||
path: "none",
|
||||
});
|
||||
}
|
||||
|
||||
if (!params.expectsCompletionMessage) {
|
||||
const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
|
||||
appendPhase("queue-primary", primaryQueue);
|
||||
if (primaryQueue.delivered) {
|
||||
return withPhases(primaryQueue);
|
||||
}
|
||||
|
||||
const primaryDirect = await params.direct();
|
||||
appendPhase("direct-primary", primaryDirect);
|
||||
return withPhases(primaryDirect);
|
||||
}
|
||||
|
||||
const primaryDirect = await params.direct();
|
||||
appendPhase("direct-primary", primaryDirect);
|
||||
if (primaryDirect.delivered) {
|
||||
return withPhases(primaryDirect);
|
||||
}
|
||||
|
||||
if (params.signal?.aborted) {
|
||||
return withPhases({
|
||||
delivered: false,
|
||||
path: "none",
|
||||
});
|
||||
}
|
||||
|
||||
const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
|
||||
appendPhase("queue-fallback", fallbackQueue);
|
||||
if (fallbackQueue.delivered) {
|
||||
return withPhases(fallbackQueue);
|
||||
}
|
||||
|
||||
return withPhases(primaryDirect);
|
||||
}
|
||||
@@ -32,6 +32,10 @@ import {
|
||||
queueEmbeddedPiMessage,
|
||||
waitForEmbeddedPiRunEnd,
|
||||
} from "./pi-embedded.js";
|
||||
import {
|
||||
runSubagentAnnounceDispatch,
|
||||
type SubagentAnnounceDeliveryResult,
|
||||
} from "./subagent-announce-dispatch.js";
|
||||
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
|
||||
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
|
||||
import type { SpawnSubagentMode } from "./subagent-spawn.js";
|
||||
@@ -53,14 +57,6 @@ type ToolResultMessage = {
|
||||
content?: unknown;
|
||||
};
|
||||
|
||||
type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
|
||||
|
||||
type SubagentAnnounceDeliveryResult = {
|
||||
delivered: boolean;
|
||||
path: SubagentDeliveryPath;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType<typeof loadConfig>): number {
|
||||
const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs;
|
||||
if (typeof configured !== "number" || !Number.isFinite(configured)) {
|
||||
@@ -705,27 +701,6 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
return "none";
|
||||
}
|
||||
|
||||
function queueOutcomeToDeliveryResult(
|
||||
outcome: "steered" | "queued" | "none",
|
||||
): SubagentAnnounceDeliveryResult {
|
||||
if (outcome === "steered") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "steered",
|
||||
};
|
||||
}
|
||||
if (outcome === "queued") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "queued",
|
||||
};
|
||||
}
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
|
||||
async function sendSubagentAnnounceDirectly(params: {
|
||||
targetRequesterSessionKey: string;
|
||||
triggerMessage: string;
|
||||
@@ -905,64 +880,34 @@ async function deliverSubagentAnnouncement(params: {
|
||||
directIdempotencyKey: string;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
|
||||
// then (only if not queued) attempt direct delivery.
|
||||
if (!params.expectsCompletionMessage) {
|
||||
const queueOutcome = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
announceId: params.announceId,
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
signal: params.signal,
|
||||
});
|
||||
const queued = queueOutcomeToDeliveryResult(queueOutcome);
|
||||
if (queued.delivered) {
|
||||
return queued;
|
||||
}
|
||||
}
|
||||
|
||||
// Completion-mode uses direct send first so manual spawns can return immediately
|
||||
// in the common ready-to-deliver case.
|
||||
const direct = await sendSubagentAnnounceDirectly({
|
||||
targetRequesterSessionKey: params.targetRequesterSessionKey,
|
||||
triggerMessage: params.triggerMessage,
|
||||
completionMessage: params.completionMessage,
|
||||
directIdempotencyKey: params.directIdempotencyKey,
|
||||
completionDirectOrigin: params.completionDirectOrigin,
|
||||
completionRouteMode: params.completionRouteMode,
|
||||
spawnMode: params.spawnMode,
|
||||
directOrigin: params.directOrigin,
|
||||
requesterIsSubagent: params.requesterIsSubagent,
|
||||
return await runSubagentAnnounceDispatch({
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
signal: params.signal,
|
||||
bestEffortDeliver: params.bestEffortDeliver,
|
||||
queue: async () =>
|
||||
await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
announceId: params.announceId,
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
signal: params.signal,
|
||||
}),
|
||||
direct: async () =>
|
||||
await sendSubagentAnnounceDirectly({
|
||||
targetRequesterSessionKey: params.targetRequesterSessionKey,
|
||||
triggerMessage: params.triggerMessage,
|
||||
completionMessage: params.completionMessage,
|
||||
directIdempotencyKey: params.directIdempotencyKey,
|
||||
completionDirectOrigin: params.completionDirectOrigin,
|
||||
completionRouteMode: params.completionRouteMode,
|
||||
spawnMode: params.spawnMode,
|
||||
directOrigin: params.directOrigin,
|
||||
requesterIsSubagent: params.requesterIsSubagent,
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
signal: params.signal,
|
||||
bestEffortDeliver: params.bestEffortDeliver,
|
||||
}),
|
||||
});
|
||||
if (direct.delivered || !params.expectsCompletionMessage) {
|
||||
return direct;
|
||||
}
|
||||
|
||||
// If completion path failed direct delivery, try queueing as a fallback so the
|
||||
// report can still be delivered once the requester session is idle.
|
||||
const queueOutcome = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
announceId: params.announceId,
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
signal: params.signal,
|
||||
});
|
||||
if (queueOutcome === "steered" || queueOutcome === "queued") {
|
||||
return queueOutcomeToDeliveryResult(queueOutcome);
|
||||
}
|
||||
|
||||
return direct;
|
||||
}
|
||||
|
||||
function loadSessionEntryByKey(sessionKey: string) {
|
||||
|
||||
@@ -155,4 +155,43 @@ describe("announce loop guard (#18264)", () => {
|
||||
const stored = runs.find((run) => run.runId === entry.runId);
|
||||
expect(stored?.cleanupCompletedAt).toBeDefined();
|
||||
});
|
||||
|
||||
test("announce rejection resets cleanupHandled so retries can resume", async () => {
|
||||
announceFn.mockReset();
|
||||
announceFn.mockRejectedValueOnce(new Error("announce failed"));
|
||||
registry.resetSubagentRegistryForTests();
|
||||
|
||||
const now = Date.now();
|
||||
const runId = "test-announce-rejection";
|
||||
loadSubagentRegistryFromDisk.mockReturnValue(
|
||||
new Map([
|
||||
[
|
||||
runId,
|
||||
{
|
||||
runId,
|
||||
childSessionKey: "agent:main:subagent:child-1",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "agent:main:main",
|
||||
task: "rejection test",
|
||||
cleanup: "keep" as const,
|
||||
createdAt: now - 30_000,
|
||||
startedAt: now - 20_000,
|
||||
endedAt: now - 10_000,
|
||||
cleanupHandled: false,
|
||||
},
|
||||
],
|
||||
]),
|
||||
);
|
||||
|
||||
registry.initSubagentRegistry();
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
const runs = registry.listSubagentRunsForRequester("agent:main:main");
|
||||
const stored = runs.find((run) => run.runId === runId);
|
||||
expect(stored?.cleanupHandled).toBe(false);
|
||||
expect(stored?.cleanupCompletedAt).toBeUndefined();
|
||||
expect(stored?.announceRetryCount).toBe(1);
|
||||
expect(stored?.lastAnnounceRetryAt).toBeTypeOf("number");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -331,9 +331,16 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
outcome: entry.outcome,
|
||||
spawnMode: entry.spawnMode,
|
||||
expectsCompletionMessage: entry.expectsCompletionMessage,
|
||||
}).then((didAnnounce) => {
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
|
||||
});
|
||||
})
|
||||
.then((didAnnounce) => {
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
|
||||
})
|
||||
.catch((error) => {
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`,
|
||||
);
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, false);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user