mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 11:27:39 +00:00
fix(subagents): reconcile orphaned restored runs
This commit is contained in:
committed by
Peter Steinberger
parent
cd3927ad67
commit
c3b3065cc9
@@ -16,7 +16,11 @@ vi.mock("../config/config.js", () => ({
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("../config/sessions.js", () => ({
|
vi.mock("../config/sessions.js", () => ({
|
||||||
loadSessionStore: () => ({}),
|
loadSessionStore: () => ({
|
||||||
|
"agent:main:subagent:child-1": { sessionId: "sess-child-1", updatedAt: 1 },
|
||||||
|
"agent:main:subagent:expired-child": { sessionId: "sess-expired", updatedAt: 1 },
|
||||||
|
"agent:main:subagent:retry-budget": { sessionId: "sess-retry", updatedAt: 1 },
|
||||||
|
}),
|
||||||
resolveAgentIdFromSessionKey: (key: string) => {
|
resolveAgentIdFromSessionKey: (key: string) => {
|
||||||
const match = key.match(/^agent:([^:]+)/);
|
const match = key.match(/^agent:([^:]+)/);
|
||||||
return match?.[1] ?? "main";
|
return match?.[1] ?? "main";
|
||||||
|
|||||||
@@ -5,7 +5,10 @@ import { afterEach, describe, expect, it, vi } from "vitest";
|
|||||||
import "./subagent-registry.mocks.shared.js";
|
import "./subagent-registry.mocks.shared.js";
|
||||||
import { captureEnv } from "../test-utils/env.js";
|
import { captureEnv } from "../test-utils/env.js";
|
||||||
import {
|
import {
|
||||||
|
addSubagentRunForTests,
|
||||||
|
clearSubagentRunSteerRestart,
|
||||||
initSubagentRegistry,
|
initSubagentRegistry,
|
||||||
|
listSubagentRunsForRequester,
|
||||||
registerSubagentRun,
|
registerSubagentRun,
|
||||||
resetSubagentRegistryForTests,
|
resetSubagentRegistryForTests,
|
||||||
} from "./subagent-registry.js";
|
} from "./subagent-registry.js";
|
||||||
@@ -22,12 +25,93 @@ describe("subagent registry persistence", () => {
|
|||||||
const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR"]);
|
const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR"]);
|
||||||
let tempStateDir: string | null = null;
|
let tempStateDir: string | null = null;
|
||||||
|
|
||||||
const writePersistedRegistry = async (persisted: Record<string, unknown>) => {
|
const resolveAgentIdFromSessionKey = (sessionKey: string) => {
|
||||||
|
const match = sessionKey.match(/^agent:([^:]+):/i);
|
||||||
|
return (match?.[1] ?? "main").trim().toLowerCase() || "main";
|
||||||
|
};
|
||||||
|
|
||||||
|
const resolveSessionStorePath = (stateDir: string, agentId: string) =>
|
||||||
|
path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
|
||||||
|
|
||||||
|
const readSessionStore = async (storePath: string) => {
|
||||||
|
try {
|
||||||
|
const raw = await fs.readFile(storePath, "utf8");
|
||||||
|
const parsed = JSON.parse(raw) as unknown;
|
||||||
|
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
|
||||||
|
return parsed as Record<string, Record<string, unknown>>;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
return {} as Record<string, Record<string, unknown>>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const writeChildSessionEntry = async (params: {
|
||||||
|
sessionKey: string;
|
||||||
|
sessionId?: string;
|
||||||
|
updatedAt?: number;
|
||||||
|
}) => {
|
||||||
|
if (!tempStateDir) {
|
||||||
|
throw new Error("tempStateDir not initialized");
|
||||||
|
}
|
||||||
|
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
|
||||||
|
const storePath = resolveSessionStorePath(tempStateDir, agentId);
|
||||||
|
const store = await readSessionStore(storePath);
|
||||||
|
store[params.sessionKey] = {
|
||||||
|
...(store[params.sessionKey] ?? {}),
|
||||||
|
sessionId: params.sessionId ?? `sess-${agentId}-${Date.now()}`,
|
||||||
|
updatedAt: params.updatedAt ?? Date.now(),
|
||||||
|
};
|
||||||
|
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||||
|
await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8");
|
||||||
|
return storePath;
|
||||||
|
};
|
||||||
|
|
||||||
|
const removeChildSessionEntry = async (sessionKey: string) => {
|
||||||
|
if (!tempStateDir) {
|
||||||
|
throw new Error("tempStateDir not initialized");
|
||||||
|
}
|
||||||
|
const agentId = resolveAgentIdFromSessionKey(sessionKey);
|
||||||
|
const storePath = resolveSessionStorePath(tempStateDir, agentId);
|
||||||
|
const store = await readSessionStore(storePath);
|
||||||
|
delete store[sessionKey];
|
||||||
|
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||||
|
await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8");
|
||||||
|
return storePath;
|
||||||
|
};
|
||||||
|
|
||||||
|
const seedChildSessionsForPersistedRuns = async (persisted: Record<string, unknown>) => {
|
||||||
|
const runs = (persisted.runs ?? {}) as Record<
|
||||||
|
string,
|
||||||
|
{
|
||||||
|
runId?: string;
|
||||||
|
childSessionKey?: string;
|
||||||
|
}
|
||||||
|
>;
|
||||||
|
for (const [runId, run] of Object.entries(runs)) {
|
||||||
|
const childSessionKey = run?.childSessionKey?.trim();
|
||||||
|
if (!childSessionKey) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
await writeChildSessionEntry({
|
||||||
|
sessionKey: childSessionKey,
|
||||||
|
sessionId: `sess-${run.runId ?? runId}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const writePersistedRegistry = async (
|
||||||
|
persisted: Record<string, unknown>,
|
||||||
|
opts?: { seedChildSessions?: boolean },
|
||||||
|
) => {
|
||||||
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
||||||
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||||
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||||
await fs.mkdir(path.dirname(registryPath), { recursive: true });
|
await fs.mkdir(path.dirname(registryPath), { recursive: true });
|
||||||
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
|
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
|
||||||
|
if (opts?.seedChildSessions !== false) {
|
||||||
|
await seedChildSessionsForPersistedRuns(persisted);
|
||||||
|
}
|
||||||
return registryPath;
|
return registryPath;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -90,6 +174,10 @@ describe("subagent registry persistence", () => {
|
|||||||
task: "do the thing",
|
task: "do the thing",
|
||||||
cleanup: "keep",
|
cleanup: "keep",
|
||||||
});
|
});
|
||||||
|
await writeChildSessionEntry({
|
||||||
|
sessionKey: "agent:main:subagent:test",
|
||||||
|
sessionId: "sess-test",
|
||||||
|
});
|
||||||
|
|
||||||
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||||
const raw = await fs.readFile(registryPath, "utf8");
|
const raw = await fs.readFile(registryPath, "utf8");
|
||||||
@@ -162,6 +250,10 @@ describe("subagent registry persistence", () => {
|
|||||||
};
|
};
|
||||||
await fs.mkdir(path.dirname(registryPath), { recursive: true });
|
await fs.mkdir(path.dirname(registryPath), { recursive: true });
|
||||||
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
|
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
|
||||||
|
await writeChildSessionEntry({
|
||||||
|
sessionKey: "agent:main:subagent:two",
|
||||||
|
sessionId: "sess-two",
|
||||||
|
});
|
||||||
|
|
||||||
resetSubagentRegistryForTests({ persist: false });
|
resetSubagentRegistryForTests({ persist: false });
|
||||||
initSubagentRegistry();
|
initSubagentRegistry();
|
||||||
@@ -268,6 +360,64 @@ describe("subagent registry persistence", () => {
|
|||||||
expect(afterSecond.runs?.["run-4"]).toBeUndefined();
|
expect(afterSecond.runs?.["run-4"]).toBeUndefined();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("reconciles orphaned restored runs by pruning them from registry", async () => {
|
||||||
|
const persisted = createPersistedEndedRun({
|
||||||
|
runId: "run-orphan-restore",
|
||||||
|
childSessionKey: "agent:main:subagent:ghost-restore",
|
||||||
|
task: "orphan restore",
|
||||||
|
cleanup: "keep",
|
||||||
|
});
|
||||||
|
const registryPath = await writePersistedRegistry(persisted, {
|
||||||
|
seedChildSessions: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await restartRegistryAndFlush();
|
||||||
|
|
||||||
|
expect(announceSpy).not.toHaveBeenCalled();
|
||||||
|
const after = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
|
||||||
|
runs?: Record<string, unknown>;
|
||||||
|
};
|
||||||
|
expect(after.runs?.["run-orphan-restore"]).toBeUndefined();
|
||||||
|
expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("resume guard prunes orphan runs before announce retry", async () => {
|
||||||
|
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-"));
|
||||||
|
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||||
|
const runId = "run-orphan-resume-guard";
|
||||||
|
const childSessionKey = "agent:main:subagent:ghost-resume";
|
||||||
|
const now = Date.now();
|
||||||
|
|
||||||
|
await writeChildSessionEntry({
|
||||||
|
sessionKey: childSessionKey,
|
||||||
|
sessionId: "sess-resume-guard",
|
||||||
|
updatedAt: now,
|
||||||
|
});
|
||||||
|
addSubagentRunForTests({
|
||||||
|
runId,
|
||||||
|
childSessionKey,
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
task: "resume orphan guard",
|
||||||
|
cleanup: "keep",
|
||||||
|
createdAt: now - 50,
|
||||||
|
startedAt: now - 25,
|
||||||
|
endedAt: now,
|
||||||
|
suppressAnnounceReason: "steer-restart",
|
||||||
|
cleanupHandled: false,
|
||||||
|
});
|
||||||
|
await removeChildSessionEntry(childSessionKey);
|
||||||
|
|
||||||
|
const changed = clearSubagentRunSteerRestart(runId);
|
||||||
|
expect(changed).toBe(true);
|
||||||
|
await flushQueuedRegistryWork();
|
||||||
|
|
||||||
|
expect(announceSpy).not.toHaveBeenCalled();
|
||||||
|
expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0);
|
||||||
|
const persisted = loadSubagentRegistryFromDisk();
|
||||||
|
expect(persisted.has(runId)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
it("uses isolated temp state when OPENCLAW_STATE_DIR is unset in tests", async () => {
|
it("uses isolated temp state when OPENCLAW_STATE_DIR is unset in tests", async () => {
|
||||||
delete process.env.OPENCLAW_STATE_DIR;
|
delete process.env.OPENCLAW_STATE_DIR;
|
||||||
vi.resetModules();
|
vi.resetModules();
|
||||||
|
|||||||
@@ -1,4 +1,10 @@
|
|||||||
import { loadConfig } from "../config/config.js";
|
import { loadConfig } from "../config/config.js";
|
||||||
|
import {
|
||||||
|
loadSessionStore,
|
||||||
|
resolveAgentIdFromSessionKey,
|
||||||
|
resolveStorePath,
|
||||||
|
type SessionEntry,
|
||||||
|
} from "../config/sessions.js";
|
||||||
import { callGateway } from "../gateway/call.js";
|
import { callGateway } from "../gateway/call.js";
|
||||||
import { onAgentEvent } from "../infra/agent-events.js";
|
import { onAgentEvent } from "../infra/agent-events.js";
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
@@ -59,6 +65,7 @@ const MAX_ANNOUNCE_RETRY_COUNT = 3;
|
|||||||
* succeeded. Guards against stale registry entries surviving gateway restarts.
|
* succeeded. Guards against stale registry entries surviving gateway restarts.
|
||||||
*/
|
*/
|
||||||
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
|
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
|
||||||
|
type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id";
|
||||||
|
|
||||||
function resolveAnnounceRetryDelayMs(retryCount: number) {
|
function resolveAnnounceRetryDelayMs(retryCount: number) {
|
||||||
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
|
||||||
@@ -82,6 +89,119 @@ function persistSubagentRuns() {
|
|||||||
persistSubagentRunsToDisk(subagentRuns);
|
persistSubagentRunsToDisk(subagentRuns);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function findSessionEntryByKey(store: Record<string, SessionEntry>, sessionKey: string) {
|
||||||
|
const direct = store[sessionKey];
|
||||||
|
if (direct) {
|
||||||
|
return direct;
|
||||||
|
}
|
||||||
|
const normalized = sessionKey.toLowerCase();
|
||||||
|
for (const [key, entry] of Object.entries(store)) {
|
||||||
|
if (key.toLowerCase() === normalized) {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveSubagentRunOrphanReason(params: {
|
||||||
|
entry: SubagentRunRecord;
|
||||||
|
storeCache?: Map<string, Record<string, SessionEntry>>;
|
||||||
|
}): SubagentRunOrphanReason | null {
|
||||||
|
const childSessionKey = params.entry.childSessionKey?.trim();
|
||||||
|
if (!childSessionKey) {
|
||||||
|
return "missing-session-entry";
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
const cfg = loadConfig();
|
||||||
|
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
|
||||||
|
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||||
|
let store = params.storeCache?.get(storePath);
|
||||||
|
if (!store) {
|
||||||
|
store = loadSessionStore(storePath);
|
||||||
|
params.storeCache?.set(storePath, store);
|
||||||
|
}
|
||||||
|
const sessionEntry = findSessionEntryByKey(store, childSessionKey);
|
||||||
|
if (!sessionEntry) {
|
||||||
|
return "missing-session-entry";
|
||||||
|
}
|
||||||
|
if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) {
|
||||||
|
return "missing-session-id";
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
} catch {
|
||||||
|
// Best-effort guard: avoid false orphan pruning on transient read/config failures.
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function reconcileOrphanedRun(params: {
|
||||||
|
runId: string;
|
||||||
|
entry: SubagentRunRecord;
|
||||||
|
reason: SubagentRunOrphanReason;
|
||||||
|
source: "restore" | "resume";
|
||||||
|
}) {
|
||||||
|
const now = Date.now();
|
||||||
|
let changed = false;
|
||||||
|
if (typeof params.entry.endedAt !== "number") {
|
||||||
|
params.entry.endedAt = now;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
const orphanOutcome: SubagentRunOutcome = {
|
||||||
|
status: "error",
|
||||||
|
error: `orphaned subagent run (${params.reason})`,
|
||||||
|
};
|
||||||
|
if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) {
|
||||||
|
params.entry.outcome = orphanOutcome;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) {
|
||||||
|
params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if (params.entry.cleanupHandled !== true) {
|
||||||
|
params.entry.cleanupHandled = true;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if (typeof params.entry.cleanupCompletedAt !== "number") {
|
||||||
|
params.entry.cleanupCompletedAt = now;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
const removed = subagentRuns.delete(params.runId);
|
||||||
|
resumedRuns.delete(params.runId);
|
||||||
|
if (!removed && !changed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
defaultRuntime.log(
|
||||||
|
`[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`,
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
function reconcileOrphanedRestoredRuns() {
|
||||||
|
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||||
|
let changed = false;
|
||||||
|
for (const [runId, entry] of subagentRuns.entries()) {
|
||||||
|
const orphanReason = resolveSubagentRunOrphanReason({
|
||||||
|
entry,
|
||||||
|
storeCache,
|
||||||
|
});
|
||||||
|
if (!orphanReason) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
reconcileOrphanedRun({
|
||||||
|
runId,
|
||||||
|
entry,
|
||||||
|
reason: orphanReason,
|
||||||
|
source: "restore",
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return changed;
|
||||||
|
}
|
||||||
|
|
||||||
const resumedRuns = new Set<string>();
|
const resumedRuns = new Set<string>();
|
||||||
const endedHookInFlightRunIds = new Set<string>();
|
const endedHookInFlightRunIds = new Set<string>();
|
||||||
|
|
||||||
@@ -225,6 +345,20 @@ function resumeSubagentRun(runId: string) {
|
|||||||
if (!entry) {
|
if (!entry) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const orphanReason = resolveSubagentRunOrphanReason({ entry });
|
||||||
|
if (orphanReason) {
|
||||||
|
if (
|
||||||
|
reconcileOrphanedRun({
|
||||||
|
runId,
|
||||||
|
entry,
|
||||||
|
reason: orphanReason,
|
||||||
|
source: "resume",
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
persistSubagentRuns();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (entry.cleanupCompletedAt) {
|
if (entry.cleanupCompletedAt) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -290,6 +424,12 @@ function restoreSubagentRunsOnce() {
|
|||||||
if (restoredCount === 0) {
|
if (restoredCount === 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (reconcileOrphanedRestoredRuns()) {
|
||||||
|
persistSubagentRuns();
|
||||||
|
}
|
||||||
|
if (subagentRuns.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
// Resume pending work.
|
// Resume pending work.
|
||||||
ensureListener();
|
ensureListener();
|
||||||
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
|
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user