mirror of https://github.com/openclaw/openclaw.git
synced 2026-05-08 10:31:24 +00:00
fix(subagent): harden read-tool overflow guards and sticky reply threading (#19508)
* fix(gateway): avoid premature agent.wait completion on transient errors
* fix(agent): preemptively guard tool results against context overflow
* fix: harden tool-result context guard and add message_id metadata
* fix: use importOriginal in session-key mock to include DEFAULT_ACCOUNT_ID.
  The run.skill-filter test was mocking ../../routing/session-key.js with only
  buildAgentMainSessionKey and normalizeAgentId, but the module also exports
  DEFAULT_ACCOUNT_ID, which src/web/auth-store.ts requires transitively.
  Switch to the importOriginal pattern so all real exports are preserved
  alongside the mocked functions.
* pi-runner: guard accumulated tool-result overflow in transformContext
* PI runner: compact overflowing tool-result context
* Subagent: harden tool-result context recovery
* Support legacy tool outputs in tool-result context handling and improve character estimation for message truncation: add a function that creates legacy tool results and update existing functions to better manage context-overflow scenarios.
* Add reply-tag support to the iMessage send functions and tests: prepend or rewrite reply tags based on the provided replyToId so replies are formatted correctly.
* Implement sticky reply context for chunked messages across channels: preserve reply references in Discord, Telegram, and iMessage so follow-up messages keep their intended reply targets, and improve reply-tag handling in system prompts and tests for consistent reply behavior.
* Auto-page the read tool across chunks when no explicit limit is provided, scaling the output budget to the model's context window; add tests for adaptive reading behavior and capped continuation guidance for large outputs.
* Strip oversized read-tool details payloads during compaction so repeated read calls cannot bypass context limits; add utilities for truncation content, improve character estimation for tool results, and add tests covering removal of excessive details under context overflow.
* Track whether a text chunk was sent in Matrix and Telegram delivery so replies are only marked delivered after a text chunk actually went out.
* fix: tighten reply threading coverage and prep fixes (#19508) (thanks @tyler6204)
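None of the reply-threading work above appears in the hunks on this page (they cover the gateway wait logic and its tests). As a rough sketch of the sticky-reply idea only — `SendFn` and `deliverChunks` are illustrative stand-ins, not OpenClaw's actual API:

```ts
// Sketch: thread every chunk of a long message to the original reply target,
// and only mark the reply delivered once a real text chunk was sent.
// `SendFn` and `deliverChunks` are illustrative, not the real channel senders.
type SendFn = (text: string, replyToId?: string) => Promise<void>;

async function deliverChunks(
  chunks: string[],
  replyToId: string | undefined,
  send: SendFn,
): Promise<boolean> {
  let sentTextChunk = false;
  for (const chunk of chunks) {
    if (!chunk.trim()) {
      continue; // empty chunks must not count as the delivered reply
    }
    await send(chunk, replyToId); // sticky: follow-ups keep the reply target
    sentTextChunk = true;
  }
  return sentTextChunk; // caller marks the reply delivered only when true
}
```

Threading the reply target through every chunk, rather than only the first, is what keeps follow-ups attached when a channel splits long messages.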
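The tool-result overflow guard is likewise not among these hunks. A minimal sketch of the compaction idea, under stated assumptions: the `ToolResultMessage` shape and the 4-chars-per-token estimate are invented for illustration.

```ts
// Sketch: estimate tool-result size in characters and strip oversized
// details payloads during compaction so repeated reads can't blow the context.
// All names and the chars-per-token heuristic are illustrative assumptions.
type ToolResultMessage = {
  role: "toolResult";
  text: string;
  details?: unknown; // e.g. a read tool's full payload
};

const CHARS_PER_TOKEN = 4;

function estimateTokens(msg: ToolResultMessage): number {
  const detailChars =
    msg.details === undefined ? 0 : (JSON.stringify(msg.details) ?? "").length;
  return Math.ceil((msg.text.length + detailChars) / CHARS_PER_TOKEN);
}

function compactToolResults(
  messages: ToolResultMessage[],
  maxTokensPerResult: number,
): ToolResultMessage[] {
  return messages.map((msg) => {
    if (estimateTokens(msg) <= maxTokensPerResult) {
      return msg;
    }
    // Drop the oversized details first; truncate text only if still too big.
    const stripped: ToolResultMessage = { ...msg, details: undefined };
    const maxChars = maxTokensPerResult * CHARS_PER_TOKEN;
    return stripped.text.length <= maxChars
      ? stripped
      : { ...stripped, text: stripped.text.slice(0, maxChars) + "\n[truncated]" };
  });
}
```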
@@ -1,8 +1,16 @@
 import { onAgentEvent } from "../../infra/agent-events.js";

 const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000;
+/**
+ * Embedded runs can emit transient lifecycle `error` events while auth/model
+ * failover is still in progress. Give errors a short grace window so a
+ * subsequent `start` event can cancel premature terminal snapshots.
+ */
+const AGENT_RUN_ERROR_RETRY_GRACE_MS = 15_000;
+
 const agentRunCache = new Map<string, AgentRunSnapshot>();
 const agentRunStarts = new Map<string, number>();
+const pendingAgentRunErrors = new Map<string, PendingAgentRunError>();
 let agentRunListenerStarted = false;

 type AgentRunSnapshot = {
@@ -14,6 +22,12 @@ type AgentRunSnapshot = {
   ts: number;
 };

+type PendingAgentRunError = {
+  snapshot: AgentRunSnapshot;
+  dueAt: number;
+  timer: NodeJS.Timeout;
+};
+
 function pruneAgentRunCache(now = Date.now()) {
   for (const [runId, entry] of agentRunCache) {
     if (now - entry.ts > AGENT_RUN_CACHE_TTL_MS) {
@@ -27,6 +41,61 @@ function recordAgentRunSnapshot(entry: AgentRunSnapshot) {
   agentRunCache.set(entry.runId, entry);
 }

+function clearPendingAgentRunError(runId: string) {
+  const pending = pendingAgentRunErrors.get(runId);
+  if (!pending) {
+    return;
+  }
+  clearTimeout(pending.timer);
+  pendingAgentRunErrors.delete(runId);
+}
+
+function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) {
+  clearPendingAgentRunError(snapshot.runId);
+  const dueAt = Date.now() + AGENT_RUN_ERROR_RETRY_GRACE_MS;
+  const timer = setTimeout(() => {
+    const pending = pendingAgentRunErrors.get(snapshot.runId);
+    if (!pending) {
+      return;
+    }
+    pendingAgentRunErrors.delete(snapshot.runId);
+    recordAgentRunSnapshot(pending.snapshot);
+  }, AGENT_RUN_ERROR_RETRY_GRACE_MS);
+  timer.unref?.();
+  pendingAgentRunErrors.set(snapshot.runId, { snapshot, dueAt, timer });
+}
+
+function getPendingAgentRunError(runId: string) {
+  const pending = pendingAgentRunErrors.get(runId);
+  if (!pending) {
+    return undefined;
+  }
+  return {
+    snapshot: pending.snapshot,
+    dueAt: pending.dueAt,
+  };
+}
+
+function createSnapshotFromLifecycleEvent(params: {
+  runId: string;
+  phase: "end" | "error";
+  data?: Record<string, unknown>;
+}): AgentRunSnapshot {
+  const { runId, phase, data } = params;
+  const startedAt =
+    typeof data?.startedAt === "number" ? data.startedAt : agentRunStarts.get(runId);
+  const endedAt = typeof data?.endedAt === "number" ? data.endedAt : undefined;
+  const error = typeof data?.error === "string" ? data.error : undefined;
+  return {
+    runId,
+    status: phase === "error" ? "error" : data?.aborted ? "timeout" : "ok",
+    startedAt,
+    endedAt,
+    error,
+    ts: Date.now(),
+  };
+}
+
 function ensureAgentRunListener() {
   if (agentRunListenerStarted) {
     return;
@@ -43,24 +112,27 @@ function ensureAgentRunListener() {
     if (phase === "start") {
      const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined;
       agentRunStarts.set(evt.runId, startedAt ?? Date.now());
+      clearPendingAgentRunError(evt.runId);
+      // A new start means this run is active again (or retried). Drop stale
+      // terminal snapshots so waiters don't resolve from old state.
       agentRunCache.delete(evt.runId);
       return;
     }
     if (phase !== "end" && phase !== "error") {
       return;
     }
-    const startedAt =
-      typeof evt.data?.startedAt === "number" ? evt.data.startedAt : agentRunStarts.get(evt.runId);
-    const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined;
-    const error = typeof evt.data?.error === "string" ? evt.data.error : undefined;
-    agentRunStarts.delete(evt.runId);
-    recordAgentRunSnapshot({
+    const snapshot = createSnapshotFromLifecycleEvent({
       runId: evt.runId,
-      status: phase === "error" ? "error" : evt.data?.aborted ? "timeout" : "ok",
-      startedAt,
-      endedAt,
-      error,
-      ts: Date.now(),
+      phase,
+      data: evt.data,
     });
+    agentRunStarts.delete(evt.runId);
+    if (phase === "error") {
+      schedulePendingAgentRunError(snapshot);
+      return;
+    }
+    clearPendingAgentRunError(evt.runId);
+    recordAgentRunSnapshot(snapshot);
   });
 }
@@ -85,15 +157,50 @@ export async function waitForAgentJob(params: {

   return await new Promise((resolve) => {
     let settled = false;
+    let pendingErrorTimer: NodeJS.Timeout | undefined;
+
+    const clearPendingErrorTimer = () => {
+      if (!pendingErrorTimer) {
+        return;
+      }
+      clearTimeout(pendingErrorTimer);
+      pendingErrorTimer = undefined;
+    };
+
     const finish = (entry: AgentRunSnapshot | null) => {
       if (settled) {
         return;
       }
       settled = true;
       clearTimeout(timer);
+      clearPendingErrorTimer();
       unsubscribe();
       resolve(entry);
     };
+
+    const scheduleErrorFinish = (
+      snapshot: AgentRunSnapshot,
+      delayMs = AGENT_RUN_ERROR_RETRY_GRACE_MS,
+    ) => {
+      clearPendingErrorTimer();
+      const effectiveDelay = Math.max(1, Math.min(Math.floor(delayMs), 2_147_483_647));
+      pendingErrorTimer = setTimeout(() => {
+        const latest = getCachedAgentRun(runId);
+        if (latest) {
+          finish(latest);
+          return;
+        }
+        recordAgentRunSnapshot(snapshot);
+        finish(snapshot);
+      }, effectiveDelay);
+      pendingErrorTimer.unref?.();
+    };
+
+    const pending = getPendingAgentRunError(runId);
+    if (pending) {
+      scheduleErrorFinish(pending.snapshot, pending.dueAt - Date.now());
+    }
+
     const unsubscribe = onAgentEvent((evt) => {
       if (!evt || evt.stream !== "lifecycle") {
         return;
@@ -102,31 +209,31 @@ export async function waitForAgentJob(params: {
         return;
       }
       const phase = evt.data?.phase;
       if (phase === "start") {
+        clearPendingErrorTimer();
         return;
       }
       if (phase !== "end" && phase !== "error") {
         return;
       }
-      const cached = getCachedAgentRun(runId);
-      if (cached) {
-        finish(cached);
+      const latest = getCachedAgentRun(runId);
+      if (latest) {
+        finish(latest);
         return;
       }
-      const startedAt =
-        typeof evt.data?.startedAt === "number"
-          ? evt.data.startedAt
-          : agentRunStarts.get(evt.runId);
-      const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined;
-      const error = typeof evt.data?.error === "string" ? evt.data.error : undefined;
-      const snapshot: AgentRunSnapshot = {
+      const snapshot = createSnapshotFromLifecycleEvent({
        runId: evt.runId,
-        status: phase === "error" ? "error" : evt.data?.aborted ? "timeout" : "ok",
-        startedAt,
-        endedAt,
-        error,
-        ts: Date.now(),
-      };
+        phase,
+        data: evt.data,
+      });
+      if (phase === "error") {
+        scheduleErrorFinish(snapshot);
+        return;
+      }
       recordAgentRunSnapshot(snapshot);
       finish(snapshot);
     });

+    const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647));
+    const timer = setTimeout(() => finish(null), timerDelayMs);
   });
@@ -25,6 +25,17 @@ type HealthStatusHandlerParams = Parameters<
 >[0];

 describe("waitForAgentJob", () => {
+  const AGENT_RUN_ERROR_RETRY_GRACE_MS = 15_000;
+
+  beforeEach(() => {
+    vi.useFakeTimers();
+    vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
+  });
+
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
   it("maps lifecycle end events with aborted=true to timeout", async () => {
     const runId = `run-timeout-${Date.now()}-${Math.random().toString(36).slice(2)}`;
     const waitPromise = waitForAgentJob({ runId, timeoutMs: 1_000 });
@@ -56,6 +67,86 @@ describe("waitForAgentJob", () => {
     expect(snapshot?.startedAt).toBe(300);
     expect(snapshot?.endedAt).toBe(400);
   });
+
+  it("treats transient error->start->end as recovered when restart lands inside grace", async () => {
+    const runId = `run-recover-${Date.now()}-${Math.random().toString(36).slice(2)}`;
+    const waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 });
+
+    emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 100 } });
+    emitAgentEvent({
+      runId,
+      stream: "lifecycle",
+      data: { phase: "error", endedAt: 110, error: "transient" },
+    });
+
+    await vi.advanceTimersByTimeAsync(1_000);
+    emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 200 } });
+    emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end", endedAt: 260 } });
+
+    const snapshot = await waitPromise;
+    expect(snapshot).not.toBeNull();
+    expect(snapshot?.status).toBe("ok");
+    expect(snapshot?.startedAt).toBe(200);
+    expect(snapshot?.endedAt).toBe(260);
+  });
+
+  it("resolves error only after grace expires when no recovery start arrives", async () => {
+    const runId = `run-error-${Date.now()}-${Math.random().toString(36).slice(2)}`;
+    const waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 });
+
+    emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 10 } });
+    emitAgentEvent({
+      runId,
+      stream: "lifecycle",
+      data: { phase: "error", endedAt: 20, error: "fatal" },
+    });
+
+    let settled = false;
+    void waitPromise.finally(() => {
+      settled = true;
+    });
+
+    await vi.advanceTimersByTimeAsync(AGENT_RUN_ERROR_RETRY_GRACE_MS - 1);
+    expect(settled).toBe(false);
+
+    await vi.advanceTimersByTimeAsync(1);
+    const snapshot = await waitPromise;
+    expect(snapshot).not.toBeNull();
+    expect(snapshot?.status).toBe("error");
+    expect(snapshot?.error).toBe("fatal");
+    expect(snapshot?.startedAt).toBe(10);
+    expect(snapshot?.endedAt).toBe(20);
+  });
+
+  it("honors pending error grace when waiter attaches after the error event", async () => {
+    const runId = `run-late-wait-${Date.now()}-${Math.random().toString(36).slice(2)}`;
+
+    emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 900 } });
+    emitAgentEvent({
+      runId,
+      stream: "lifecycle",
+      data: { phase: "error", endedAt: 999, error: "late-listener" },
+    });
+
+    await vi.advanceTimersByTimeAsync(5_000);
+
+    const waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 });
+    let settled = false;
+    void waitPromise.finally(() => {
+      settled = true;
+    });
+
+    await vi.advanceTimersByTimeAsync(AGENT_RUN_ERROR_RETRY_GRACE_MS - 5_001);
+    expect(settled).toBe(false);
+
+    await vi.advanceTimersByTimeAsync(1);
+    const snapshot = await waitPromise;
+    expect(snapshot).not.toBeNull();
+    expect(snapshot?.status).toBe("error");
+    expect(snapshot?.error).toBe("late-listener");
+    expect(snapshot?.startedAt).toBe(900);
+    expect(snapshot?.endedAt).toBe(999);
+  });
 });

 describe("injectTimestamp", () => {
@@ -263,7 +263,7 @@ describe("voice transcript events", () => {
         sessionKey: "voice-store-fail-session",
       }),
     });
-    await new Promise((resolve) => setTimeout(resolve, 0));
+    await Promise.resolve();

     expect(agentCommandMock).toHaveBeenCalledTimes(1);
     expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed"));
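The read-tool auto-paging from the commit message also has no hunks on this page. A sketch of the budget-scaling idea; the 20% ratio, the floor, and the chars-per-token figure are assumptions for illustration, not values from the commit:

```ts
// Sketch: scale the read tool's per-call output budget with the model's
// context window, and auto-page when the caller gave no explicit limit.
// The 20% ratio, 2_000-char floor, and 4 chars/token are illustrative.
const CHARS_PER_TOKEN = 4;

function readBudgetChars(contextWindowTokens: number): number {
  // Spend at most ~20% of the window on a single read, with a sane floor.
  return Math.max(2_000, Math.floor(contextWindowTokens * 0.2) * CHARS_PER_TOKEN);
}

function readPage(content: string, offset: number, contextWindowTokens: number) {
  const budget = readBudgetChars(contextWindowTokens);
  const chunk = content.slice(offset, offset + budget);
  const nextOffset = offset + chunk.length;
  return {
    chunk,
    done: nextOffset >= content.length,
    // Capped continuation guidance: tell the model where to resume.
    guidance: nextOffset < content.length ? `Continue from offset ${nextOffset}.` : undefined,
  };
}
```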