mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 20:54:33 +00:00
fix(subagents): announce delivery with descendant gating, frozen result refresh, and cron retry (#35080)
Thanks @tyler6204
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { promises as fs } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
@@ -12,7 +13,11 @@ import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
||||
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
|
||||
import {
|
||||
captureSubagentCompletionReply,
|
||||
runSubagentAnnounceFlow,
|
||||
type SubagentRunOutcome,
|
||||
} from "./subagent-announce.js";
|
||||
import {
|
||||
SUBAGENT_ENDED_OUTCOME_KILLED,
|
||||
SUBAGENT_ENDED_REASON_COMPLETE,
|
||||
@@ -38,6 +43,7 @@ import {
|
||||
listDescendantRunsForRequesterFromRuns,
|
||||
listRunsForRequesterFromRuns,
|
||||
resolveRequesterForChildSessionFromRuns,
|
||||
shouldIgnorePostCompletionAnnounceForSessionFromRuns,
|
||||
} from "./subagent-registry-queries.js";
|
||||
import {
|
||||
getSubagentRunsSnapshotForRead,
|
||||
@@ -81,6 +87,25 @@ type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
||||
* subsequent lifecycle `start` / `end` can cancel premature failure announces.
|
||||
*/
|
||||
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
/** Upper bound, in UTF-8 bytes, for the frozen completion text produced by capFrozenResultText. */
const FROZEN_RESULT_TEXT_MAX_BYTES = 100 * 1024;

/**
 * Trims `resultText` and caps it at FROZEN_RESULT_TEXT_MAX_BYTES UTF-8 bytes.
 *
 * @param resultText Raw completion text.
 * @returns `""` when the text is empty after trimming; the trimmed text when it
 *   already fits; otherwise a truncated payload followed by a notice stating
 *   the limit and the original size in KB. The payload plus notice never
 *   exceeds the limit.
 */
function capFrozenResultText(resultText: string): string {
  const trimmed = resultText.trim();
  if (!trimmed) {
    return "";
  }
  const totalBytes = Buffer.byteLength(trimmed, "utf8");
  if (totalBytes <= FROZEN_RESULT_TEXT_MAX_BYTES) {
    return trimmed;
  }
  const notice = `\n\n[truncated: frozen completion output exceeded ${Math.round(FROZEN_RESULT_TEXT_MAX_BYTES / 1024)}KB (${Math.round(totalBytes / 1024)}KB)]`;
  // Reserve room for the notice so payload + notice stays within the limit.
  const maxPayloadBytes = Math.max(
    0,
    FROZEN_RESULT_TEXT_MAX_BYTES - Buffer.byteLength(notice, "utf8"),
  );
  const encoded = Buffer.from(trimmed, "utf8");
  // A raw byte cut can land inside a multi-byte UTF-8 sequence; decoding that
  // would emit U+FFFD (3 bytes), corrupting the tail and overshooting the cap.
  // Back up while the first excluded byte is a continuation byte (0b10xxxxxx)
  // so the cut always falls on a code-point boundary.
  let cut = Math.min(maxPayloadBytes, encoded.length);
  while (cut > 0 && ((encoded[cut] ?? 0) & 0xc0) === 0x80) {
    cut -= 1;
  }
  const payload = encoded.subarray(0, cut).toString("utf8");
  return `${payload}${notice}`;
}
|
||||
|
||||
function resolveAnnounceRetryDelayMs(retryCount: number) {
|
||||
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
||||
@@ -322,6 +347,78 @@ async function emitSubagentEndedHookForRun(params: {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Captures the child session's completion reply once and stores it on the run
 * record.
 *
 * Does nothing and returns `false` when `entry.frozenResultText` is already
 * set (anything other than `undefined`). Otherwise stores the capped reply
 * text, or `null` when the reply is blank or the capture throws, stamps
 * `frozenResultCapturedAt`, and returns `true` so the caller knows the record
 * was mutated.
 */
async function freezeRunResultAtCompletion(entry: SubagentRunRecord): Promise<boolean> {
  const alreadyFrozen = entry.frozenResultText !== undefined;
  if (alreadyFrozen) {
    return false;
  }

  let frozenText: string | null = null;
  try {
    const reply = await captureSubagentCompletionReply(entry.childSessionKey);
    if (reply?.trim()) {
      frozenText = capFrozenResultText(reply);
    }
  } catch {
    // Best-effort capture: a failure is recorded as "no result" (null).
    frozenText = null;
  }

  entry.frozenResultText = frozenText;
  entry.frozenResultCapturedAt = Date.now();
  return true;
}
|
||||
|
||||
/**
 * Returns the registered runs for the given child session that expect a
 * completion message, have a numeric `endedAt`, and do not yet have a numeric
 * `cleanupCompletedAt`. A blank session key yields an empty list.
 */
function listPendingCompletionRunsForSession(sessionKey: string): SubagentRunRecord[] {
  const normalizedKey = sessionKey.trim();
  if (normalizedKey.length === 0) {
    return [];
  }
  return [...subagentRuns.values()].filter(
    (run) =>
      run.childSessionKey === normalizedKey &&
      run.expectsCompletionMessage === true &&
      typeof run.endedAt === "number" &&
      typeof run.cleanupCompletedAt !== "number",
  );
}
|
||||
|
||||
/**
 * Re-captures the completion reply for `sessionKey` and writes it onto every
 * pending-completion run of that session whose frozen text differs.
 *
 * Returns `false` without touching anything when there are no pending runs,
 * when the capture throws, or when the reply is blank or a silent-reply token.
 * Persists the run registry and returns `true` only if at least one record
 * was updated.
 */
async function refreshFrozenResultFromSession(sessionKey: string): Promise<boolean> {
  const pendingRuns = listPendingCompletionRunsForSession(sessionKey);
  if (pendingRuns.length === 0) {
    return false;
  }

  let replyText: string;
  try {
    const reply = await captureSubagentCompletionReply(sessionKey);
    replyText = reply?.trim() ?? "";
  } catch {
    return false;
  }
  if (replyText === "" || isSilentReplyText(replyText, SILENT_REPLY_TOKEN)) {
    return false;
  }

  const refreshedText = capFrozenResultText(replyText);
  const refreshedAt = Date.now();
  // Only records whose stored text differs are rewritten.
  const staleRuns = pendingRuns.filter((run) => run.frozenResultText !== refreshedText);
  for (const run of staleRuns) {
    run.frozenResultText = refreshedText;
    run.frozenResultCapturedAt = refreshedAt;
  }

  const didUpdate = staleRuns.length > 0;
  if (didUpdate) {
    persistSubagentRuns();
  }
  return didUpdate;
}
|
||||
|
||||
async function completeSubagentRun(params: {
|
||||
runId: string;
|
||||
endedAt?: number;
|
||||
@@ -365,6 +462,10 @@ async function completeSubagentRun(params: {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (await freezeRunResultAtCompletion(entry)) {
|
||||
mutated = true;
|
||||
}
|
||||
|
||||
if (mutated) {
|
||||
persistSubagentRuns();
|
||||
}
|
||||
@@ -413,6 +514,8 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
task: entry.task,
|
||||
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
|
||||
cleanup: entry.cleanup,
|
||||
roundOneReply: entry.frozenResultText ?? undefined,
|
||||
fallbackReply: entry.fallbackFrozenResultText ?? undefined,
|
||||
waitForCompletion: false,
|
||||
startedAt: entry.startedAt,
|
||||
endedAt: entry.endedAt,
|
||||
@@ -420,6 +523,7 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
outcome: entry.outcome,
|
||||
spawnMode: entry.spawnMode,
|
||||
expectsCompletionMessage: entry.expectsCompletionMessage,
|
||||
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
|
||||
})
|
||||
.then((didAnnounce) => {
|
||||
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
|
||||
@@ -622,11 +726,14 @@ function ensureListener() {
|
||||
if (!evt || evt.stream !== "lifecycle") {
|
||||
return;
|
||||
}
|
||||
const phase = evt.data?.phase;
|
||||
const entry = subagentRuns.get(evt.runId);
|
||||
if (!entry) {
|
||||
if (phase === "end" && typeof evt.sessionKey === "string") {
|
||||
await refreshFrozenResultFromSession(evt.sessionKey);
|
||||
}
|
||||
return;
|
||||
}
|
||||
const phase = evt.data?.phase;
|
||||
if (phase === "start") {
|
||||
clearPendingLifecycleError(evt.runId);
|
||||
const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
|
||||
@@ -714,6 +821,9 @@ async function finalizeSubagentCleanup(
|
||||
return;
|
||||
}
|
||||
if (didAnnounce) {
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
entry.fallbackFrozenResultText = undefined;
|
||||
entry.fallbackFrozenResultCapturedAt = undefined;
|
||||
const completionReason = resolveCleanupCompletionReason(entry);
|
||||
await emitCompletionEndedHookIfNeeded(entry, completionReason);
|
||||
// Clean up attachments before the run record is removed.
|
||||
@@ -721,6 +831,10 @@ async function finalizeSubagentCleanup(
|
||||
if (shouldDeleteAttachments) {
|
||||
await safeRemoveAttachmentsDir(entry);
|
||||
}
|
||||
if (cleanup === "delete") {
|
||||
entry.frozenResultText = undefined;
|
||||
entry.frozenResultCapturedAt = undefined;
|
||||
}
|
||||
completeCleanupBookkeeping({
|
||||
runId,
|
||||
entry,
|
||||
@@ -745,6 +859,7 @@ async function finalizeSubagentCleanup(
|
||||
|
||||
if (deferredDecision.kind === "defer-descendants") {
|
||||
entry.lastAnnounceRetryAt = now;
|
||||
entry.wakeOnDescendantSettle = true;
|
||||
entry.cleanupHandled = false;
|
||||
resumedRuns.delete(runId);
|
||||
persistSubagentRuns();
|
||||
@@ -760,6 +875,9 @@ async function finalizeSubagentCleanup(
|
||||
}
|
||||
|
||||
if (deferredDecision.kind === "give-up") {
|
||||
entry.wakeOnDescendantSettle = undefined;
|
||||
entry.fallbackFrozenResultText = undefined;
|
||||
entry.fallbackFrozenResultCapturedAt = undefined;
|
||||
const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep;
|
||||
if (shouldDeleteAttachments) {
|
||||
await safeRemoveAttachmentsDir(entry);
|
||||
@@ -918,6 +1036,7 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
nextRunId: string;
|
||||
fallback?: SubagentRunRecord;
|
||||
runTimeoutSeconds?: number;
|
||||
preserveFrozenResultFallback?: boolean;
|
||||
}) {
|
||||
const previousRunId = params.previousRunId.trim();
|
||||
const nextRunId = params.nextRunId.trim();
|
||||
@@ -945,6 +1064,7 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
spawnMode === "session" ? undefined : archiveAfterMs ? now + archiveAfterMs : undefined;
|
||||
const runTimeoutSeconds = params.runTimeoutSeconds ?? source.runTimeoutSeconds ?? 0;
|
||||
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
|
||||
const preserveFrozenResultFallback = params.preserveFrozenResultFallback === true;
|
||||
|
||||
const next: SubagentRunRecord = {
|
||||
...source,
|
||||
@@ -953,7 +1073,14 @@ export function replaceSubagentRunAfterSteer(params: {
|
||||
endedAt: undefined,
|
||||
endedReason: undefined,
|
||||
endedHookEmittedAt: undefined,
|
||||
wakeOnDescendantSettle: undefined,
|
||||
outcome: undefined,
|
||||
frozenResultText: undefined,
|
||||
frozenResultCapturedAt: undefined,
|
||||
fallbackFrozenResultText: preserveFrozenResultFallback ? source.frozenResultText : undefined,
|
||||
fallbackFrozenResultCapturedAt: preserveFrozenResultFallback
|
||||
? source.frozenResultCapturedAt
|
||||
: undefined,
|
||||
cleanupCompletedAt: undefined,
|
||||
cleanupHandled: false,
|
||||
suppressAnnounceReason: undefined,
|
||||
@@ -1017,6 +1144,7 @@ export function registerSubagentRun(params: {
|
||||
startedAt: now,
|
||||
archiveAtMs,
|
||||
cleanupHandled: false,
|
||||
wakeOnDescendantSettle: undefined,
|
||||
attachmentsDir: params.attachmentsDir,
|
||||
attachmentsRootDir: params.attachmentsRootDir,
|
||||
retainAttachmentsOnKeep: params.retainAttachmentsOnKeep,
|
||||
@@ -1164,6 +1292,13 @@ export function isSubagentSessionRunActive(childSessionKey: string): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
 * Evaluates `shouldIgnorePostCompletionAnnounceForSessionFromRuns` for
 * `childSessionKey` against a read snapshot of the run registry.
 */
export function shouldIgnorePostCompletionAnnounceForSession(childSessionKey: string): boolean {
  const runsSnapshot = getSubagentRunsSnapshotForRead(subagentRuns);
  return shouldIgnorePostCompletionAnnounceForSessionFromRuns(runsSnapshot, childSessionKey);
}
|
||||
|
||||
export function markSubagentRunTerminated(params: {
|
||||
runId?: string;
|
||||
childSessionKey?: string;
|
||||
@@ -1225,8 +1360,11 @@ export function markSubagentRunTerminated(params: {
|
||||
return updated;
|
||||
}
|
||||
|
||||
export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] {
|
||||
return listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey);
|
||||
/**
 * Lists the registered runs for `requesterSessionKey` by delegating to
 * `listRunsForRequesterFromRuns` on the run registry.
 *
 * @param requesterSessionKey Session key of the requester.
 * @param options Optional filter, forwarded unchanged to the query helper.
 */
export function listSubagentRunsForRequester(
  requesterSessionKey: string,
  options?: { requesterRunId?: string },
): SubagentRunRecord[] {
  const matchingRuns = listRunsForRequesterFromRuns(subagentRuns, requesterSessionKey, options);
  return matchingRuns;
}
|
||||
|
||||
export function countActiveRunsForSession(requesterSessionKey: string): number {
|
||||
|
||||
Reference in New Issue
Block a user