diff --git a/src/agents/agent-scope.ts b/src/agents/agent-scope.ts index 1af6926784c..178bd1ec7e4 100644 --- a/src/agents/agent-scope.ts +++ b/src/agents/agent-scope.ts @@ -7,6 +7,7 @@ import { parseAgentSessionKey, } from "../routing/session-key.js"; import { resolveUserPath } from "../utils.js"; +import { normalizeSkillFilter } from "./skills/filter.js"; import { resolveDefaultAgentWorkspaceDir } from "./workspace.js"; export { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; @@ -128,12 +129,7 @@ export function resolveAgentSkillsFilter( cfg: OpenClawConfig, agentId: string, ): string[] | undefined { - const raw = resolveAgentConfig(cfg, agentId)?.skills; - if (!raw) { - return undefined; - } - const normalized = raw.map((entry) => String(entry).trim()).filter(Boolean); - return normalized.length > 0 ? normalized : []; + return normalizeSkillFilter(resolveAgentConfig(cfg, agentId)?.skills); } export function resolveAgentModelPrimary(cfg: OpenClawConfig, agentId: string): string | undefined { diff --git a/src/agents/skills/filter.test.ts b/src/agents/skills/filter.test.ts new file mode 100644 index 00000000000..8cd64e429e3 --- /dev/null +++ b/src/agents/skills/filter.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it } from "vitest"; +import { + matchesSkillFilter, + normalizeSkillFilter, + normalizeSkillFilterForComparison, +} from "./filter.js"; + +describe("skills/filter", () => { + it("normalizes configured filters with trimming", () => { + expect(normalizeSkillFilter([" weather ", "", "meme-factory"])).toEqual([ + "weather", + "meme-factory", + ]); + }); + + it("preserves explicit empty list as []", () => { + expect(normalizeSkillFilter([])).toEqual([]); + expect(normalizeSkillFilter(undefined)).toBeUndefined(); + }); + + it("normalizes for comparison with dedupe + ordering", () => { + expect(normalizeSkillFilterForComparison(["weather", "meme-factory", "weather"])).toEqual([ + "meme-factory", + "weather", + ]); + }); + + it("matches equivalent filters after normalization", () => { + expect(matchesSkillFilter(["weather", "meme-factory"], [" meme-factory ", "weather"])).toBe( + true, + ); + expect(matchesSkillFilter(undefined, undefined)).toBe(true); + expect(matchesSkillFilter([], undefined)).toBe(false); + }); +}); diff --git a/src/agents/skills/filter.ts b/src/agents/skills/filter.ts new file mode 100644 index 00000000000..a5fb8222874 --- /dev/null +++ b/src/agents/skills/filter.ts @@ -0,0 +1,31 @@ +export function normalizeSkillFilter(skillFilter?: ReadonlyArray): string[] | undefined { + if (skillFilter === undefined) { + return undefined; + } + return skillFilter.map((entry) => String(entry).trim()).filter(Boolean); +} + +export function normalizeSkillFilterForComparison( + skillFilter?: ReadonlyArray, +): string[] | undefined { + const normalized = normalizeSkillFilter(skillFilter); + if (normalized === undefined) { + return undefined; + } + return Array.from(new Set(normalized)).toSorted(); +} + +export function matchesSkillFilter( + cached?: ReadonlyArray, + next?: ReadonlyArray, +): boolean { + const cachedNormalized = normalizeSkillFilterForComparison(cached); + const nextNormalized = normalizeSkillFilterForComparison(next); + if (cachedNormalized === undefined || nextNormalized === undefined) { + return cachedNormalized === nextNormalized; + } + if (cachedNormalized.length !== nextNormalized.length) { + return false; + } + return cachedNormalized.every((entry, index) => entry === nextNormalized[index]); +} diff --git a/src/agents/skills/types.ts b/src/agents/skills/types.ts index b518d4bb601..abfb8743dd7 100644 --- a/src/agents/skills/types.ts +++ b/src/agents/skills/types.ts @@ -82,6 +82,8 @@ export type SkillEligibilityContext = { export type SkillSnapshot = { prompt: string; skills: Array<{ name: string; primaryEnv?: string }>; + /** Normalized agent-level filter used to build this snapshot; undefined means unrestricted. */ + skillFilter?: string[]; resolvedSkills?: Skill[]; version?: number; }; diff --git a/src/agents/skills/workspace.ts b/src/agents/skills/workspace.ts index ee666eacaab..51b0c2bbd1d 100644 --- a/src/agents/skills/workspace.ts +++ b/src/agents/skills/workspace.ts @@ -19,6 +19,7 @@ import { CONFIG_DIR, resolveUserPath } from "../../utils.js"; import { resolveSandboxPath } from "../sandbox-paths.js"; import { resolveBundledSkillsDir } from "./bundled-dir.js"; import { shouldIncludeSkill } from "./config.js"; +import { normalizeSkillFilter } from "./filter.js"; import { parseFrontmatter, resolveOpenClawMetadata, @@ -52,14 +53,16 @@ function filterSkillEntries( let filtered = entries.filter((entry) => shouldIncludeSkill({ entry, config, eligibility })); // If skillFilter is provided, only include skills in the filter list. if (skillFilter !== undefined) { - const normalized = skillFilter.map((entry) => String(entry).trim()).filter(Boolean); + const normalized = normalizeSkillFilter(skillFilter) ?? []; const label = normalized.length > 0 ? normalized.join(", ") : "(none)"; - console.log(`[skills] Applying skill filter: ${label}`); + skillsLogger.debug(`Applying skill filter: ${label}`); filtered = normalized.length > 0 ? filtered.filter((entry) => normalized.includes(entry.skill.name)) : []; - console.log(`[skills] After filter: ${filtered.map((entry) => entry.skill.name).join(", ")}`); + skillsLogger.debug( + `After skill filter: ${filtered.map((entry) => entry.skill.name).join(", ") || "(none)"}`, + ); } return filtered; } @@ -232,12 +235,14 @@ export function buildWorkspaceSkillSnapshot( const resolvedSkills = promptEntries.map((entry) => entry.skill); const remoteNote = opts?.eligibility?.remote?.note?.trim(); const prompt = [remoteNote, formatSkillsForPrompt(resolvedSkills)].filter(Boolean).join("\n"); + const skillFilter = normalizeSkillFilter(opts?.skillFilter); return { prompt, skills: eligible.map((entry) => ({ name: entry.skill.name, primaryEnv: entry.metadata?.primaryEnv, })), + ...(skillFilter === undefined ? {} : { skillFilter }), resolvedSkills, version: opts?.snapshotVersion, }; diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 1d0012a749f..012d59f728d 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -144,6 +144,8 @@ export type GroupKeyResolution = { export type SessionSkillSnapshot = { prompt: string; skills: Array<{ name: string; primaryEnv?: string }>; + /** Normalized agent-level filter used to build this snapshot; undefined means unrestricted. */ + skillFilter?: string[]; resolvedSkills?: Skill[]; version?: number; }; diff --git a/src/cron/isolated-agent/run.skill-filter.test.ts b/src/cron/isolated-agent/run.skill-filter.test.ts index d4d776b7116..9c0bd4300a8 100644 --- a/src/cron/isolated-agent/run.skill-filter.test.ts +++ b/src/cron/isolated-agent/run.skill-filter.test.ts @@ -194,7 +194,6 @@ function makeParams(overrides?: Record) { describe("runCronIsolatedAgentTurn — skill filter", () => { let previousFastTestEnv: string | undefined; - beforeEach(() => { vi.clearAllMocks(); previousFastTestEnv = process.env.OPENCLAW_TEST_FAST; @@ -283,4 +282,72 @@ describe("runCronIsolatedAgentTurn — skill filter", () => { // Explicit empty skills list should forward [] to filter out all skills expect(buildWorkspaceSkillSnapshotMock.mock.calls[0][1]).toHaveProperty("skillFilter", []); }); + + it("refreshes cached snapshot when skillFilter changes without version bump", async () => { + resolveAgentSkillsFilterMock.mockReturnValue(["weather"]); + resolveCronSessionMock.mockReturnValue({ + storePath: "/tmp/store.json", + store: {}, + sessionEntry: { + sessionId: "test-session-id", + updatedAt: 0, + systemSent: false, + skillsSnapshot: { + prompt: "meme-factory", + skills: [{ name: "meme-factory" }], + version: 42, + }, + }, + systemSent: false, + isNewSession: true, + }); + + const { runCronIsolatedAgentTurn } = await import("./run.js"); + + const result = await runCronIsolatedAgentTurn( + makeParams({ + cfg: { agents: { list: [{ id: "weather-bot", skills: ["weather"] }] } }, + agentId: "weather-bot", + }), + ); + + expect(result.status).toBe("ok"); + expect(buildWorkspaceSkillSnapshotMock).toHaveBeenCalledOnce(); + expect(buildWorkspaceSkillSnapshotMock.mock.calls[0][1]).toHaveProperty("skillFilter", [ + "weather", + ]); + }); + + it("reuses cached snapshot when version and normalized skillFilter are unchanged", async () => { + resolveAgentSkillsFilterMock.mockReturnValue([" weather ", "meme-factory", "weather"]); + resolveCronSessionMock.mockReturnValue({ + storePath: "/tmp/store.json", + store: {}, + sessionEntry: { + sessionId: "test-session-id", + updatedAt: 0, + systemSent: false, + skillsSnapshot: { + prompt: "weather", + skills: [{ name: "weather" }], + skillFilter: ["meme-factory", "weather"], + version: 42, + }, + }, + systemSent: false, + isNewSession: true, + }); + + const { runCronIsolatedAgentTurn } = await import("./run.js"); + + const result = await runCronIsolatedAgentTurn( + makeParams({ + cfg: { agents: { list: [{ id: "weather-bot", skills: ["weather", "meme-factory"] }] } }, + agentId: "weather-bot", + }), + ); + + expect(result.status).toBe("ok"); + expect(buildWorkspaceSkillSnapshotMock).not.toHaveBeenCalled(); + }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 7b3a7eb9566..8aba703dfbd 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -6,7 +6,6 @@ import { resolveAgentConfig, resolveAgentDir, resolveAgentModelFallbacksOverride, - resolveAgentSkillsFilter, resolveAgentWorkspaceDir, resolveDefaultAgentId, } from "../../agents/agent-scope.js"; @@ -26,15 +25,9 @@ import { resolveThinkingDefault, } from "../../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js"; -import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js"; -import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; -import { - countActiveDescendantRuns, - listDescendantRunsForRequester, -} from "../../agents/subagent-registry.js"; +import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { readLatestAssistantReply } from "../../agents/tools/agent-step.js"; import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js"; import { ensureAgentWorkspace } from "../../agents/workspace.js"; import { @@ -52,7 +45,6 @@ import { import { registerAgentRunContext } from "../../infra/agent-events.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; -import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; import { logWarn } from "../../logger.js"; import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js"; import { @@ -72,6 +64,13 @@ import { resolveHeartbeatAckMaxChars, } from "./helpers.js"; import { resolveCronSession } from "./session.js"; +import { resolveCronSkillsSnapshot } from "./skills-snapshot.js"; +import { + expectsSubagentFollowup, + isLikelyInterimCronMessage, + readDescendantSubagentFallbackReply, + waitForDescendantSubagentSummary, +} from "./subagent-followup.js"; function matchesMessagingToolDeliveryTarget( target: MessagingToolSend, @@ -101,152 +100,6 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean { return false; } -const CRON_SUBAGENT_WAIT_POLL_MS = 500; -const CRON_SUBAGENT_WAIT_MIN_MS = 30_000; -const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000; - -function isLikelyInterimCronMessage(value: string): boolean { - const text = value.trim(); - if (!text) { - return true; - } - const normalized = text.toLowerCase().replace(/\s+/g, " "); - const words = normalized.split(" ").filter(Boolean).length; - const interimHints = [ - "on it", - "pulling everything together", - "give me a few", - "give me a few min", - "few minutes", - "let me compile", - "i'll gather", - "i will gather", - "working on it", - "retrying now", - "should be about", - "should have your summary", - "subagent spawned", - "spawned a subagent", - "it'll auto-announce when done", - "it will auto-announce when done", - "auto-announce when done", - "both subagents are running", - "wait for them to report back", - ]; - return words <= 45 && interimHints.some((hint) => normalized.includes(hint)); -} - -function expectsSubagentFollowup(value: string): boolean { - const normalized = value.trim().toLowerCase().replace(/\s+/g, " "); - if (!normalized) { - return false; - } - const hints = [ - "subagent spawned", - "spawned a subagent", - "auto-announce when done", - "both subagents are running", - "wait for them to report back", - ]; - return hints.some((hint) => normalized.includes(hint)); -} - -async function readDescendantSubagentFallbackReply(params: { - sessionKey: string; - runStartedAt: number; -}): Promise { - const descendants = listDescendantRunsForRequester(params.sessionKey) - .filter( - (entry) => - typeof entry.endedAt === "number" && - entry.endedAt >= params.runStartedAt && - entry.childSessionKey.trim().length > 0, - ) - .toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0)); - if (descendants.length === 0) { - return undefined; - } - - const latestByChild = new Map(); - for (const entry of descendants) { - const childKey = entry.childSessionKey.trim(); - if (!childKey) { - continue; - } - const current = latestByChild.get(childKey); - if (!current || (entry.endedAt ?? 0) >= (current.endedAt ?? 0)) { - latestByChild.set(childKey, entry); - } - } - - const replies: string[] = []; - const latestRuns = [...latestByChild.values()] - .toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0)) - .slice(-4); - for (const entry of latestRuns) { - const reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim(); - if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { - continue; - } - replies.push(reply); - } - if (replies.length === 0) { - return undefined; - } - if (replies.length === 1) { - return replies[0]; - } - return replies.join("\n\n"); -} - -async function waitForDescendantSubagentSummary(params: { - sessionKey: string; - initialReply?: string; - timeoutMs: number; - observedActiveDescendants?: boolean; -}): Promise { - const initialReply = params.initialReply?.trim(); - const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs)); - let sawActiveDescendants = params.observedActiveDescendants === true; - let drainedAtMs: number | undefined; - while (Date.now() < deadline) { - const activeDescendants = countActiveDescendantRuns(params.sessionKey); - if (activeDescendants > 0) { - sawActiveDescendants = true; - drainedAtMs = undefined; - await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); - continue; - } - if (!sawActiveDescendants) { - return initialReply; - } - if (!drainedAtMs) { - drainedAtMs = Date.now(); - } - const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); - if ( - latest && - latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() && - (latest !== initialReply || !isLikelyInterimCronMessage(latest)) - ) { - return latest; - } - if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) { - return undefined; - } - await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); - } - const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); - if ( - latest && - latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() && - (latest !== initialReply || !isLikelyInterimCronMessage(latest)) - ) { - return latest; - } - return undefined; -} - export type RunCronAgentTurnResult = { status: "ok" | "error" | "skipped"; summary?: string; @@ -521,32 +374,21 @@ export async function runCronIsolatedAgentTurn(params: { `${commandBody}\n\nReturn your summary as plain text; it will be delivered automatically. If the task explicitly calls for messaging a specific external recipient, note who/where it should go instead of sending it yourself.`.trim(); } - let skillsSnapshot = cronSession.sessionEntry.skillsSnapshot; - if (isFastTestEnv) { - // Fast unit-test mode: avoid scanning the workspace and writing session stores. - skillsSnapshot = skillsSnapshot ?? { prompt: "", skills: [] }; - } else { - const existingSnapshot = cronSession.sessionEntry.skillsSnapshot; - const skillsSnapshotVersion = getSkillsSnapshotVersion(workspaceDir); - const needsSkillsSnapshot = - !existingSnapshot || existingSnapshot.version !== skillsSnapshotVersion; - const skillFilter = resolveAgentSkillsFilter(cfgWithAgentDefaults, agentId); - if (needsSkillsSnapshot) { - skillsSnapshot = buildWorkspaceSkillSnapshot(workspaceDir, { - config: cfgWithAgentDefaults, - skillFilter, - eligibility: { remote: getRemoteSkillEligibility() }, - snapshotVersion: skillsSnapshotVersion, - }); - if (skillsSnapshot) { - cronSession.sessionEntry = { - ...cronSession.sessionEntry, - updatedAt: Date.now(), - skillsSnapshot, - }; - await persistSessionEntry(); - } - } + const existingSkillsSnapshot = cronSession.sessionEntry.skillsSnapshot; + const skillsSnapshot = resolveCronSkillsSnapshot({ + workspaceDir, + config: cfgWithAgentDefaults, + agentId, + existingSnapshot: existingSkillsSnapshot, + isFastTestEnv, + }); + if (!isFastTestEnv && skillsSnapshot !== existingSkillsSnapshot) { + cronSession.sessionEntry = { + ...cronSession.sessionEntry, + updatedAt: Date.now(), + skillsSnapshot, + }; + await persistSessionEntry(); } // Persist systemSent before the run, mirroring the inbound auto-reply behavior. diff --git a/src/cron/isolated-agent/skills-snapshot.ts b/src/cron/isolated-agent/skills-snapshot.ts new file mode 100644 index 00000000000..2a491717a2c --- /dev/null +++ b/src/cron/isolated-agent/skills-snapshot.ts @@ -0,0 +1,37 @@ +import type { OpenClawConfig } from "../../config/config.js"; +import { resolveAgentSkillsFilter } from "../../agents/agent-scope.js"; +import { buildWorkspaceSkillSnapshot, type SkillSnapshot } from "../../agents/skills.js"; +import { matchesSkillFilter } from "../../agents/skills/filter.js"; +import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js"; +import { getRemoteSkillEligibility } from "../../infra/skills-remote.js"; + +export function resolveCronSkillsSnapshot(params: { + workspaceDir: string; + config: OpenClawConfig; + agentId: string; + existingSnapshot?: SkillSnapshot; + isFastTestEnv: boolean; +}): SkillSnapshot { + if (params.isFastTestEnv) { + // Fast unit-test mode skips filesystem scans and snapshot refresh writes. + return params.existingSnapshot ?? { prompt: "", skills: [] }; + } + + const snapshotVersion = getSkillsSnapshotVersion(params.workspaceDir); + const skillFilter = resolveAgentSkillsFilter(params.config, params.agentId); + const existingSnapshot = params.existingSnapshot; + const shouldRefresh = + !existingSnapshot || + existingSnapshot.version !== snapshotVersion || + !matchesSkillFilter(existingSnapshot.skillFilter, skillFilter); + if (!shouldRefresh) { + return existingSnapshot; + } + + return buildWorkspaceSkillSnapshot(params.workspaceDir, { + config: params.config, + skillFilter, + eligibility: { remote: getRemoteSkillEligibility() }, + snapshotVersion, + }); +} diff --git a/src/cron/isolated-agent/subagent-followup.ts b/src/cron/isolated-agent/subagent-followup.ts new file mode 100644 index 00000000000..9bf92925019 --- /dev/null +++ b/src/cron/isolated-agent/subagent-followup.ts @@ -0,0 +1,152 @@ +import { + countActiveDescendantRuns, + listDescendantRunsForRequester, +} from "../../agents/subagent-registry.js"; +import { readLatestAssistantReply } from "../../agents/tools/agent-step.js"; +import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; + +const CRON_SUBAGENT_WAIT_POLL_MS = 500; +const CRON_SUBAGENT_WAIT_MIN_MS = 30_000; +const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000; + +export function isLikelyInterimCronMessage(value: string): boolean { + const text = value.trim(); + if (!text) { + return true; + } + const normalized = text.toLowerCase().replace(/\s+/g, " "); + const words = normalized.split(" ").filter(Boolean).length; + const interimHints = [ + "on it", + "pulling everything together", + "give me a few", + "give me a few min", + "few minutes", + "let me compile", + "i'll gather", + "i will gather", + "working on it", + "retrying now", + "should be about", + "should have your summary", + "subagent spawned", + "spawned a subagent", + "it'll auto-announce when done", + "it will auto-announce when done", + "auto-announce when done", + "both subagents are running", + "wait for them to report back", + ]; + return words <= 45 && interimHints.some((hint) => normalized.includes(hint)); +} + +export function expectsSubagentFollowup(value: string): boolean { + const normalized = value.trim().toLowerCase().replace(/\s+/g, " "); + if (!normalized) { + return false; + } + const hints = [ + "subagent spawned", + "spawned a subagent", + "auto-announce when done", + "both subagents are running", + "wait for them to report back", + ]; + return hints.some((hint) => normalized.includes(hint)); +} + +export async function readDescendantSubagentFallbackReply(params: { + sessionKey: string; + runStartedAt: number; +}): Promise { + const descendants = listDescendantRunsForRequester(params.sessionKey) + .filter( + (entry) => + typeof entry.endedAt === "number" && + entry.endedAt >= params.runStartedAt && + entry.childSessionKey.trim().length > 0, + ) + .toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0)); + if (descendants.length === 0) { + return undefined; + } + + const latestByChild = new Map(); + for (const entry of descendants) { + const childKey = entry.childSessionKey.trim(); + if (!childKey) { + continue; + } + const current = latestByChild.get(childKey); + if (!current || (entry.endedAt ?? 0) >= (current.endedAt ?? 0)) { + latestByChild.set(childKey, entry); + } + } + + const replies: string[] = []; + const latestRuns = [...latestByChild.values()] + .toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0)) + .slice(-4); + for (const entry of latestRuns) { + const reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim(); + if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { + continue; + } + replies.push(reply); + } + if (replies.length === 0) { + return undefined; + } + if (replies.length === 1) { + return replies[0]; + } + return replies.join("\n\n"); +} + +export async function waitForDescendantSubagentSummary(params: { + sessionKey: string; + initialReply?: string; + timeoutMs: number; + observedActiveDescendants?: boolean; +}): Promise { + const initialReply = params.initialReply?.trim(); + const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs)); + let sawActiveDescendants = params.observedActiveDescendants === true; + let drainedAtMs: number | undefined; + while (Date.now() < deadline) { + const activeDescendants = countActiveDescendantRuns(params.sessionKey); + if (activeDescendants > 0) { + sawActiveDescendants = true; + drainedAtMs = undefined; + await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); + continue; + } + if (!sawActiveDescendants) { + return initialReply; + } + if (!drainedAtMs) { + drainedAtMs = Date.now(); + } + const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); + if ( + latest && + latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() && + (latest !== initialReply || !isLikelyInterimCronMessage(latest)) + ) { + return latest; + } + if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) { + return undefined; + } + await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); + } + const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); + if ( + latest && + latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() && + (latest !== initialReply || !isLikelyInterimCronMessage(latest)) + ) { + return latest; + } + return undefined; +}