mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 20:08:26 +00:00
Subagents: restore announce chain + fix nested retry/drop regressions (#22223)
* Subagents: restore announce flow and fix nested delivery retries * fix: prep subagent announce + docs alignment (#22223) (thanks @tyler6204)
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
@@ -10,14 +11,13 @@ import {
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { normalizeMainKey } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { extractTextFromChatContent } from "../shared/chat-content.js";
|
||||
import {
|
||||
type DeliveryContext,
|
||||
deliveryContextFromSession,
|
||||
mergeDeliveryContext,
|
||||
normalizeDeliveryContext,
|
||||
} from "../utils/delivery-context.js";
|
||||
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
|
||||
import { isInternalMessageChannel } from "../utils/message-channel.js";
|
||||
import {
|
||||
buildAnnounceIdFromChildRun,
|
||||
buildAnnounceIdempotencyKey,
|
||||
@@ -30,170 +30,7 @@ import {
|
||||
} from "./pi-embedded.js";
|
||||
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
|
||||
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
|
||||
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
|
||||
|
||||
type ToolResultMessage = {
|
||||
role?: unknown;
|
||||
content?: unknown;
|
||||
};
|
||||
|
||||
type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
|
||||
|
||||
type SubagentAnnounceDeliveryResult = {
|
||||
delivered: boolean;
|
||||
path: SubagentDeliveryPath;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
function buildCompletionDeliveryMessage(params: {
|
||||
findings: string;
|
||||
subagentName: string;
|
||||
}): string {
|
||||
const findingsText = params.findings.trim();
|
||||
const hasFindings = findingsText.length > 0 && findingsText !== "(no output)";
|
||||
const header = `✅ Subagent ${params.subagentName} finished`;
|
||||
if (!hasFindings) {
|
||||
return header;
|
||||
}
|
||||
return `${header}\n\n${findingsText}`;
|
||||
}
|
||||
|
||||
function summarizeDeliveryError(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message || "error";
|
||||
}
|
||||
if (typeof error === "string") {
|
||||
return error;
|
||||
}
|
||||
if (error === undefined || error === null) {
|
||||
return "unknown error";
|
||||
}
|
||||
try {
|
||||
return JSON.stringify(error);
|
||||
} catch {
|
||||
return "error";
|
||||
}
|
||||
}
|
||||
|
||||
function extractToolResultText(content: unknown): string {
|
||||
if (typeof content === "string") {
|
||||
return sanitizeTextContent(content);
|
||||
}
|
||||
if (content && typeof content === "object" && !Array.isArray(content)) {
|
||||
const obj = content as {
|
||||
text?: unknown;
|
||||
output?: unknown;
|
||||
content?: unknown;
|
||||
result?: unknown;
|
||||
error?: unknown;
|
||||
summary?: unknown;
|
||||
};
|
||||
if (typeof obj.text === "string") {
|
||||
return sanitizeTextContent(obj.text);
|
||||
}
|
||||
if (typeof obj.output === "string") {
|
||||
return sanitizeTextContent(obj.output);
|
||||
}
|
||||
if (typeof obj.content === "string") {
|
||||
return sanitizeTextContent(obj.content);
|
||||
}
|
||||
if (typeof obj.result === "string") {
|
||||
return sanitizeTextContent(obj.result);
|
||||
}
|
||||
if (typeof obj.error === "string") {
|
||||
return sanitizeTextContent(obj.error);
|
||||
}
|
||||
if (typeof obj.summary === "string") {
|
||||
return sanitizeTextContent(obj.summary);
|
||||
}
|
||||
}
|
||||
if (!Array.isArray(content)) {
|
||||
return "";
|
||||
}
|
||||
const joined = extractTextFromChatContent(content, {
|
||||
sanitizeText: sanitizeTextContent,
|
||||
normalizeText: (text) => text,
|
||||
joinWith: "\n",
|
||||
});
|
||||
return joined?.trim() ?? "";
|
||||
}
|
||||
|
||||
function extractInlineTextContent(content: unknown): string {
|
||||
if (!Array.isArray(content)) {
|
||||
return "";
|
||||
}
|
||||
return (
|
||||
extractTextFromChatContent(content, {
|
||||
sanitizeText: sanitizeTextContent,
|
||||
normalizeText: (text) => text.trim(),
|
||||
joinWith: "",
|
||||
}) ?? ""
|
||||
);
|
||||
}
|
||||
|
||||
function extractSubagentOutputText(message: unknown): string {
|
||||
if (!message || typeof message !== "object") {
|
||||
return "";
|
||||
}
|
||||
const role = (message as { role?: unknown }).role;
|
||||
const content = (message as { content?: unknown }).content;
|
||||
if (role === "assistant") {
|
||||
const assistantText = extractAssistantText(message);
|
||||
if (assistantText) {
|
||||
return assistantText;
|
||||
}
|
||||
if (typeof content === "string") {
|
||||
return sanitizeTextContent(content);
|
||||
}
|
||||
if (Array.isArray(content)) {
|
||||
return extractInlineTextContent(content);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
if (role === "toolResult" || role === "tool") {
|
||||
return extractToolResultText((message as ToolResultMessage).content);
|
||||
}
|
||||
if (typeof content === "string") {
|
||||
return sanitizeTextContent(content);
|
||||
}
|
||||
if (Array.isArray(content)) {
|
||||
return extractInlineTextContent(content);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
async function readLatestSubagentOutput(sessionKey: string): Promise<string | undefined> {
|
||||
const history = await callGateway<{ messages?: Array<unknown> }>({
|
||||
method: "chat.history",
|
||||
params: { sessionKey, limit: 50 },
|
||||
});
|
||||
const messages = Array.isArray(history?.messages) ? history.messages : [];
|
||||
for (let i = messages.length - 1; i >= 0; i -= 1) {
|
||||
const msg = messages[i];
|
||||
const text = extractSubagentOutputText(msg);
|
||||
if (text) {
|
||||
return text;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function readLatestSubagentOutputWithRetry(params: {
|
||||
sessionKey: string;
|
||||
maxWaitMs: number;
|
||||
}): Promise<string | undefined> {
|
||||
const RETRY_INTERVAL_MS = 100;
|
||||
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
|
||||
let result: string | undefined;
|
||||
while (Date.now() < deadline) {
|
||||
result = await readLatestSubagentOutput(params.sessionKey);
|
||||
if (result?.trim()) {
|
||||
return result;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
import { readLatestAssistantReply } from "./tools/agent-step.js";
|
||||
|
||||
function formatDurationShort(valueMs?: number) {
|
||||
if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) {
|
||||
@@ -273,8 +110,8 @@ function resolveAnnounceOrigin(
|
||||
): DeliveryContext | undefined {
|
||||
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);
|
||||
const normalizedEntry = deliveryContextFromSession(entry);
|
||||
if (normalizedRequester?.channel && !isDeliverableMessageChannel(normalizedRequester.channel)) {
|
||||
// Ignore internal/non-deliverable channel hints (for example webchat)
|
||||
if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) {
|
||||
// Ignore internal channel hints, for example webchat,
|
||||
// so a valid persisted route can still be used for outbound delivery.
|
||||
return mergeDeliveryContext(
|
||||
{
|
||||
@@ -284,7 +121,7 @@ function resolveAnnounceOrigin(
|
||||
normalizedEntry,
|
||||
);
|
||||
}
|
||||
// requesterOrigin (captured at spawn time) reflects the channel the user is
|
||||
// requesterOrigin, captured at spawn time, reflects the channel the user is
|
||||
// actually on and must take priority over the session entry, which may carry
|
||||
// stale lastChannel / lastTo values from a previous channel interaction.
|
||||
return mergeDeliveryContext(normalizedRequester, normalizedEntry);
|
||||
@@ -408,182 +245,6 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
return "none";
|
||||
}
|
||||
|
||||
function queueOutcomeToDeliveryResult(
|
||||
outcome: "steered" | "queued" | "none",
|
||||
): SubagentAnnounceDeliveryResult {
|
||||
if (outcome === "steered") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "steered",
|
||||
};
|
||||
}
|
||||
if (outcome === "queued") {
|
||||
return {
|
||||
delivered: true,
|
||||
path: "queued",
|
||||
};
|
||||
}
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
|
||||
async function sendSubagentAnnounceDirectly(params: {
|
||||
targetRequesterSessionKey: string;
|
||||
triggerMessage: string;
|
||||
completionMessage?: string;
|
||||
expectsCompletionMessage: boolean;
|
||||
directIdempotencyKey: string;
|
||||
completionDirectOrigin?: DeliveryContext;
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterIsSubagent: boolean;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
const cfg = loadConfig();
|
||||
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
|
||||
cfg,
|
||||
params.targetRequesterSessionKey,
|
||||
);
|
||||
try {
|
||||
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
|
||||
const completionChannelRaw =
|
||||
typeof completionDirectOrigin?.channel === "string"
|
||||
? completionDirectOrigin.channel.trim()
|
||||
: "";
|
||||
const completionChannel =
|
||||
completionChannelRaw && isDeliverableMessageChannel(completionChannelRaw)
|
||||
? completionChannelRaw
|
||||
: "";
|
||||
const completionTo =
|
||||
typeof completionDirectOrigin?.to === "string" ? completionDirectOrigin.to.trim() : "";
|
||||
const hasCompletionDirectTarget =
|
||||
!params.requesterIsSubagent && Boolean(completionChannel) && Boolean(completionTo);
|
||||
|
||||
if (
|
||||
params.expectsCompletionMessage &&
|
||||
hasCompletionDirectTarget &&
|
||||
params.completionMessage?.trim()
|
||||
) {
|
||||
const completionThreadId =
|
||||
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
|
||||
? String(completionDirectOrigin.threadId)
|
||||
: undefined;
|
||||
await callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
channel: completionChannel,
|
||||
to: completionTo,
|
||||
accountId: completionDirectOrigin?.accountId,
|
||||
threadId: completionThreadId,
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.completionMessage,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
timeoutMs: 15_000,
|
||||
});
|
||||
|
||||
return {
|
||||
delivered: true,
|
||||
path: "direct",
|
||||
};
|
||||
}
|
||||
|
||||
const directOrigin = normalizeDeliveryContext(params.directOrigin);
|
||||
const threadId =
|
||||
directOrigin?.threadId != null && directOrigin.threadId !== ""
|
||||
? String(directOrigin.threadId)
|
||||
: undefined;
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: canonicalRequesterSessionKey,
|
||||
message: params.triggerMessage,
|
||||
deliver: !params.requesterIsSubagent,
|
||||
channel: params.requesterIsSubagent ? undefined : directOrigin?.channel,
|
||||
accountId: params.requesterIsSubagent ? undefined : directOrigin?.accountId,
|
||||
to: params.requesterIsSubagent ? undefined : directOrigin?.to,
|
||||
threadId: params.requesterIsSubagent ? undefined : threadId,
|
||||
idempotencyKey: params.directIdempotencyKey,
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: 15_000,
|
||||
});
|
||||
|
||||
return {
|
||||
delivered: true,
|
||||
path: "direct",
|
||||
};
|
||||
} catch (err) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "direct",
|
||||
error: summarizeDeliveryError(err),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function deliverSubagentAnnouncement(params: {
|
||||
requesterSessionKey: string;
|
||||
announceId?: string;
|
||||
triggerMessage: string;
|
||||
completionMessage?: string;
|
||||
summaryLine?: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
completionDirectOrigin?: DeliveryContext;
|
||||
directOrigin?: DeliveryContext;
|
||||
targetRequesterSessionKey: string;
|
||||
requesterIsSubagent: boolean;
|
||||
expectsCompletionMessage: boolean;
|
||||
directIdempotencyKey: string;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
|
||||
// then (only if not queued) attempt direct delivery.
|
||||
if (!params.expectsCompletionMessage) {
|
||||
const queueOutcome = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
announceId: params.announceId,
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
});
|
||||
const queued = queueOutcomeToDeliveryResult(queueOutcome);
|
||||
if (queued.delivered) {
|
||||
return queued;
|
||||
}
|
||||
}
|
||||
|
||||
// Completion-mode uses direct send first so manual spawns can return immediately
|
||||
// in the common ready-to-deliver case.
|
||||
const direct = await sendSubagentAnnounceDirectly({
|
||||
targetRequesterSessionKey: params.targetRequesterSessionKey,
|
||||
triggerMessage: params.triggerMessage,
|
||||
completionMessage: params.completionMessage,
|
||||
directIdempotencyKey: params.directIdempotencyKey,
|
||||
completionDirectOrigin: params.completionDirectOrigin,
|
||||
directOrigin: params.directOrigin,
|
||||
requesterIsSubagent: params.requesterIsSubagent,
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
});
|
||||
if (direct.delivered || !params.expectsCompletionMessage) {
|
||||
return direct;
|
||||
}
|
||||
|
||||
// If completion path failed direct delivery, try queueing as a fallback so the
|
||||
// report can still be delivered once the requester session is idle.
|
||||
const queueOutcome = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
announceId: params.announceId,
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
});
|
||||
if (queueOutcome === "steered" || queueOutcome === "queued") {
|
||||
return queueOutcomeToDeliveryResult(queueOutcome);
|
||||
}
|
||||
|
||||
return direct;
|
||||
}
|
||||
|
||||
function loadSessionEntryByKey(sessionKey: string) {
|
||||
const cfg = loadConfig();
|
||||
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||
@@ -592,6 +253,65 @@ function loadSessionEntryByKey(sessionKey: string) {
|
||||
return store[sessionKey];
|
||||
}
|
||||
|
||||
async function readLatestAssistantReplyWithRetry(params: {
|
||||
sessionKey: string;
|
||||
initialReply?: string;
|
||||
maxWaitMs: number;
|
||||
}): Promise<string | undefined> {
|
||||
const RETRY_INTERVAL_MS = 100;
|
||||
let reply = params.initialReply?.trim() ? params.initialReply : undefined;
|
||||
if (reply) {
|
||||
return reply;
|
||||
}
|
||||
|
||||
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
|
||||
while (Date.now() < deadline) {
|
||||
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
|
||||
const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey });
|
||||
if (latest?.trim()) {
|
||||
return latest;
|
||||
}
|
||||
}
|
||||
return reply;
|
||||
}
|
||||
|
||||
function isLikelyWaitingForDescendantResult(reply?: string): boolean {
|
||||
const text = reply?.trim();
|
||||
if (!text) {
|
||||
return false;
|
||||
}
|
||||
const normalized = text.toLowerCase();
|
||||
if (!normalized.includes("waiting")) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
normalized.includes("subagent") ||
|
||||
normalized.includes("child") ||
|
||||
normalized.includes("auto-announce") ||
|
||||
normalized.includes("auto announced") ||
|
||||
normalized.includes("result")
|
||||
);
|
||||
}
|
||||
|
||||
async function waitForAssistantReplyChange(params: {
|
||||
sessionKey: string;
|
||||
previousReply?: string;
|
||||
maxWaitMs: number;
|
||||
}): Promise<string | undefined> {
|
||||
const RETRY_INTERVAL_MS = 200;
|
||||
const previous = params.previousReply?.trim() ?? "";
|
||||
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 30_000));
|
||||
while (Date.now() < deadline) {
|
||||
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
|
||||
const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey });
|
||||
const normalizedLatest = latest?.trim() ?? "";
|
||||
if (normalizedLatest && normalizedLatest !== previous) {
|
||||
return latest;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export function buildSubagentSystemPrompt(params: {
|
||||
requesterSessionKey?: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
@@ -608,7 +328,10 @@ export function buildSubagentSystemPrompt(params: {
|
||||
? params.task.replace(/\s+/g, " ").trim()
|
||||
: "{{TASK_DESCRIPTION}}";
|
||||
const childDepth = typeof params.childDepth === "number" ? params.childDepth : 1;
|
||||
const maxSpawnDepth = typeof params.maxSpawnDepth === "number" ? params.maxSpawnDepth : 1;
|
||||
const maxSpawnDepth =
|
||||
typeof params.maxSpawnDepth === "number"
|
||||
? params.maxSpawnDepth
|
||||
: DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
|
||||
const canSpawn = childDepth < maxSpawnDepth;
|
||||
const parentLabel = childDepth >= 2 ? "parent orchestrator" : "main agent";
|
||||
|
||||
@@ -692,11 +415,7 @@ function buildAnnounceReplyInstruction(params: {
|
||||
remainingActiveSubagentRuns: number;
|
||||
requesterIsSubagent: boolean;
|
||||
announceType: SubagentAnnounceType;
|
||||
expectsCompletionMessage?: boolean;
|
||||
}): string {
|
||||
if (params.expectsCompletionMessage) {
|
||||
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
|
||||
}
|
||||
if (params.remainingActiveSubagentRuns > 0) {
|
||||
const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs";
|
||||
return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`;
|
||||
@@ -723,10 +442,8 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
label?: string;
|
||||
outcome?: SubagentRunOutcome;
|
||||
announceType?: SubagentAnnounceType;
|
||||
expectsCompletionMessage?: boolean;
|
||||
}): Promise<boolean> {
|
||||
let didAnnounce = false;
|
||||
const expectsCompletionMessage = params.expectsCompletionMessage === true;
|
||||
let shouldDeleteChildSession = params.cleanup === "delete";
|
||||
try {
|
||||
let targetRequesterSessionKey = params.requesterSessionKey;
|
||||
@@ -742,7 +459,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
let outcome: SubagentRunOutcome | undefined = params.outcome;
|
||||
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
|
||||
// subagent is still active, wait for the embedded run to fully settle.
|
||||
if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
|
||||
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
|
||||
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
|
||||
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
|
||||
// The child run is still active (e.g., compaction retry still in progress).
|
||||
@@ -787,26 +504,22 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
outcome = { status: "timeout" };
|
||||
}
|
||||
}
|
||||
reply = await readLatestSubagentOutput(params.childSessionKey);
|
||||
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
|
||||
}
|
||||
|
||||
if (!reply) {
|
||||
reply = await readLatestSubagentOutput(params.childSessionKey);
|
||||
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
|
||||
}
|
||||
|
||||
if (!reply?.trim()) {
|
||||
reply = await readLatestSubagentOutputWithRetry({
|
||||
reply = await readLatestAssistantReplyWithRetry({
|
||||
sessionKey: params.childSessionKey,
|
||||
initialReply: reply,
|
||||
maxWaitMs: params.timeoutMs,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
!expectsCompletionMessage &&
|
||||
!reply?.trim() &&
|
||||
childSessionId &&
|
||||
isEmbeddedPiRunActive(childSessionId)
|
||||
) {
|
||||
if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
|
||||
// Avoid announcing "(no output)" while the child run is still producing output.
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
@@ -823,12 +536,46 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
} catch {
|
||||
// Best-effort only; fall back to direct announce behavior when unavailable.
|
||||
}
|
||||
if (!expectsCompletionMessage && activeChildDescendantRuns > 0) {
|
||||
if (activeChildDescendantRuns > 0) {
|
||||
// The finished run still has active descendant subagents. Defer announcing
|
||||
// this run until descendants settle so we avoid posting in-progress updates.
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
}
|
||||
// If the subagent reply is still a "waiting for nested result" placeholder,
|
||||
// hold this announce and wait for the follow-up turn that synthesizes child output.
|
||||
let hasAnyChildDescendantRuns = false;
|
||||
try {
|
||||
const { listDescendantRunsForRequester } = await import("./subagent-registry.js");
|
||||
hasAnyChildDescendantRuns = listDescendantRunsForRequester(params.childSessionKey).length > 0;
|
||||
} catch {
|
||||
// Best-effort only; fall back to existing behavior when unavailable.
|
||||
}
|
||||
if (hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply)) {
|
||||
const followupReply = await waitForAssistantReplyChange({
|
||||
sessionKey: params.childSessionKey,
|
||||
previousReply: reply,
|
||||
maxWaitMs: settleTimeoutMs,
|
||||
});
|
||||
if (!followupReply?.trim()) {
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
}
|
||||
reply = followupReply;
|
||||
try {
|
||||
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
|
||||
activeChildDescendantRuns = Math.max(0, countActiveDescendantRuns(params.childSessionKey));
|
||||
} catch {
|
||||
activeChildDescendantRuns = 0;
|
||||
}
|
||||
if (
|
||||
activeChildDescendantRuns > 0 ||
|
||||
(hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply))
|
||||
) {
|
||||
shouldDeleteChildSession = false;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Build status label
|
||||
const statusLabel =
|
||||
@@ -843,14 +590,12 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
// Build instructional message for main agent
|
||||
const announceType = params.announceType ?? "subagent task";
|
||||
const taskLabel = params.label || params.task || "task";
|
||||
const subagentName = resolveAgentIdFromSessionKey(params.childSessionKey);
|
||||
const announceSessionId = childSessionId || "unknown";
|
||||
const findings = reply || "(no output)";
|
||||
let completionMessage = "";
|
||||
let triggerMessage = "";
|
||||
|
||||
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
|
||||
let requesterIsSubagent = !expectsCompletionMessage && requesterDepth >= 1;
|
||||
let requesterIsSubagent = requesterDepth >= 1;
|
||||
// If the requester subagent has already finished, bubble the announce to its
|
||||
// requester (typically main) so descendant completion is not silently lost.
|
||||
// BUT: only fallback if the parent SESSION is deleted, not just if the current
|
||||
@@ -903,31 +648,43 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
remainingActiveSubagentRuns,
|
||||
requesterIsSubagent,
|
||||
announceType,
|
||||
expectsCompletionMessage,
|
||||
});
|
||||
const statsLine = await buildCompactAnnounceStatsLine({
|
||||
sessionKey: params.childSessionKey,
|
||||
startedAt: params.startedAt,
|
||||
endedAt: params.endedAt,
|
||||
});
|
||||
completionMessage = buildCompletionDeliveryMessage({
|
||||
findings,
|
||||
subagentName,
|
||||
});
|
||||
const internalSummaryMessage = [
|
||||
triggerMessage = [
|
||||
`[System Message] [sessionId: ${announceSessionId}] A ${announceType} "${taskLabel}" just ${statusLabel}.`,
|
||||
"",
|
||||
"Result:",
|
||||
findings,
|
||||
"",
|
||||
statsLine,
|
||||
"",
|
||||
replyInstruction,
|
||||
].join("\n");
|
||||
triggerMessage = [internalSummaryMessage, "", replyInstruction].join("\n");
|
||||
|
||||
const announceId = buildAnnounceIdFromChildRun({
|
||||
childSessionKey: params.childSessionKey,
|
||||
childRunId: params.childRunId,
|
||||
});
|
||||
const queued = await maybeQueueSubagentAnnounce({
|
||||
requesterSessionKey: targetRequesterSessionKey,
|
||||
announceId,
|
||||
triggerMessage,
|
||||
summaryLine: taskLabel,
|
||||
requesterOrigin: targetRequesterOrigin,
|
||||
});
|
||||
if (queued === "steered") {
|
||||
didAnnounce = true;
|
||||
return true;
|
||||
}
|
||||
if (queued === "queued") {
|
||||
didAnnounce = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
// Send to the requester session. For nested subagents this is an internal
|
||||
// follow-up injection (deliver=false) so the orchestrator receives it.
|
||||
let directOrigin = targetRequesterOrigin;
|
||||
@@ -939,26 +696,26 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
// catches duplicates if this announce is also queued by the gateway-
|
||||
// level message queue while the main session is busy (#17122).
|
||||
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
|
||||
const delivery = await deliverSubagentAnnouncement({
|
||||
requesterSessionKey: targetRequesterSessionKey,
|
||||
announceId,
|
||||
triggerMessage,
|
||||
completionMessage,
|
||||
summaryLine: taskLabel,
|
||||
requesterOrigin: targetRequesterOrigin,
|
||||
completionDirectOrigin: targetRequesterOrigin,
|
||||
directOrigin,
|
||||
targetRequesterSessionKey,
|
||||
requesterIsSubagent,
|
||||
expectsCompletionMessage: expectsCompletionMessage,
|
||||
directIdempotencyKey,
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
sessionKey: targetRequesterSessionKey,
|
||||
message: triggerMessage,
|
||||
deliver: !requesterIsSubagent,
|
||||
channel: requesterIsSubagent ? undefined : directOrigin?.channel,
|
||||
accountId: requesterIsSubagent ? undefined : directOrigin?.accountId,
|
||||
to: requesterIsSubagent ? undefined : directOrigin?.to,
|
||||
threadId:
|
||||
!requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== ""
|
||||
? String(directOrigin.threadId)
|
||||
: undefined,
|
||||
idempotencyKey: directIdempotencyKey,
|
||||
},
|
||||
expectFinal: true,
|
||||
timeoutMs: 15_000,
|
||||
});
|
||||
didAnnounce = delivery.delivered;
|
||||
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
|
||||
defaultRuntime.error?.(
|
||||
`Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`,
|
||||
);
|
||||
}
|
||||
|
||||
didAnnounce = true;
|
||||
} catch (err) {
|
||||
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
|
||||
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
|
||||
|
||||
Reference in New Issue
Block a user