fix(cron): normalize skill-filter snapshots and split isolated run helpers

This commit is contained in:
Peter Steinberger
2026-02-16 04:26:30 +01:00
parent 6754a926ee
commit aef1d55300
10 changed files with 360 additions and 191 deletions

View File

@@ -194,7 +194,6 @@ function makeParams(overrides?: Record<string, unknown>) {
describe("runCronIsolatedAgentTurn — skill filter", () => {
let previousFastTestEnv: string | undefined;
beforeEach(() => {
vi.clearAllMocks();
previousFastTestEnv = process.env.OPENCLAW_TEST_FAST;
@@ -283,4 +282,72 @@ describe("runCronIsolatedAgentTurn — skill filter", () => {
// Explicit empty skills list should forward [] to filter out all skills
expect(buildWorkspaceSkillSnapshotMock.mock.calls[0][1]).toHaveProperty("skillFilter", []);
});
it("refreshes cached snapshot when skillFilter changes without version bump", async () => {
resolveAgentSkillsFilterMock.mockReturnValue(["weather"]);
resolveCronSessionMock.mockReturnValue({
storePath: "/tmp/store.json",
store: {},
sessionEntry: {
sessionId: "test-session-id",
updatedAt: 0,
systemSent: false,
skillsSnapshot: {
prompt: "<available_skills><skill>meme-factory</skill></available_skills>",
skills: [{ name: "meme-factory" }],
version: 42,
},
},
systemSent: false,
isNewSession: true,
});
const { runCronIsolatedAgentTurn } = await import("./run.js");
const result = await runCronIsolatedAgentTurn(
makeParams({
cfg: { agents: { list: [{ id: "weather-bot", skills: ["weather"] }] } },
agentId: "weather-bot",
}),
);
expect(result.status).toBe("ok");
expect(buildWorkspaceSkillSnapshotMock).toHaveBeenCalledOnce();
expect(buildWorkspaceSkillSnapshotMock.mock.calls[0][1]).toHaveProperty("skillFilter", [
"weather",
]);
});
it("reuses cached snapshot when version and normalized skillFilter are unchanged", async () => {
resolveAgentSkillsFilterMock.mockReturnValue([" weather ", "meme-factory", "weather"]);
resolveCronSessionMock.mockReturnValue({
storePath: "/tmp/store.json",
store: {},
sessionEntry: {
sessionId: "test-session-id",
updatedAt: 0,
systemSent: false,
skillsSnapshot: {
prompt: "<available_skills><skill>weather</skill></available_skills>",
skills: [{ name: "weather" }],
skillFilter: ["meme-factory", "weather"],
version: 42,
},
},
systemSent: false,
isNewSession: true,
});
const { runCronIsolatedAgentTurn } = await import("./run.js");
const result = await runCronIsolatedAgentTurn(
makeParams({
cfg: { agents: { list: [{ id: "weather-bot", skills: ["weather", "meme-factory"] }] } },
agentId: "weather-bot",
}),
);
expect(result.status).toBe("ok");
expect(buildWorkspaceSkillSnapshotMock).not.toHaveBeenCalled();
});
});

View File

@@ -6,7 +6,6 @@ import {
resolveAgentConfig,
resolveAgentDir,
resolveAgentModelFallbacksOverride,
resolveAgentSkillsFilter,
resolveAgentWorkspaceDir,
resolveDefaultAgentId,
} from "../../agents/agent-scope.js";
@@ -26,15 +25,9 @@ import {
resolveThinkingDefault,
} from "../../agents/model-selection.js";
import { runEmbeddedPiAgent } from "../../agents/pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
import {
countActiveDescendantRuns,
listDescendantRunsForRequester,
} from "../../agents/subagent-registry.js";
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
import { readLatestAssistantReply } from "../../agents/tools/agent-step.js";
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../../agents/usage.js";
import { ensureAgentWorkspace } from "../../agents/workspace.js";
import {
@@ -52,7 +45,6 @@ import {
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
import { logWarn } from "../../logger.js";
import { buildAgentMainSessionKey, normalizeAgentId } from "../../routing/session-key.js";
import {
@@ -72,6 +64,13 @@ import {
resolveHeartbeatAckMaxChars,
} from "./helpers.js";
import { resolveCronSession } from "./session.js";
import { resolveCronSkillsSnapshot } from "./skills-snapshot.js";
import {
expectsSubagentFollowup,
isLikelyInterimCronMessage,
readDescendantSubagentFallbackReply,
waitForDescendantSubagentSummary,
} from "./subagent-followup.js";
function matchesMessagingToolDeliveryTarget(
target: MessagingToolSend,
@@ -101,152 +100,6 @@ function resolveCronDeliveryBestEffort(job: CronJob): boolean {
return false;
}
const CRON_SUBAGENT_WAIT_POLL_MS = 500;
const CRON_SUBAGENT_WAIT_MIN_MS = 30_000;
const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000;
function isLikelyInterimCronMessage(value: string): boolean {
const text = value.trim();
if (!text) {
return true;
}
const normalized = text.toLowerCase().replace(/\s+/g, " ");
const words = normalized.split(" ").filter(Boolean).length;
const interimHints = [
"on it",
"pulling everything together",
"give me a few",
"give me a few min",
"few minutes",
"let me compile",
"i'll gather",
"i will gather",
"working on it",
"retrying now",
"should be about",
"should have your summary",
"subagent spawned",
"spawned a subagent",
"it'll auto-announce when done",
"it will auto-announce when done",
"auto-announce when done",
"both subagents are running",
"wait for them to report back",
];
return words <= 45 && interimHints.some((hint) => normalized.includes(hint));
}
function expectsSubagentFollowup(value: string): boolean {
const normalized = value.trim().toLowerCase().replace(/\s+/g, " ");
if (!normalized) {
return false;
}
const hints = [
"subagent spawned",
"spawned a subagent",
"auto-announce when done",
"both subagents are running",
"wait for them to report back",
];
return hints.some((hint) => normalized.includes(hint));
}
async function readDescendantSubagentFallbackReply(params: {
sessionKey: string;
runStartedAt: number;
}): Promise<string | undefined> {
const descendants = listDescendantRunsForRequester(params.sessionKey)
.filter(
(entry) =>
typeof entry.endedAt === "number" &&
entry.endedAt >= params.runStartedAt &&
entry.childSessionKey.trim().length > 0,
)
.toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0));
if (descendants.length === 0) {
return undefined;
}
const latestByChild = new Map<string, (typeof descendants)[number]>();
for (const entry of descendants) {
const childKey = entry.childSessionKey.trim();
if (!childKey) {
continue;
}
const current = latestByChild.get(childKey);
if (!current || (entry.endedAt ?? 0) >= (current.endedAt ?? 0)) {
latestByChild.set(childKey, entry);
}
}
const replies: string[] = [];
const latestRuns = [...latestByChild.values()]
.toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0))
.slice(-4);
for (const entry of latestRuns) {
const reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim();
if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
continue;
}
replies.push(reply);
}
if (replies.length === 0) {
return undefined;
}
if (replies.length === 1) {
return replies[0];
}
return replies.join("\n\n");
}
async function waitForDescendantSubagentSummary(params: {
sessionKey: string;
initialReply?: string;
timeoutMs: number;
observedActiveDescendants?: boolean;
}): Promise<string | undefined> {
const initialReply = params.initialReply?.trim();
const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs));
let sawActiveDescendants = params.observedActiveDescendants === true;
let drainedAtMs: number | undefined;
while (Date.now() < deadline) {
const activeDescendants = countActiveDescendantRuns(params.sessionKey);
if (activeDescendants > 0) {
sawActiveDescendants = true;
drainedAtMs = undefined;
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
continue;
}
if (!sawActiveDescendants) {
return initialReply;
}
if (!drainedAtMs) {
drainedAtMs = Date.now();
}
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() &&
(latest !== initialReply || !isLikelyInterimCronMessage(latest))
) {
return latest;
}
if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) {
return undefined;
}
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
}
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() &&
(latest !== initialReply || !isLikelyInterimCronMessage(latest))
) {
return latest;
}
return undefined;
}
export type RunCronAgentTurnResult = {
status: "ok" | "error" | "skipped";
summary?: string;
@@ -521,32 +374,21 @@ export async function runCronIsolatedAgentTurn(params: {
`${commandBody}\n\nReturn your summary as plain text; it will be delivered automatically. If the task explicitly calls for messaging a specific external recipient, note who/where it should go instead of sending it yourself.`.trim();
}
let skillsSnapshot = cronSession.sessionEntry.skillsSnapshot;
if (isFastTestEnv) {
// Fast unit-test mode: avoid scanning the workspace and writing session stores.
skillsSnapshot = skillsSnapshot ?? { prompt: "", skills: [] };
} else {
const existingSnapshot = cronSession.sessionEntry.skillsSnapshot;
const skillsSnapshotVersion = getSkillsSnapshotVersion(workspaceDir);
const needsSkillsSnapshot =
!existingSnapshot || existingSnapshot.version !== skillsSnapshotVersion;
const skillFilter = resolveAgentSkillsFilter(cfgWithAgentDefaults, agentId);
if (needsSkillsSnapshot) {
skillsSnapshot = buildWorkspaceSkillSnapshot(workspaceDir, {
config: cfgWithAgentDefaults,
skillFilter,
eligibility: { remote: getRemoteSkillEligibility() },
snapshotVersion: skillsSnapshotVersion,
});
if (skillsSnapshot) {
cronSession.sessionEntry = {
...cronSession.sessionEntry,
updatedAt: Date.now(),
skillsSnapshot,
};
await persistSessionEntry();
}
}
const existingSkillsSnapshot = cronSession.sessionEntry.skillsSnapshot;
const skillsSnapshot = resolveCronSkillsSnapshot({
workspaceDir,
config: cfgWithAgentDefaults,
agentId,
existingSnapshot: existingSkillsSnapshot,
isFastTestEnv,
});
if (!isFastTestEnv && skillsSnapshot !== existingSkillsSnapshot) {
cronSession.sessionEntry = {
...cronSession.sessionEntry,
updatedAt: Date.now(),
skillsSnapshot,
};
await persistSessionEntry();
}
// Persist systemSent before the run, mirroring the inbound auto-reply behavior.

View File

@@ -0,0 +1,37 @@
import type { OpenClawConfig } from "../../config/config.js";
import { resolveAgentSkillsFilter } from "../../agents/agent-scope.js";
import { buildWorkspaceSkillSnapshot, type SkillSnapshot } from "../../agents/skills.js";
import { matchesSkillFilter } from "../../agents/skills/filter.js";
import { getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
export function resolveCronSkillsSnapshot(params: {
workspaceDir: string;
config: OpenClawConfig;
agentId: string;
existingSnapshot?: SkillSnapshot;
isFastTestEnv: boolean;
}): SkillSnapshot {
if (params.isFastTestEnv) {
// Fast unit-test mode skips filesystem scans and snapshot refresh writes.
return params.existingSnapshot ?? { prompt: "", skills: [] };
}
const snapshotVersion = getSkillsSnapshotVersion(params.workspaceDir);
const skillFilter = resolveAgentSkillsFilter(params.config, params.agentId);
const existingSnapshot = params.existingSnapshot;
const shouldRefresh =
!existingSnapshot ||
existingSnapshot.version !== snapshotVersion ||
!matchesSkillFilter(existingSnapshot.skillFilter, skillFilter);
if (!shouldRefresh) {
return existingSnapshot;
}
return buildWorkspaceSkillSnapshot(params.workspaceDir, {
config: params.config,
skillFilter,
eligibility: { remote: getRemoteSkillEligibility() },
snapshotVersion,
});
}

View File

@@ -0,0 +1,152 @@
import {
countActiveDescendantRuns,
listDescendantRunsForRequester,
} from "../../agents/subagent-registry.js";
import { readLatestAssistantReply } from "../../agents/tools/agent-step.js";
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
const CRON_SUBAGENT_WAIT_POLL_MS = 500;
const CRON_SUBAGENT_WAIT_MIN_MS = 30_000;
const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000;
export function isLikelyInterimCronMessage(value: string): boolean {
const text = value.trim();
if (!text) {
return true;
}
const normalized = text.toLowerCase().replace(/\s+/g, " ");
const words = normalized.split(" ").filter(Boolean).length;
const interimHints = [
"on it",
"pulling everything together",
"give me a few",
"give me a few min",
"few minutes",
"let me compile",
"i'll gather",
"i will gather",
"working on it",
"retrying now",
"should be about",
"should have your summary",
"subagent spawned",
"spawned a subagent",
"it'll auto-announce when done",
"it will auto-announce when done",
"auto-announce when done",
"both subagents are running",
"wait for them to report back",
];
return words <= 45 && interimHints.some((hint) => normalized.includes(hint));
}
export function expectsSubagentFollowup(value: string): boolean {
const normalized = value.trim().toLowerCase().replace(/\s+/g, " ");
if (!normalized) {
return false;
}
const hints = [
"subagent spawned",
"spawned a subagent",
"auto-announce when done",
"both subagents are running",
"wait for them to report back",
];
return hints.some((hint) => normalized.includes(hint));
}
export async function readDescendantSubagentFallbackReply(params: {
sessionKey: string;
runStartedAt: number;
}): Promise<string | undefined> {
const descendants = listDescendantRunsForRequester(params.sessionKey)
.filter(
(entry) =>
typeof entry.endedAt === "number" &&
entry.endedAt >= params.runStartedAt &&
entry.childSessionKey.trim().length > 0,
)
.toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0));
if (descendants.length === 0) {
return undefined;
}
const latestByChild = new Map<string, (typeof descendants)[number]>();
for (const entry of descendants) {
const childKey = entry.childSessionKey.trim();
if (!childKey) {
continue;
}
const current = latestByChild.get(childKey);
if (!current || (entry.endedAt ?? 0) >= (current.endedAt ?? 0)) {
latestByChild.set(childKey, entry);
}
}
const replies: string[] = [];
const latestRuns = [...latestByChild.values()]
.toSorted((a, b) => (a.endedAt ?? 0) - (b.endedAt ?? 0))
.slice(-4);
for (const entry of latestRuns) {
const reply = (await readLatestAssistantReply({ sessionKey: entry.childSessionKey }))?.trim();
if (!reply || reply.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
continue;
}
replies.push(reply);
}
if (replies.length === 0) {
return undefined;
}
if (replies.length === 1) {
return replies[0];
}
return replies.join("\n\n");
}
export async function waitForDescendantSubagentSummary(params: {
sessionKey: string;
initialReply?: string;
timeoutMs: number;
observedActiveDescendants?: boolean;
}): Promise<string | undefined> {
const initialReply = params.initialReply?.trim();
const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs));
let sawActiveDescendants = params.observedActiveDescendants === true;
let drainedAtMs: number | undefined;
while (Date.now() < deadline) {
const activeDescendants = countActiveDescendantRuns(params.sessionKey);
if (activeDescendants > 0) {
sawActiveDescendants = true;
drainedAtMs = undefined;
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
continue;
}
if (!sawActiveDescendants) {
return initialReply;
}
if (!drainedAtMs) {
drainedAtMs = Date.now();
}
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() &&
(latest !== initialReply || !isLikelyInterimCronMessage(latest))
) {
return latest;
}
if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) {
return undefined;
}
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
}
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
latest.toUpperCase() !== SILENT_REPLY_TOKEN.toUpperCase() &&
(latest !== initialReply || !isLikelyInterimCronMessage(latest))
) {
return latest;
}
return undefined;
}