mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 05:07:29 +00:00
test(agent): cover compaction-timeout parity and unsubscribe abort
This commit is contained in:
@@ -91,6 +91,10 @@ import {
|
||||
import { splitSdkTools } from "../tool-split.js";
|
||||
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||
import {
|
||||
selectCompactionTimeoutSnapshot,
|
||||
shouldFlagCompactionTimeout,
|
||||
} from "./compaction-timeout.js";
|
||||
import { detectAndLoadPromptImages } from "./images.js";
|
||||
|
||||
export function injectHistoryImagesIntoMessages(
|
||||
@@ -770,9 +774,13 @@ export async function runEmbeddedAttempt(
|
||||
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
|
||||
);
|
||||
}
|
||||
// Check full compaction lifecycle (in-flight + pending retry) to avoid penalizing
|
||||
// auth profiles for infrastructure timeouts during compaction.
|
||||
if (subscription.isCompacting() || activeSession.isCompacting) {
|
||||
if (
|
||||
shouldFlagCompactionTimeout({
|
||||
isTimeout: true,
|
||||
isCompactionPendingOrRetrying: subscription.isCompacting(),
|
||||
isCompactionInFlight: activeSession.isCompacting,
|
||||
})
|
||||
) {
|
||||
timedOutDuringCompaction = true;
|
||||
}
|
||||
abortRun(true);
|
||||
@@ -797,7 +805,13 @@ export async function runEmbeddedAttempt(
|
||||
const onAbort = () => {
|
||||
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
|
||||
const timeout = reason ? isTimeoutError(reason) : false;
|
||||
if (timeout && (subscription.isCompacting() || activeSession.isCompacting)) {
|
||||
if (
|
||||
shouldFlagCompactionTimeout({
|
||||
isTimeout: timeout,
|
||||
isCompactionPendingOrRetrying: subscription.isCompacting(),
|
||||
isCompactionInFlight: activeSession.isCompacting,
|
||||
})
|
||||
) {
|
||||
timedOutDuringCompaction = true;
|
||||
}
|
||||
abortRun(timeout, reason);
|
||||
@@ -994,28 +1008,24 @@ export async function runEmbeddedAttempt(
|
||||
}
|
||||
}
|
||||
|
||||
// If timeout occurred during compaction, use pre-compaction snapshot
|
||||
// (compaction doesn't add messages, just restructures them)
|
||||
// Note: timedOutDuringCompaction is set in timeout handler to avoid race condition
|
||||
// If timeout occurred during compaction, use pre-compaction snapshot when available
|
||||
// (compaction restructures messages but does not add user/assistant turns).
|
||||
const snapshotSelection = selectCompactionTimeoutSnapshot({
|
||||
timedOutDuringCompaction,
|
||||
preCompactionSnapshot,
|
||||
preCompactionSessionId,
|
||||
currentSnapshot: activeSession.messages.slice(),
|
||||
currentSessionId: activeSession.sessionId,
|
||||
});
|
||||
if (timedOutDuringCompaction) {
|
||||
if (!isProbeSession) {
|
||||
log.warn(
|
||||
`using ${preCompactionSnapshot ? "pre-compaction" : "current"} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
`using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
}
|
||||
// Use pre-compaction snapshot if available (captured when not compacting), otherwise current
|
||||
// Keep sessionId consistent with whichever snapshot source is used
|
||||
if (preCompactionSnapshot) {
|
||||
messagesSnapshot = preCompactionSnapshot;
|
||||
sessionIdUsed = preCompactionSessionId;
|
||||
} else {
|
||||
messagesSnapshot = activeSession.messages.slice();
|
||||
sessionIdUsed = activeSession.sessionId;
|
||||
}
|
||||
} else {
|
||||
messagesSnapshot = activeSession.messages.slice();
|
||||
sessionIdUsed = activeSession.sessionId;
|
||||
}
|
||||
messagesSnapshot = snapshotSelection.messagesSnapshot;
|
||||
sessionIdUsed = snapshotSelection.sessionIdUsed;
|
||||
cacheTrace?.recordStage("session:after", {
|
||||
messages: messagesSnapshot,
|
||||
note: timedOutDuringCompaction
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
selectCompactionTimeoutSnapshot,
|
||||
shouldFlagCompactionTimeout,
|
||||
} from "./compaction-timeout.js";
|
||||
|
||||
describe("compaction-timeout helpers", () => {
|
||||
it("flags compaction timeout consistently for internal and external timeout sources", () => {
|
||||
const internalTimer = shouldFlagCompactionTimeout({
|
||||
isTimeout: true,
|
||||
isCompactionPendingOrRetrying: true,
|
||||
isCompactionInFlight: false,
|
||||
});
|
||||
const externalAbort = shouldFlagCompactionTimeout({
|
||||
isTimeout: true,
|
||||
isCompactionPendingOrRetrying: true,
|
||||
isCompactionInFlight: false,
|
||||
});
|
||||
expect(internalTimer).toBe(true);
|
||||
expect(externalAbort).toBe(true);
|
||||
});
|
||||
|
||||
it("does not flag when timeout is false", () => {
|
||||
expect(
|
||||
shouldFlagCompactionTimeout({
|
||||
isTimeout: false,
|
||||
isCompactionPendingOrRetrying: true,
|
||||
isCompactionInFlight: true,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("uses pre-compaction snapshot when compaction timeout occurs", () => {
|
||||
const pre = [{ role: "assistant", content: "pre" }] as const;
|
||||
const current = [{ role: "assistant", content: "current" }] as const;
|
||||
const selected = selectCompactionTimeoutSnapshot({
|
||||
timedOutDuringCompaction: true,
|
||||
preCompactionSnapshot: [...pre],
|
||||
preCompactionSessionId: "session-pre",
|
||||
currentSnapshot: [...current],
|
||||
currentSessionId: "session-current",
|
||||
});
|
||||
expect(selected.source).toBe("pre-compaction");
|
||||
expect(selected.sessionIdUsed).toBe("session-pre");
|
||||
expect(selected.messagesSnapshot).toEqual(pre);
|
||||
});
|
||||
|
||||
it("falls back to current snapshot when pre-compaction snapshot is unavailable", () => {
|
||||
const current = [{ role: "assistant", content: "current" }] as const;
|
||||
const selected = selectCompactionTimeoutSnapshot({
|
||||
timedOutDuringCompaction: true,
|
||||
preCompactionSnapshot: null,
|
||||
preCompactionSessionId: "session-pre",
|
||||
currentSnapshot: [...current],
|
||||
currentSessionId: "session-current",
|
||||
});
|
||||
expect(selected.source).toBe("current");
|
||||
expect(selected.sessionIdUsed).toBe("session-current");
|
||||
expect(selected.messagesSnapshot).toEqual(current);
|
||||
});
|
||||
});
|
||||
54
src/agents/pi-embedded-runner/run/compaction-timeout.ts
Normal file
54
src/agents/pi-embedded-runner/run/compaction-timeout.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
|
||||
export type CompactionTimeoutSignal = {
|
||||
isTimeout: boolean;
|
||||
isCompactionPendingOrRetrying: boolean;
|
||||
isCompactionInFlight: boolean;
|
||||
};
|
||||
|
||||
export function shouldFlagCompactionTimeout(signal: CompactionTimeoutSignal): boolean {
|
||||
if (!signal.isTimeout) {
|
||||
return false;
|
||||
}
|
||||
return signal.isCompactionPendingOrRetrying || signal.isCompactionInFlight;
|
||||
}
|
||||
|
||||
export type SnapshotSelectionParams = {
|
||||
timedOutDuringCompaction: boolean;
|
||||
preCompactionSnapshot: AgentMessage[] | null;
|
||||
preCompactionSessionId: string;
|
||||
currentSnapshot: AgentMessage[];
|
||||
currentSessionId: string;
|
||||
};
|
||||
|
||||
export type SnapshotSelection = {
|
||||
messagesSnapshot: AgentMessage[];
|
||||
sessionIdUsed: string;
|
||||
source: "pre-compaction" | "current";
|
||||
};
|
||||
|
||||
export function selectCompactionTimeoutSnapshot(
|
||||
params: SnapshotSelectionParams,
|
||||
): SnapshotSelection {
|
||||
if (!params.timedOutDuringCompaction) {
|
||||
return {
|
||||
messagesSnapshot: params.currentSnapshot,
|
||||
sessionIdUsed: params.currentSessionId,
|
||||
source: "current",
|
||||
};
|
||||
}
|
||||
|
||||
if (params.preCompactionSnapshot) {
|
||||
return {
|
||||
messagesSnapshot: params.preCompactionSnapshot,
|
||||
sessionIdUsed: params.preCompactionSessionId,
|
||||
source: "pre-compaction",
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
messagesSnapshot: params.currentSnapshot,
|
||||
sessionIdUsed: params.currentSessionId,
|
||||
source: "current",
|
||||
};
|
||||
}
|
||||
@@ -97,6 +97,38 @@ describe("subscribeEmbeddedPiSession", () => {
|
||||
{ phase: "end", willRetry: false },
|
||||
]);
|
||||
});
|
||||
|
||||
it("rejects compaction wait with AbortError when unsubscribed", async () => {
|
||||
const listeners: SessionEventHandler[] = [];
|
||||
const abortCompaction = vi.fn();
|
||||
const session = {
|
||||
isCompacting: true,
|
||||
abortCompaction,
|
||||
subscribe: (listener: SessionEventHandler) => {
|
||||
listeners.push(listener);
|
||||
return () => {};
|
||||
},
|
||||
} as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"];
|
||||
|
||||
const subscription = subscribeEmbeddedPiSession({
|
||||
session,
|
||||
runId: "run-abort-on-unsubscribe",
|
||||
});
|
||||
|
||||
for (const listener of listeners) {
|
||||
listener({ type: "auto_compaction_start" });
|
||||
}
|
||||
|
||||
const waitPromise = subscription.waitForCompactionRetry();
|
||||
subscription.unsubscribe();
|
||||
|
||||
await expect(waitPromise).rejects.toMatchObject({ name: "AbortError" });
|
||||
await expect(subscription.waitForCompactionRetry()).rejects.toMatchObject({
|
||||
name: "AbortError",
|
||||
});
|
||||
expect(abortCompaction).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("emits tool summaries at tool start when verbose is on", async () => {
|
||||
let handler: ((evt: unknown) => void) | undefined;
|
||||
const session: StubSession = {
|
||||
|
||||
Reference in New Issue
Block a user