mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 10:37:27 +00:00
fix(announce): break infinite retry loop with max attempts and expiry (#18264)
When runSubagentAnnounceFlow returns false (deferred), finalizeSubagentCleanup resets cleanupHandled=false and removes from resumedRuns, allowing retryDeferredCompletedAnnounces to pick it up again. If the underlying condition persists (stale registry data, transient state), this creates an infinite loop delivering 100+ announces over hours. Fix: - Add announceRetryCount + lastAnnounceRetryAt to SubagentRunRecord - finalizeSubagentCleanup: after MAX_ANNOUNCE_RETRY_COUNT (3) failed attempts or ANNOUNCE_EXPIRY_MS (5 min) since endedAt, mark as completed and stop - resumeSubagentRun: skip entries that have exhausted retries or expired - retryDeferredCompletedAnnounces: force-expire stale entries
This commit is contained in:
committed by
Peter Steinberger
parent
0764999e2c
commit
a6c741eb46
123
src/agents/subagent-registry.announce-loop-guard.test.ts
Normal file
123
src/agents/subagent-registry.announce-loop-guard.test.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
import { describe, expect, test, vi, beforeEach, afterEach } from "vitest";
|
||||
|
||||
/**
|
||||
* Regression test for #18264: Gateway announcement delivery loop.
|
||||
*
|
||||
* When `runSubagentAnnounceFlow` repeatedly returns `false` (deferred),
|
||||
* `finalizeSubagentCleanup` must eventually give up rather than retrying
|
||||
* forever via the max-retry and expiration guards.
|
||||
*/
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: () => ({
|
||||
session: { store: "/tmp/test-store", mainKey: "main" },
|
||||
agents: {},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
loadSessionStore: () => ({}),
|
||||
resolveAgentIdFromSessionKey: (key: string) => {
|
||||
const match = key.match(/^agent:([^:]+)/);
|
||||
return match?.[1] ?? "main";
|
||||
},
|
||||
resolveMainSessionKey: () => "agent:main:main",
|
||||
resolveStorePath: () => "/tmp/test-store",
|
||||
updateSessionStore: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn().mockResolvedValue({ status: "ok" }),
|
||||
}));
|
||||
|
||||
vi.mock("../infra/agent-events.js", () => ({
|
||||
onAgentEvent: vi.fn().mockReturnValue(() => {}),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: vi.fn().mockResolvedValue(false),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-registry.store.js", () => ({
|
||||
loadSubagentRegistryFromDisk: () => new Map(),
|
||||
saveSubagentRegistryToDisk: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-announce-queue.js", () => ({
|
||||
resetAnnounceQueuesForTests: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./timeout.js", () => ({
|
||||
resolveAgentTimeoutMs: () => 60_000,
|
||||
}));
|
||||
|
||||
describe("announce loop guard (#18264)", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
test("SubagentRunRecord has announceRetryCount and lastAnnounceRetryAt fields", async () => {
|
||||
const registry = await import("./subagent-registry.js");
|
||||
registry.resetSubagentRegistryForTests();
|
||||
|
||||
const now = Date.now();
|
||||
// Add a run that has already ended and exhausted retries
|
||||
registry.addSubagentRunForTests({
|
||||
runId: "test-loop-guard",
|
||||
childSessionKey: "agent:main:subagent:child-1",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "agent:main:main",
|
||||
task: "test task",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 60_000,
|
||||
startedAt: now - 55_000,
|
||||
endedAt: now - 50_000,
|
||||
announceRetryCount: 3,
|
||||
lastAnnounceRetryAt: now - 10_000,
|
||||
});
|
||||
|
||||
const runs = registry.listSubagentRunsForRequester("agent:main:main");
|
||||
const entry = runs.find((r) => r.runId === "test-loop-guard");
|
||||
expect(entry).toBeDefined();
|
||||
expect(entry!.announceRetryCount).toBe(3);
|
||||
expect(entry!.lastAnnounceRetryAt).toBeDefined();
|
||||
});
|
||||
|
||||
test("expired entries with high retry count are skipped by resumeSubagentRun", async () => {
|
||||
const registry = await import("./subagent-registry.js");
|
||||
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
|
||||
const announceFn = vi.mocked(runSubagentAnnounceFlow);
|
||||
announceFn.mockClear();
|
||||
|
||||
registry.resetSubagentRegistryForTests();
|
||||
|
||||
const now = Date.now();
|
||||
// Add a run that ended 10 minutes ago (well past ANNOUNCE_EXPIRY_MS of 5 min)
|
||||
// with 3 retries already attempted
|
||||
registry.addSubagentRunForTests({
|
||||
runId: "test-expired-loop",
|
||||
childSessionKey: "agent:main:subagent:expired-child",
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "agent:main:main",
|
||||
task: "expired test task",
|
||||
cleanup: "keep",
|
||||
createdAt: now - 15 * 60_000,
|
||||
startedAt: now - 14 * 60_000,
|
||||
endedAt: now - 10 * 60_000, // 10 minutes ago
|
||||
announceRetryCount: 3,
|
||||
lastAnnounceRetryAt: now - 9 * 60_000,
|
||||
});
|
||||
|
||||
// Initialize the registry — this triggers resumeSubagentRun for persisted entries
|
||||
registry.initSubagentRegistry();
|
||||
|
||||
// The announce flow should NOT be called because the entry has exceeded
|
||||
// both the retry count and the expiry window.
|
||||
expect(announceFn).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -29,6 +29,10 @@ export type SubagentRunRecord = {
|
||||
cleanupCompletedAt?: number;
|
||||
cleanupHandled?: boolean;
|
||||
suppressAnnounceReason?: "steer-restart" | "killed";
|
||||
/** Number of times announce delivery has been attempted and returned false (deferred). */
|
||||
announceRetryCount?: number;
|
||||
/** Timestamp of the last announce retry attempt (for backoff). */
|
||||
lastAnnounceRetryAt?: number;
|
||||
};
|
||||
|
||||
const subagentRuns = new Map<string, SubagentRunRecord>();
|
||||
@@ -38,6 +42,18 @@ let listenerStop: (() => void) | null = null;
|
||||
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
|
||||
var restoreAttempted = false;
|
||||
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
|
||||
/**
|
||||
* Maximum number of announce delivery attempts before giving up.
|
||||
* Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly
|
||||
* returns `false` due to stale state or transient conditions (#18264).
|
||||
*/
|
||||
const MAX_ANNOUNCE_RETRY_COUNT = 3;
|
||||
/**
|
||||
* Announce entries older than this are force-expired even if delivery never
|
||||
* succeeded. Guards against stale registry entries surviving gateway restarts.
|
||||
*/
|
||||
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
|
||||
// (Backoff constant removed — max-retry + expiry guards are sufficient.)
|
||||
|
||||
function persistSubagentRuns() {
|
||||
try {
|
||||
@@ -89,6 +105,17 @@ function resumeSubagentRun(runId: string) {
|
||||
if (entry.cleanupCompletedAt) {
|
||||
return;
|
||||
}
|
||||
// Skip entries that have exhausted their retry budget or expired (#18264).
|
||||
if ((entry.announceRetryCount ?? 0) >= MAX_ANNOUNCE_RETRY_COUNT) {
|
||||
entry.cleanupCompletedAt = Date.now();
|
||||
persistSubagentRuns();
|
||||
return;
|
||||
}
|
||||
if (typeof entry.endedAt === "number" && Date.now() - entry.endedAt > ANNOUNCE_EXPIRY_MS) {
|
||||
entry.cleanupCompletedAt = Date.now();
|
||||
persistSubagentRuns();
|
||||
return;
|
||||
}
|
||||
|
||||
if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
|
||||
if (suppressAnnounceForSteerRestart(entry)) {
|
||||
@@ -256,6 +283,20 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
|
||||
return;
|
||||
}
|
||||
if (!didAnnounce) {
|
||||
const retryCount = (entry.announceRetryCount ?? 0) + 1;
|
||||
entry.announceRetryCount = retryCount;
|
||||
entry.lastAnnounceRetryAt = Date.now();
|
||||
|
||||
// Check if the announce has exceeded retry limits or expired (#18264).
|
||||
const endedAgo = typeof entry.endedAt === "number" ? Date.now() - entry.endedAt : 0;
|
||||
if (retryCount >= MAX_ANNOUNCE_RETRY_COUNT || endedAgo > ANNOUNCE_EXPIRY_MS) {
|
||||
// Give up: mark as completed to break the infinite retry loop.
|
||||
entry.cleanupCompletedAt = Date.now();
|
||||
persistSubagentRuns();
|
||||
retryDeferredCompletedAnnounces(runId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow retry on the next wake if announce was deferred or failed.
|
||||
entry.cleanupHandled = false;
|
||||
resumedRuns.delete(runId);
|
||||
@@ -274,6 +315,7 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
|
||||
}
|
||||
|
||||
function retryDeferredCompletedAnnounces(excludeRunId?: string) {
|
||||
const now = Date.now();
|
||||
for (const [runId, entry] of subagentRuns.entries()) {
|
||||
if (excludeRunId && runId === excludeRunId) {
|
||||
continue;
|
||||
@@ -287,6 +329,13 @@ function retryDeferredCompletedAnnounces(excludeRunId?: string) {
|
||||
if (suppressAnnounceForSteerRestart(entry)) {
|
||||
continue;
|
||||
}
|
||||
// Force-expire announces that have been pending too long (#18264).
|
||||
const endedAgo = now - (entry.endedAt ?? now);
|
||||
if (endedAgo > ANNOUNCE_EXPIRY_MS) {
|
||||
entry.cleanupCompletedAt = now;
|
||||
persistSubagentRuns();
|
||||
continue;
|
||||
}
|
||||
resumedRuns.delete(runId);
|
||||
resumeSubagentRun(runId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user