mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 12:04:59 +00:00
fix(agents): centralize idle-wait flush and clear timeout handle
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { flushPendingToolResultsAfterIdle } from "./pi-embedded-runner/wait-for-idle-before-flush.js";
|
||||
import { guardSessionManager } from "./session-tool-result-guard-wrapper.js";
|
||||
|
||||
function assistantToolCall(id: string): AgentMessage {
|
||||
@@ -20,57 +21,71 @@ function toolResult(id: string, text: string): AgentMessage {
|
||||
} as AgentMessage;
|
||||
}
|
||||
|
||||
describe("waitForIdle before flush prevents premature synthetic results", () => {
|
||||
it("should not flush pending tool results while agent is still executing tools", async () => {
|
||||
// Simulates the race condition: a tool call is registered but the tool
|
||||
// hasn't finished executing yet. If we flush immediately, we get a
|
||||
// synthetic error. If we wait for idle first, the real result arrives.
|
||||
/**
 * Builds a promise whose settlement is controlled from the outside, so a test
 * can decide exactly when an awaited operation completes.
 *
 * @returns An object holding the pending `promise` and the `resolve` function
 *   that fulfils it.
 */
function deferred<T>() {
  // Definitely assigned: the Promise executor below runs synchronously.
  let settle!: (value: T | PromiseLike<T>) => void;
  const pending = new Promise<T>((fulfil) => {
    settle = fulfil;
  });
  return { promise: pending, resolve: settle };
}
|
||||
|
||||
/**
 * Returns the messages recorded by a guarded session manager, in entry order.
 * Entries whose `type` is anything other than `"message"` are left out.
 *
 * @param sm - Session manager whose entries are inspected.
 * @returns The `message` payload of every `"message"` entry.
 */
function getMessages(sm: ReturnType<typeof guardSessionManager>): AgentMessage[] {
  return sm
    .getEntries()
    .flatMap((entry) =>
      entry.type === "message" ? [(entry as { message: AgentMessage }).message] : [],
    );
}
|
||||
|
||||
describe("flushPendingToolResultsAfterIdle", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("waits for idle so real tool results can land before flush", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
const idle = deferred<void>();
|
||||
const agent = { waitForIdle: () => idle.promise };
|
||||
|
||||
// Assistant makes a tool call (from a retry after overloaded_error)
|
||||
sm.appendMessage(assistantToolCall("call_retry_1"));
|
||||
const flushPromise = flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 1_000,
|
||||
});
|
||||
|
||||
// Simulate: tool is still executing (result hasn't arrived yet)
|
||||
// If we flush now, we'd get a synthetic error — this is the bug
|
||||
const entriesBefore = sm
|
||||
.getEntries()
|
||||
.filter((e) => e.type === "message")
|
||||
.map((e) => (e as { message: AgentMessage }).message);
|
||||
// Flush is waiting for idle; synthetic result must not appear yet.
|
||||
await Promise.resolve();
|
||||
expect(getMessages(sm).map((m) => m.role)).toEqual(["assistant"]);
|
||||
|
||||
// Only the assistant message should exist, no synthetic result yet
|
||||
expect(entriesBefore.length).toBe(1);
|
||||
expect(entriesBefore[0].role).toBe("assistant");
|
||||
|
||||
// Now the real tool result arrives (tool finished executing)
|
||||
// Tool completes before idle wait finishes.
|
||||
sm.appendMessage(toolResult("call_retry_1", "command output here"));
|
||||
idle.resolve();
|
||||
await flushPromise;
|
||||
|
||||
const entriesAfter = sm
|
||||
.getEntries()
|
||||
.filter((e) => e.type === "message")
|
||||
.map((e) => (e as { message: AgentMessage }).message);
|
||||
|
||||
// Should have assistant + real tool result, no synthetic error
|
||||
expect(entriesAfter.length).toBe(2);
|
||||
expect(entriesAfter[1].role).toBe("toolResult");
|
||||
expect((entriesAfter[1] as { isError?: boolean }).isError).not.toBe(true);
|
||||
expect((entriesAfter[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toBe(
|
||||
const messages = getMessages(sm);
|
||||
expect(messages.map((m) => m.role)).toEqual(["assistant", "toolResult"]);
|
||||
expect((messages[1] as { isError?: boolean }).isError).not.toBe(true);
|
||||
expect((messages[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toBe(
|
||||
"command output here",
|
||||
);
|
||||
});
|
||||
|
||||
it("flush inserts synthetic error when tool result never arrives", () => {
|
||||
// Validates that flush still works correctly for genuinely orphaned tool calls
|
||||
it("flushes pending tool call after timeout when idle never resolves", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
vi.useFakeTimers();
|
||||
const agent = { waitForIdle: () => new Promise<void>(() => {}) };
|
||||
|
||||
sm.appendMessage(assistantToolCall("call_orphan_1"));
|
||||
|
||||
// Tool never executes — flush should insert synthetic error
|
||||
sm.flushPendingToolResults?.();
|
||||
const flushPromise = flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 30,
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(30);
|
||||
await flushPromise;
|
||||
|
||||
const entries = sm
|
||||
.getEntries()
|
||||
.filter((e) => e.type === "message")
|
||||
.map((e) => (e as { message: AgentMessage }).message);
|
||||
const entries = getMessages(sm);
|
||||
|
||||
expect(entries.length).toBe(2);
|
||||
expect(entries[1].role).toBe("toolResult");
|
||||
@@ -80,25 +95,18 @@ describe("waitForIdle before flush prevents premature synthetic results", () =>
|
||||
);
|
||||
});
|
||||
|
||||
it("flush is a no-op after real tool result arrives", () => {
|
||||
// If the tool result arrived in time, flush should do nothing
|
||||
it("clears timeout handle when waitForIdle resolves first", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
vi.useFakeTimers();
|
||||
const agent = {
|
||||
waitForIdle: async () => {},
|
||||
};
|
||||
|
||||
sm.appendMessage(assistantToolCall("call_ok_1"));
|
||||
sm.appendMessage(toolResult("call_ok_1", "success"));
|
||||
|
||||
// Flush after result already arrived — should be a no-op
|
||||
sm.flushPendingToolResults?.();
|
||||
|
||||
const entries = sm
|
||||
.getEntries()
|
||||
.filter((e) => e.type === "message")
|
||||
.map((e) => (e as { message: AgentMessage }).message);
|
||||
|
||||
// Should still be just 2 messages, no extra synthetic result
|
||||
expect(entries.length).toBe(2);
|
||||
expect(entries[0].role).toBe("assistant");
|
||||
expect(entries[1].role).toBe("toolResult");
|
||||
expect((entries[1] as { isError?: boolean }).isError).not.toBe(true);
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 30_000,
|
||||
});
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -73,6 +73,7 @@ import {
|
||||
} from "./system-prompt.js";
|
||||
import { splitSdkTools } from "./tool-split.js";
|
||||
import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
|
||||
|
||||
export type CompactEmbeddedPiSessionParams = {
|
||||
sessionId: string;
|
||||
@@ -464,19 +465,10 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
},
|
||||
};
|
||||
} finally {
|
||||
// Wait for agent idle before flushing to prevent inserting synthetic
|
||||
// "missing tool result" errors while tools are still executing.
|
||||
if (session?.agent?.waitForIdle) {
|
||||
try {
|
||||
await Promise.race([
|
||||
session.agent.waitForIdle(),
|
||||
new Promise<void>((resolve) => setTimeout(resolve, 30_000)),
|
||||
]);
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
}
|
||||
sessionManager.flushPendingToolResults?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
session.dispose();
|
||||
}
|
||||
} finally {
|
||||
|
||||
@@ -87,6 +87,7 @@ import {
|
||||
} from "../system-prompt.js";
|
||||
import { splitSdkTools } from "../tool-split.js";
|
||||
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||
import { detectAndLoadPromptImages } from "./images.js";
|
||||
|
||||
export function injectHistoryImagesIntoMessages(
|
||||
@@ -565,19 +566,10 @@ export async function runEmbeddedAttempt(
|
||||
activeSession.agent.replaceMessages(limited);
|
||||
}
|
||||
} catch (err) {
|
||||
// Wait for agent idle before flushing to avoid the same race condition
|
||||
// as the main finally block (see comment below).
|
||||
if (activeSession?.agent?.waitForIdle) {
|
||||
try {
|
||||
await Promise.race([
|
||||
activeSession.agent.waitForIdle(),
|
||||
new Promise<void>((resolve) => setTimeout(resolve, 30_000)),
|
||||
]);
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
}
|
||||
sessionManager.flushPendingToolResults?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: activeSession?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
activeSession.dispose();
|
||||
throw err;
|
||||
}
|
||||
@@ -936,17 +928,10 @@ export async function runEmbeddedAttempt(
|
||||
// flushPendingToolResults() fires while tools are still executing, inserting
|
||||
// synthetic "missing tool result" errors and causing silent agent failures.
|
||||
// See: https://github.com/openclaw/openclaw/issues/8643
|
||||
if (session?.agent?.waitForIdle) {
|
||||
try {
|
||||
await Promise.race([
|
||||
session.agent.waitForIdle(),
|
||||
new Promise((resolve) => setTimeout(resolve, 30_000)), // safety timeout
|
||||
]);
|
||||
} catch {
|
||||
// Ignore errors during idle wait — we're in cleanup
|
||||
}
|
||||
}
|
||||
sessionManager?.flushPendingToolResults?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
session?.dispose();
|
||||
await sessionLock.release();
|
||||
}
|
||||
|
||||
45
src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
Normal file
45
src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
/**
 * Minimal view of an agent as needed by the idle-wait helper in this module.
 */
type IdleAwareAgent = {
  /**
   * Optional hook returning a promise that the helper awaits before flushing.
   * May be missing or explicitly `undefined`, in which case no wait happens.
   */
  waitForIdle?: (() => Promise<void>) | undefined;
};
|
||||
|
||||
/**
 * Minimal view of a session manager as needed by the flush helper in this module.
 */
type ToolResultFlushManager = {
  /**
   * Optional synchronous hook invoked to flush pending tool results.
   * May be missing or explicitly `undefined`, in which case nothing is flushed.
   */
  flushPendingToolResults?: (() => void) | undefined;
};
|
||||
|
||||
/**
 * Default idle-wait budget in milliseconds (30 seconds), used by
 * `flushPendingToolResultsAfterIdle` when the caller supplies no `timeoutMs`.
 */
export const DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS = 30_000;
|
||||
|
||||
/**
 * Waits for `agent.waitForIdle()` to settle, giving up after `timeoutMs`.
 *
 * This is a best-effort cleanup helper: it never rejects. A missing agent or a
 * missing `waitForIdle` hook returns immediately, and any error thrown or
 * rejected by the hook is swallowed.
 *
 * @param agent - Agent whose optional `waitForIdle` hook is awaited; may be
 *   `null` or `undefined`.
 * @param timeoutMs - Maximum time to wait, in milliseconds. Values above the
 *   largest delay `setTimeout` supports (2^31-1 ms), including `Infinity`, are
 *   capped at that maximum instead of collapsing to an almost-immediate timeout.
 * @returns A promise that resolves once the agent is idle, the hook fails, or
 *   the timeout elapses, whichever happens first.
 */
async function waitForAgentIdleBestEffort(
  agent: IdleAwareAgent | null | undefined,
  timeoutMs: number,
): Promise<void> {
  const waitForIdle = agent?.waitForIdle;
  if (typeof waitForIdle !== "function") {
    return;
  }

  // Node replaces any delay above 2^31-1 ms (Infinity included) with 1 ms, which
  // would end the wait almost at once and let the flush race the running tools.
  // Cap the delay so a "very long" budget stays long.
  const MAX_TIMER_DELAY_MS = 2_147_483_647;
  const boundedTimeoutMs = Math.min(timeoutMs, MAX_TIMER_DELAY_MS);

  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  try {
    await Promise.race([
      // Invoke with the agent as receiver so hooks relying on `this` keep working.
      waitForIdle.call(agent),
      new Promise<void>((resolve) => {
        timeoutHandle = setTimeout(resolve, boundedTimeoutMs);
        // Do not let the safety timer alone keep the process alive.
        timeoutHandle.unref?.();
      }),
    ]);
  } catch {
    // Best-effort during cleanup.
  } finally {
    // Release the timer when the idle wait finished (or failed) first.
    if (timeoutHandle !== undefined) {
      clearTimeout(timeoutHandle);
    }
  }
}
|
||||
|
||||
/**
 * Flushes pending tool results after first giving the agent a bounded chance
 * to become idle.
 *
 * The idle wait itself never rejects; an error thrown by
 * `flushPendingToolResults` propagates to the caller.
 *
 * @param opts.agent - Agent whose optional `waitForIdle` hook is awaited first;
 *   may be `null` or `undefined`.
 * @param opts.sessionManager - Manager whose optional `flushPendingToolResults`
 *   hook is invoked after the wait; may be `null` or `undefined`.
 * @param opts.timeoutMs - Idle-wait budget in milliseconds. Falls back to
 *   `DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS` when omitted.
 */
export async function flushPendingToolResultsAfterIdle(opts: {
  agent: IdleAwareAgent | null | undefined;
  sessionManager: ToolResultFlushManager | null | undefined;
  timeoutMs?: number;
}): Promise<void> {
  const agent = opts.agent;
  const idleBudgetMs = opts.timeoutMs ?? DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS;
  await waitForAgentIdleBestEffort(agent, idleBudgetMs);

  // The manager is looked up only once the wait is over, then asked to flush.
  const manager = opts.sessionManager;
  manager?.flushPendingToolResults?.();
}
|
||||
Reference in New Issue
Block a user