mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-30 09:50:39 +00:00
fix: retrigger subagents after gateway restart instead of waiting on dead runs
When the gateway restarts (SIGUSR1), in-flight agent turns are killed along with their exec child processes. The subagent registry correctly persists and restores run records, but the old code simply called waitForSubagentCompletion() on the original runId. Since no agent turn was running anymore, the wait would time out and the subagent would be marked as failed. Now, resumeSubagentRun() detects incomplete runs (no endedAt) and calls retriggerSubagentAfterRestart(), which: 1. Sends a continuation message to the subagent's existing session via the 'agent' gateway method, informing it that the gateway restarted and any exec processes were terminated. 2. Uses the new runId from the response to replace the old run in the registry via replaceSubagentRunAfterSteer(). 3. Falls back to the old waitForSubagentCompletion() if no runId is returned. 4. Marks the run as errored (not hung) if the retrigger call itself fails. This ensures subagents recover gracefully: they get a fresh agent turn with their full conversation history intact and can re-run any terminated processes. Edge cases handled: - Agent call returns no runId: falls back to waiting (backward compat) - Agent call throws: run is marked as errored with completeSubagentRun() - Runs with endedAt already set: untouched (existing announce/cleanup flows)
This commit is contained in:
291
src/agents/subagent-registry.restart-recovery.test.ts
Normal file
291
src/agents/subagent-registry.restart-recovery.test.ts
Normal file
@@ -0,0 +1,291 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const { callGatewaySpy, announceSpy } = vi.hoisted(() => ({
|
||||
callGatewaySpy: vi.fn(),
|
||||
announceSpy: vi.fn(async () => true),
|
||||
}));
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: callGatewaySpy,
|
||||
randomIdempotencyKey: () => "test-idem-key",
|
||||
}));
|
||||
|
||||
vi.mock("../infra/agent-events.js", () => ({
|
||||
onAgentEvent: vi.fn(() => noop),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: announceSpy,
|
||||
captureSubagentCompletionReply: vi.fn(async () => undefined),
|
||||
}));
|
||||
|
||||
import { captureEnv } from "../test-utils/env.js";
|
||||
import {
|
||||
initSubagentRegistry,
|
||||
listSubagentRunsForRequester,
|
||||
resetSubagentRegistryForTests,
|
||||
} from "./subagent-registry.js";
|
||||
import { loadSubagentRegistryFromDisk } from "./subagent-registry.store.js";
|
||||
|
||||
describe("subagent restart recovery", () => {
|
||||
const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR"]);
|
||||
let tempStateDir: string | null = null;
|
||||
|
||||
const resolveAgentIdFromSessionKey = (sessionKey: string) => {
|
||||
const match = sessionKey.match(/^agent:([^:]+):/i);
|
||||
return (match?.[1] ?? "main").trim().toLowerCase() || "main";
|
||||
};
|
||||
|
||||
const resolveSessionStorePath = (stateDir: string, agentId: string) =>
|
||||
path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
|
||||
|
||||
const readSessionStore = async (storePath: string) => {
|
||||
try {
|
||||
const raw = await fs.readFile(storePath, "utf8");
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
|
||||
return parsed as Record<string, Record<string, unknown>>;
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
return {} as Record<string, Record<string, unknown>>;
|
||||
};
|
||||
|
||||
const writeChildSessionEntry = async (params: {
|
||||
sessionKey: string;
|
||||
sessionId?: string;
|
||||
updatedAt?: number;
|
||||
}) => {
|
||||
if (!tempStateDir) {
|
||||
throw new Error("tempStateDir not initialized");
|
||||
}
|
||||
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
|
||||
const storePath = resolveSessionStorePath(tempStateDir, agentId);
|
||||
const store = await readSessionStore(storePath);
|
||||
store[params.sessionKey] = {
|
||||
...store[params.sessionKey],
|
||||
sessionId: params.sessionId ?? `sess-${agentId}-${Date.now()}`,
|
||||
updatedAt: params.updatedAt ?? Date.now(),
|
||||
};
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8");
|
||||
return storePath;
|
||||
};
|
||||
|
||||
const writePersistedRegistry = async (persisted: Record<string, unknown>) => {
|
||||
tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-restart-"));
|
||||
process.env.OPENCLAW_STATE_DIR = tempStateDir;
|
||||
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||
await fs.mkdir(path.dirname(registryPath), { recursive: true });
|
||||
await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8");
|
||||
// Seed child session entries
|
||||
const runs = (persisted.runs ?? {}) as Record<
|
||||
string,
|
||||
{ runId?: string; childSessionKey?: string }
|
||||
>;
|
||||
for (const [runId, run] of Object.entries(runs)) {
|
||||
const childSessionKey = run?.childSessionKey?.trim();
|
||||
if (!childSessionKey) {
|
||||
continue;
|
||||
}
|
||||
await writeChildSessionEntry({
|
||||
sessionKey: childSessionKey,
|
||||
sessionId: `sess-${run.runId ?? runId}`,
|
||||
});
|
||||
}
|
||||
return registryPath;
|
||||
};
|
||||
|
||||
const flushQueuedRegistryWork = async () => {
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
};
|
||||
|
||||
afterEach(async () => {
|
||||
callGatewaySpy.mockReset();
|
||||
announceSpy.mockClear();
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
if (tempStateDir) {
|
||||
await fs.rm(tempStateDir, { recursive: true, force: true });
|
||||
tempStateDir = null;
|
||||
}
|
||||
envSnapshot.restore();
|
||||
});
|
||||
|
||||
it("retriggers subagent with continuation message after restart for incomplete run", async () => {
|
||||
const now = Date.now();
|
||||
const persisted = {
|
||||
version: 2,
|
||||
runs: {
|
||||
"run-active": {
|
||||
runId: "run-active",
|
||||
childSessionKey: "agent:main:subagent:active-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
requesterOrigin: { channel: "bluebubbles", accountId: "acct-1" },
|
||||
task: "do something complex",
|
||||
cleanup: "keep",
|
||||
label: "complex-task",
|
||||
createdAt: now - 5000,
|
||||
startedAt: now - 4000,
|
||||
// No endedAt — the run was in progress when gateway restarted
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
// Mock callGateway: "agent" returns new runId, "agent.wait" returns ok
|
||||
callGatewaySpy.mockImplementation(async (opts: { method: string }) => {
|
||||
if (opts.method === "agent") {
|
||||
return { runId: "run-active-v2" };
|
||||
}
|
||||
if (opts.method === "agent.wait") {
|
||||
return { status: "ok", startedAt: now, endedAt: now + 1000 };
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
await writePersistedRegistry(persisted);
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
// Verify callGateway was called with method "agent" for re-triggering
|
||||
const agentCalls = callGatewaySpy.mock.calls.filter(
|
||||
(call: unknown[]) => (call[0] as { method: string }).method === "agent",
|
||||
);
|
||||
expect(agentCalls.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
const agentCallParams = (agentCalls[0][0] as { params: Record<string, unknown> }).params;
|
||||
expect(agentCallParams.sessionKey).toBe("agent:main:subagent:active-child");
|
||||
// Verify continuation message is sent
|
||||
expect(agentCallParams.message).toContain("gateway process restarted");
|
||||
expect(agentCallParams.deliver).toBe(false);
|
||||
expect(agentCallParams.lane).toBeDefined();
|
||||
expect(agentCallParams.label).toBe("complex-task");
|
||||
});
|
||||
|
||||
it("marks run as errored when retrigger agent call fails", async () => {
|
||||
const now = Date.now();
|
||||
const persisted = {
|
||||
version: 2,
|
||||
runs: {
|
||||
"run-fail": {
|
||||
runId: "run-fail",
|
||||
childSessionKey: "agent:main:subagent:fail-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "doomed task",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 5000,
|
||||
startedAt: now - 4000,
|
||||
// No endedAt
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
callGatewaySpy.mockImplementation(async (opts: { method: string }) => {
|
||||
if (opts.method === "agent") {
|
||||
throw new Error("gateway unreachable");
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
const registryPath = await writePersistedRegistry(persisted);
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
// The run should be marked as ended with an error
|
||||
const persistedAfter = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
|
||||
runs?: Record<string, { endedAt?: number; outcome?: { status: string; error?: string } }>;
|
||||
};
|
||||
|
||||
const run = persistedAfter.runs?.["run-fail"];
|
||||
expect(run?.endedAt).toBeDefined();
|
||||
expect(run?.outcome?.status).toBe("error");
|
||||
expect(run?.outcome?.error).toContain("restart recovery failed");
|
||||
});
|
||||
|
||||
it("falls back to wait when retrigger returns no runId", async () => {
|
||||
const now = Date.now();
|
||||
const persisted = {
|
||||
version: 2,
|
||||
runs: {
|
||||
"run-no-id": {
|
||||
runId: "run-no-id",
|
||||
childSessionKey: "agent:main:subagent:no-id-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "task with no runId",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 5000,
|
||||
startedAt: now - 4000,
|
||||
// No endedAt
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
callGatewaySpy.mockImplementation(async (opts: { method: string }) => {
|
||||
if (opts.method === "agent") {
|
||||
return {}; // No runId returned
|
||||
}
|
||||
if (opts.method === "agent.wait") {
|
||||
return { status: "ok", startedAt: now, endedAt: now + 1000 };
|
||||
}
|
||||
return {};
|
||||
});
|
||||
|
||||
await writePersistedRegistry(persisted);
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
// Should have called agent.wait as fallback
|
||||
const waitCalls = callGatewaySpy.mock.calls.filter(
|
||||
(call: unknown[]) => (call[0] as { method: string }).method === "agent.wait",
|
||||
);
|
||||
expect(waitCalls.length).toBeGreaterThanOrEqual(1);
|
||||
});
|
||||
|
||||
it("does not retrigger runs that already have endedAt set", async () => {
|
||||
const now = Date.now();
|
||||
const persisted = {
|
||||
version: 2,
|
||||
runs: {
|
||||
"run-ended": {
|
||||
runId: "run-ended",
|
||||
childSessionKey: "agent:main:subagent:ended-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: "already done",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 5000,
|
||||
startedAt: now - 4000,
|
||||
endedAt: now - 1000, // Already ended
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
callGatewaySpy.mockImplementation(async () => ({}));
|
||||
announceSpy.mockResolvedValue(true);
|
||||
|
||||
await writePersistedRegistry(persisted);
|
||||
resetSubagentRegistryForTests({ persist: false });
|
||||
initSubagentRegistry();
|
||||
await flushQueuedRegistryWork();
|
||||
|
||||
// No "agent" method calls for re-triggering (only cleanup/announce flows)
|
||||
const agentCalls = callGatewaySpy.mock.calls.filter(
|
||||
(call: unknown[]) => (call[0] as { method: string }).method === "agent",
|
||||
);
|
||||
expect(agentCalls).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
@@ -11,8 +11,9 @@ import {
|
||||
import { ensureContextEnginesInitialized } from "../context-engine/init.js";
|
||||
import { resolveContextEngine } from "../context-engine/registry.js";
|
||||
import type { SubagentEndReason } from "../context-engine/types.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { callGateway, randomIdempotencyKey } from "../gateway/call.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { AGENT_LANE_SUBAGENT } from "./lanes.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
|
||||
@@ -578,6 +579,88 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-trigger a subagent whose agent turn was killed during gateway restart.
|
||||
* Sends a continuation message to the subagent's existing session so it gets
|
||||
* a fresh agent turn to finish its work. The new runId replaces the old one
|
||||
* in the registry so completion tracking continues seamlessly.
|
||||
*/
|
||||
async function retriggerSubagentAfterRestart(
|
||||
runId: string,
|
||||
entry: SubagentRunRecord,
|
||||
): Promise<void> {
|
||||
const childSessionKey = entry.childSessionKey;
|
||||
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
|
||||
|
||||
try {
|
||||
const cfg = loadConfig();
|
||||
const runTimeoutSeconds = entry.runTimeoutSeconds ?? 0;
|
||||
|
||||
const continuationMessage = [
|
||||
"[System] The gateway process restarted while you were working.",
|
||||
"Any exec processes you had running were terminated.",
|
||||
"Your conversation history is intact. Continue your task from where you left off.",
|
||||
"If you were waiting on a process, check if it needs to be re-run.",
|
||||
].join("\n");
|
||||
|
||||
const response = await callGateway<{ runId?: string }>({
|
||||
method: "agent",
|
||||
params: {
|
||||
message: continuationMessage,
|
||||
sessionKey: childSessionKey,
|
||||
channel: requesterOrigin?.channel,
|
||||
to: requesterOrigin?.to ?? undefined,
|
||||
accountId: requesterOrigin?.accountId ?? undefined,
|
||||
threadId:
|
||||
requesterOrigin?.threadId != null ? String(requesterOrigin.threadId) : undefined,
|
||||
idempotencyKey: randomIdempotencyKey(),
|
||||
deliver: false,
|
||||
lane: AGENT_LANE_SUBAGENT,
|
||||
timeout: runTimeoutSeconds,
|
||||
label: entry.label || undefined,
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
|
||||
const newRunId = response?.runId;
|
||||
if (typeof newRunId !== "string" || !newRunId.trim()) {
|
||||
// Agent call didn't return a runId, fall back to waiting on old run.
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent restart recovery: agent call returned no runId for ${childSessionKey}, falling back to wait`,
|
||||
);
|
||||
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, runTimeoutSeconds);
|
||||
void waitForSubagentCompletion(runId, waitTimeoutMs);
|
||||
return;
|
||||
}
|
||||
|
||||
// Replace the old run entry with the new runId.
|
||||
replaceSubagentRunAfterSteer({
|
||||
previousRunId: runId,
|
||||
nextRunId: newRunId,
|
||||
fallback: entry,
|
||||
runTimeoutSeconds,
|
||||
});
|
||||
|
||||
defaultRuntime.log(
|
||||
`[info] Subagent restart recovery: retriggered ${childSessionKey} old=${runId} new=${newRunId}`,
|
||||
);
|
||||
} catch (err) {
|
||||
defaultRuntime.log(
|
||||
`[warn] Subagent restart recovery failed for ${childSessionKey}: ${String(err)}`,
|
||||
);
|
||||
// Mark the run as failed so it doesn't hang forever.
|
||||
void completeSubagentRun({
|
||||
runId,
|
||||
endedAt: Date.now(),
|
||||
outcome: { status: "error", error: `restart recovery failed: ${String(err)}` },
|
||||
reason: SUBAGENT_ENDED_REASON_ERROR,
|
||||
sendFarewell: true,
|
||||
accountId: requesterOrigin?.accountId,
|
||||
triggerCleanup: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function resumeSubagentRun(runId: string) {
|
||||
if (!runId || resumedRuns.has(runId)) {
|
||||
return;
|
||||
@@ -650,11 +733,10 @@ function resumeSubagentRun(runId: string) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for completion again after restart.
|
||||
const cfg = loadConfig();
|
||||
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, entry.runTimeoutSeconds);
|
||||
void waitForSubagentCompletion(runId, waitTimeoutMs);
|
||||
// After gateway restart, the agent turn that was mid-execution was killed.
|
||||
// Re-trigger the subagent with a continuation message so it can resume work.
|
||||
resumedRuns.add(runId);
|
||||
void retriggerSubagentAfterRestart(runId, entry);
|
||||
}
|
||||
|
||||
function restoreSubagentRunsOnce() {
|
||||
|
||||
Reference in New Issue
Block a user