mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 05:01:23 +00:00
fix(agent): prevent session lock deadlock on timeout during compaction (#9855)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 64a28900f1
Co-authored-by: mverrilli <816450+mverrilli@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
|
- Agents: classify external timeout aborts during compaction the same as internal timeouts, preventing unnecessary auth-profile rotation and preserving compaction-timeout snapshot fallback behavior.
|
||||||
- Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla.
|
- Sessions/Agents: harden transcript path resolution for mismatched agent context by preserving explicit store roots and adding safe absolute-path fallback to the correct agent sessions directory. (#16288) Thanks @robbyczgw-cla.
|
||||||
- BlueBubbles: include sender identity in group chat envelopes and pass clean message text to the agent prompt, aligning with iMessage/Signal formatting. (#16210) Thanks @zerone0x.
|
- BlueBubbles: include sender identity in group chat envelopes and pass clean message text to the agent prompt, aligning with iMessage/Signal formatting. (#16210) Thanks @zerone0x.
|
||||||
- WhatsApp: honor per-account `dmPolicy` overrides (account-level settings now take precedence over channel defaults for inbound DMs). (#10082) Thanks @mcaxtr.
|
- WhatsApp: honor per-account `dmPolicy` overrides (account-level settings now take precedence over channel defaults for inbound DMs). (#10082) Thanks @mcaxtr.
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ const buildAssistant = (overrides: Partial<AssistantMessage>): AssistantMessage
|
|||||||
const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunAttemptResult => ({
|
const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunAttemptResult => ({
|
||||||
aborted: false,
|
aborted: false,
|
||||||
timedOut: false,
|
timedOut: false,
|
||||||
|
timedOutDuringCompaction: false,
|
||||||
promptError: null,
|
promptError: null,
|
||||||
sessionIdUsed: "session:test",
|
sessionIdUsed: "session:test",
|
||||||
systemPromptReport: undefined,
|
systemPromptReport: undefined,
|
||||||
@@ -174,6 +175,54 @@ describe("runEmbeddedPiAgent auth profile rotation", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("does not rotate for compaction timeouts", async () => {
|
||||||
|
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-"));
|
||||||
|
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-"));
|
||||||
|
try {
|
||||||
|
await writeAuthStore(agentDir);
|
||||||
|
|
||||||
|
runEmbeddedAttemptMock.mockResolvedValueOnce(
|
||||||
|
makeAttempt({
|
||||||
|
aborted: true,
|
||||||
|
timedOut: true,
|
||||||
|
timedOutDuringCompaction: true,
|
||||||
|
assistantTexts: ["partial"],
|
||||||
|
lastAssistant: buildAssistant({
|
||||||
|
stopReason: "stop",
|
||||||
|
content: [{ type: "text", text: "partial" }],
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await runEmbeddedPiAgent({
|
||||||
|
sessionId: "session:test",
|
||||||
|
sessionKey: "agent:test:compaction-timeout",
|
||||||
|
sessionFile: path.join(workspaceDir, "session.jsonl"),
|
||||||
|
workspaceDir,
|
||||||
|
agentDir,
|
||||||
|
config: makeConfig(),
|
||||||
|
prompt: "hello",
|
||||||
|
provider: "openai",
|
||||||
|
model: "mock-1",
|
||||||
|
authProfileId: "openai:p1",
|
||||||
|
authProfileIdSource: "auto",
|
||||||
|
timeoutMs: 5_000,
|
||||||
|
runId: "run:compaction-timeout",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(runEmbeddedAttemptMock).toHaveBeenCalledTimes(1);
|
||||||
|
expect(result.meta.aborted).toBe(true);
|
||||||
|
|
||||||
|
const stored = JSON.parse(
|
||||||
|
await fs.readFile(path.join(agentDir, "auth-profiles.json"), "utf-8"),
|
||||||
|
) as { usageStats?: Record<string, { lastUsed?: number }> };
|
||||||
|
expect(stored.usageStats?.["openai:p2"]?.lastUsed).toBe(2);
|
||||||
|
} finally {
|
||||||
|
await fs.rm(agentDir, { recursive: true, force: true });
|
||||||
|
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it("does not rotate for user-pinned profiles", async () => {
|
it("does not rotate for user-pinned profiles", async () => {
|
||||||
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-"));
|
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-"));
|
||||||
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-"));
|
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-workspace-"));
|
||||||
|
|||||||
@@ -206,6 +206,7 @@ function makeAttemptResult(
|
|||||||
return {
|
return {
|
||||||
aborted: false,
|
aborted: false,
|
||||||
timedOut: false,
|
timedOut: false,
|
||||||
|
timedOutDuringCompaction: false,
|
||||||
promptError: null,
|
promptError: null,
|
||||||
sessionIdUsed: "test-session",
|
sessionIdUsed: "test-session",
|
||||||
assistantTexts: ["Hello!"],
|
assistantTexts: ["Hello!"],
|
||||||
|
|||||||
@@ -480,7 +480,14 @@ export async function runEmbeddedPiAgent(
|
|||||||
enforceFinalTag: params.enforceFinalTag,
|
enforceFinalTag: params.enforceFinalTag,
|
||||||
});
|
});
|
||||||
|
|
||||||
const { aborted, promptError, timedOut, sessionIdUsed, lastAssistant } = attempt;
|
const {
|
||||||
|
aborted,
|
||||||
|
promptError,
|
||||||
|
timedOut,
|
||||||
|
timedOutDuringCompaction,
|
||||||
|
sessionIdUsed,
|
||||||
|
lastAssistant,
|
||||||
|
} = attempt;
|
||||||
const lastAssistantUsage = normalizeUsage(lastAssistant?.usage as UsageLike);
|
const lastAssistantUsage = normalizeUsage(lastAssistant?.usage as UsageLike);
|
||||||
const attemptUsage = attempt.attemptUsage ?? lastAssistantUsage;
|
const attemptUsage = attempt.attemptUsage ?? lastAssistantUsage;
|
||||||
mergeUsageIntoAccumulator(usageAccumulator, attemptUsage);
|
mergeUsageIntoAccumulator(usageAccumulator, attemptUsage);
|
||||||
@@ -801,7 +808,9 @@ export async function runEmbeddedPiAgent(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Treat timeout as potential rate limit (Antigravity hangs on rate limit)
|
// Treat timeout as potential rate limit (Antigravity hangs on rate limit)
|
||||||
const shouldRotate = (!aborted && failoverFailure) || timedOut;
|
// But exclude post-prompt compaction timeouts (model succeeded; no profile issue)
|
||||||
|
const shouldRotate =
|
||||||
|
(!aborted && failoverFailure) || (timedOut && !timedOutDuringCompaction);
|
||||||
|
|
||||||
if (shouldRotate) {
|
if (shouldRotate) {
|
||||||
if (lastProfileId) {
|
if (lastProfileId) {
|
||||||
|
|||||||
@@ -91,6 +91,10 @@ import {
|
|||||||
import { splitSdkTools } from "../tool-split.js";
|
import { splitSdkTools } from "../tool-split.js";
|
||||||
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
||||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||||
|
import {
|
||||||
|
selectCompactionTimeoutSnapshot,
|
||||||
|
shouldFlagCompactionTimeout,
|
||||||
|
} from "./compaction-timeout.js";
|
||||||
import { detectAndLoadPromptImages } from "./images.js";
|
import { detectAndLoadPromptImages } from "./images.js";
|
||||||
|
|
||||||
export function injectHistoryImagesIntoMessages(
|
export function injectHistoryImagesIntoMessages(
|
||||||
@@ -665,6 +669,7 @@ export async function runEmbeddedAttempt(
|
|||||||
|
|
||||||
let aborted = Boolean(params.abortSignal?.aborted);
|
let aborted = Boolean(params.abortSignal?.aborted);
|
||||||
let timedOut = false;
|
let timedOut = false;
|
||||||
|
let timedOutDuringCompaction = false;
|
||||||
const getAbortReason = (signal: AbortSignal): unknown =>
|
const getAbortReason = (signal: AbortSignal): unknown =>
|
||||||
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
|
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
|
||||||
const makeTimeoutAbortReason = (): Error => {
|
const makeTimeoutAbortReason = (): Error => {
|
||||||
@@ -769,6 +774,15 @@ export async function runEmbeddedAttempt(
|
|||||||
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
|
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
if (
|
||||||
|
shouldFlagCompactionTimeout({
|
||||||
|
isTimeout: true,
|
||||||
|
isCompactionPendingOrRetrying: subscription.isCompacting(),
|
||||||
|
isCompactionInFlight: activeSession.isCompacting,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
timedOutDuringCompaction = true;
|
||||||
|
}
|
||||||
abortRun(true);
|
abortRun(true);
|
||||||
if (!abortWarnTimer) {
|
if (!abortWarnTimer) {
|
||||||
abortWarnTimer = setTimeout(() => {
|
abortWarnTimer = setTimeout(() => {
|
||||||
@@ -791,6 +805,15 @@ export async function runEmbeddedAttempt(
|
|||||||
const onAbort = () => {
|
const onAbort = () => {
|
||||||
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
|
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
|
||||||
const timeout = reason ? isTimeoutError(reason) : false;
|
const timeout = reason ? isTimeoutError(reason) : false;
|
||||||
|
if (
|
||||||
|
shouldFlagCompactionTimeout({
|
||||||
|
isTimeout: timeout,
|
||||||
|
isCompactionPendingOrRetrying: subscription.isCompacting(),
|
||||||
|
isCompactionInFlight: activeSession.isCompacting,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
timedOutDuringCompaction = true;
|
||||||
|
}
|
||||||
abortRun(timeout, reason);
|
abortRun(timeout, reason);
|
||||||
};
|
};
|
||||||
if (params.abortSignal) {
|
if (params.abortSignal) {
|
||||||
@@ -939,13 +962,28 @@ export async function runEmbeddedAttempt(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture snapshot before compaction wait so we have complete messages if timeout occurs
|
||||||
|
// Check compaction state before and after to avoid race condition where compaction starts during capture
|
||||||
|
// Use session state (not subscription) for snapshot decisions - need instantaneous compaction status
|
||||||
|
const wasCompactingBefore = activeSession.isCompacting;
|
||||||
|
const snapshot = activeSession.messages.slice();
|
||||||
|
const wasCompactingAfter = activeSession.isCompacting;
|
||||||
|
// Only trust snapshot if compaction wasn't running before or after capture
|
||||||
|
const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot;
|
||||||
|
const preCompactionSessionId = activeSession.sessionId;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await waitForCompactionRetry();
|
await abortable(waitForCompactionRetry());
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (isRunnerAbortError(err)) {
|
if (isRunnerAbortError(err)) {
|
||||||
if (!promptError) {
|
if (!promptError) {
|
||||||
promptError = err;
|
promptError = err;
|
||||||
}
|
}
|
||||||
|
if (!isProbeSession) {
|
||||||
|
log.debug(
|
||||||
|
`compaction wait aborted: runId=${params.runId} sessionId=${params.sessionId}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
@@ -956,27 +994,51 @@ export async function runEmbeddedAttempt(
|
|||||||
// inserted between compaction and the next prompt — breaking the
|
// inserted between compaction and the next prompt — breaking the
|
||||||
// prepareCompaction() guard that checks the last entry type, leading to
|
// prepareCompaction() guard that checks the last entry type, leading to
|
||||||
// double-compaction. See: https://github.com/openclaw/openclaw/issues/9282
|
// double-compaction. See: https://github.com/openclaw/openclaw/issues/9282
|
||||||
const shouldTrackCacheTtl =
|
// Skip when timed out during compaction — session state may be inconsistent.
|
||||||
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
|
if (!timedOutDuringCompaction) {
|
||||||
isCacheTtlEligibleProvider(params.provider, params.modelId);
|
const shouldTrackCacheTtl =
|
||||||
if (shouldTrackCacheTtl) {
|
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
|
||||||
appendCacheTtlTimestamp(sessionManager, {
|
isCacheTtlEligibleProvider(params.provider, params.modelId);
|
||||||
timestamp: Date.now(),
|
if (shouldTrackCacheTtl) {
|
||||||
provider: params.provider,
|
appendCacheTtlTimestamp(sessionManager, {
|
||||||
modelId: params.modelId,
|
timestamp: Date.now(),
|
||||||
});
|
provider: params.provider,
|
||||||
|
modelId: params.modelId,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
messagesSnapshot = activeSession.messages.slice();
|
// If timeout occurred during compaction, use pre-compaction snapshot when available
|
||||||
sessionIdUsed = activeSession.sessionId;
|
// (compaction restructures messages but does not add user/assistant turns).
|
||||||
|
const snapshotSelection = selectCompactionTimeoutSnapshot({
|
||||||
|
timedOutDuringCompaction,
|
||||||
|
preCompactionSnapshot,
|
||||||
|
preCompactionSessionId,
|
||||||
|
currentSnapshot: activeSession.messages.slice(),
|
||||||
|
currentSessionId: activeSession.sessionId,
|
||||||
|
});
|
||||||
|
if (timedOutDuringCompaction) {
|
||||||
|
if (!isProbeSession) {
|
||||||
|
log.warn(
|
||||||
|
`using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messagesSnapshot = snapshotSelection.messagesSnapshot;
|
||||||
|
sessionIdUsed = snapshotSelection.sessionIdUsed;
|
||||||
cacheTrace?.recordStage("session:after", {
|
cacheTrace?.recordStage("session:after", {
|
||||||
messages: messagesSnapshot,
|
messages: messagesSnapshot,
|
||||||
note: promptError ? "prompt error" : undefined,
|
note: timedOutDuringCompaction
|
||||||
|
? "compaction timeout"
|
||||||
|
: promptError
|
||||||
|
? "prompt error"
|
||||||
|
: undefined,
|
||||||
});
|
});
|
||||||
anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError);
|
anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError);
|
||||||
|
|
||||||
// Run agent_end hooks to allow plugins to analyze the conversation
|
// Run agent_end hooks to allow plugins to analyze the conversation
|
||||||
// This is fire-and-forget, so we don't await
|
// This is fire-and-forget, so we don't await
|
||||||
|
// Run even on compaction timeout so plugins can log/cleanup
|
||||||
if (hookRunner?.hasHooks("agent_end")) {
|
if (hookRunner?.hasHooks("agent_end")) {
|
||||||
hookRunner
|
hookRunner
|
||||||
.runAgentEnd(
|
.runAgentEnd(
|
||||||
@@ -1003,7 +1065,21 @@ export async function runEmbeddedAttempt(
|
|||||||
if (abortWarnTimer) {
|
if (abortWarnTimer) {
|
||||||
clearTimeout(abortWarnTimer);
|
clearTimeout(abortWarnTimer);
|
||||||
}
|
}
|
||||||
unsubscribe();
|
if (!isProbeSession && (aborted || timedOut) && !timedOutDuringCompaction) {
|
||||||
|
log.debug(
|
||||||
|
`run cleanup: runId=${params.runId} sessionId=${params.sessionId} aborted=${aborted} timedOut=${timedOut}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
unsubscribe();
|
||||||
|
} catch (err) {
|
||||||
|
// unsubscribe() should never throw; if it does, it indicates a serious bug.
|
||||||
|
// Log at error level to ensure visibility, but don't rethrow in finally block
|
||||||
|
// as it would mask any exception from the try block above.
|
||||||
|
log.error(
|
||||||
|
`CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
clearActiveEmbeddedRun(params.sessionId, queueHandle);
|
clearActiveEmbeddedRun(params.sessionId, queueHandle);
|
||||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||||
}
|
}
|
||||||
@@ -1023,6 +1099,7 @@ export async function runEmbeddedAttempt(
|
|||||||
return {
|
return {
|
||||||
aborted,
|
aborted,
|
||||||
timedOut,
|
timedOut,
|
||||||
|
timedOutDuringCompaction,
|
||||||
promptError,
|
promptError,
|
||||||
sessionIdUsed,
|
sessionIdUsed,
|
||||||
systemPromptReport,
|
systemPromptReport,
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import {
|
||||||
|
selectCompactionTimeoutSnapshot,
|
||||||
|
shouldFlagCompactionTimeout,
|
||||||
|
} from "./compaction-timeout.js";
|
||||||
|
|
||||||
|
describe("compaction-timeout helpers", () => {
|
||||||
|
it("flags compaction timeout consistently for internal and external timeout sources", () => {
|
||||||
|
const internalTimer = shouldFlagCompactionTimeout({
|
||||||
|
isTimeout: true,
|
||||||
|
isCompactionPendingOrRetrying: true,
|
||||||
|
isCompactionInFlight: false,
|
||||||
|
});
|
||||||
|
const externalAbort = shouldFlagCompactionTimeout({
|
||||||
|
isTimeout: true,
|
||||||
|
isCompactionPendingOrRetrying: true,
|
||||||
|
isCompactionInFlight: false,
|
||||||
|
});
|
||||||
|
expect(internalTimer).toBe(true);
|
||||||
|
expect(externalAbort).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not flag when timeout is false", () => {
|
||||||
|
expect(
|
||||||
|
shouldFlagCompactionTimeout({
|
||||||
|
isTimeout: false,
|
||||||
|
isCompactionPendingOrRetrying: true,
|
||||||
|
isCompactionInFlight: true,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses pre-compaction snapshot when compaction timeout occurs", () => {
|
||||||
|
const pre = [{ role: "assistant", content: "pre" }] as const;
|
||||||
|
const current = [{ role: "assistant", content: "current" }] as const;
|
||||||
|
const selected = selectCompactionTimeoutSnapshot({
|
||||||
|
timedOutDuringCompaction: true,
|
||||||
|
preCompactionSnapshot: [...pre],
|
||||||
|
preCompactionSessionId: "session-pre",
|
||||||
|
currentSnapshot: [...current],
|
||||||
|
currentSessionId: "session-current",
|
||||||
|
});
|
||||||
|
expect(selected.source).toBe("pre-compaction");
|
||||||
|
expect(selected.sessionIdUsed).toBe("session-pre");
|
||||||
|
expect(selected.messagesSnapshot).toEqual(pre);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("falls back to current snapshot when pre-compaction snapshot is unavailable", () => {
|
||||||
|
const current = [{ role: "assistant", content: "current" }] as const;
|
||||||
|
const selected = selectCompactionTimeoutSnapshot({
|
||||||
|
timedOutDuringCompaction: true,
|
||||||
|
preCompactionSnapshot: null,
|
||||||
|
preCompactionSessionId: "session-pre",
|
||||||
|
currentSnapshot: [...current],
|
||||||
|
currentSessionId: "session-current",
|
||||||
|
});
|
||||||
|
expect(selected.source).toBe("current");
|
||||||
|
expect(selected.sessionIdUsed).toBe("session-current");
|
||||||
|
expect(selected.messagesSnapshot).toEqual(current);
|
||||||
|
});
|
||||||
|
});
|
||||||
54
src/agents/pi-embedded-runner/run/compaction-timeout.ts
Normal file
54
src/agents/pi-embedded-runner/run/compaction-timeout.ts
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||||
|
|
||||||
|
export type CompactionTimeoutSignal = {
|
||||||
|
isTimeout: boolean;
|
||||||
|
isCompactionPendingOrRetrying: boolean;
|
||||||
|
isCompactionInFlight: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function shouldFlagCompactionTimeout(signal: CompactionTimeoutSignal): boolean {
|
||||||
|
if (!signal.isTimeout) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return signal.isCompactionPendingOrRetrying || signal.isCompactionInFlight;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type SnapshotSelectionParams = {
|
||||||
|
timedOutDuringCompaction: boolean;
|
||||||
|
preCompactionSnapshot: AgentMessage[] | null;
|
||||||
|
preCompactionSessionId: string;
|
||||||
|
currentSnapshot: AgentMessage[];
|
||||||
|
currentSessionId: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type SnapshotSelection = {
|
||||||
|
messagesSnapshot: AgentMessage[];
|
||||||
|
sessionIdUsed: string;
|
||||||
|
source: "pre-compaction" | "current";
|
||||||
|
};
|
||||||
|
|
||||||
|
export function selectCompactionTimeoutSnapshot(
|
||||||
|
params: SnapshotSelectionParams,
|
||||||
|
): SnapshotSelection {
|
||||||
|
if (!params.timedOutDuringCompaction) {
|
||||||
|
return {
|
||||||
|
messagesSnapshot: params.currentSnapshot,
|
||||||
|
sessionIdUsed: params.currentSessionId,
|
||||||
|
source: "current",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (params.preCompactionSnapshot) {
|
||||||
|
return {
|
||||||
|
messagesSnapshot: params.preCompactionSnapshot,
|
||||||
|
sessionIdUsed: params.preCompactionSessionId,
|
||||||
|
source: "pre-compaction",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
messagesSnapshot: params.currentSnapshot,
|
||||||
|
sessionIdUsed: params.currentSessionId,
|
||||||
|
source: "current",
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -24,6 +24,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
|
|||||||
export type EmbeddedRunAttemptResult = {
|
export type EmbeddedRunAttemptResult = {
|
||||||
aborted: boolean;
|
aborted: boolean;
|
||||||
timedOut: boolean;
|
timedOut: boolean;
|
||||||
|
/** True if the timeout occurred while compaction was in progress or pending. */
|
||||||
|
timedOutDuringCompaction: boolean;
|
||||||
promptError: unknown;
|
promptError: unknown;
|
||||||
sessionIdUsed: string;
|
sessionIdUsed: string;
|
||||||
systemPromptReport?: SessionSystemPromptReport;
|
systemPromptReport?: SessionSystemPromptReport;
|
||||||
|
|||||||
@@ -55,7 +55,9 @@ export type EmbeddedPiSubscribeState = {
|
|||||||
compactionInFlight: boolean;
|
compactionInFlight: boolean;
|
||||||
pendingCompactionRetry: number;
|
pendingCompactionRetry: number;
|
||||||
compactionRetryResolve?: () => void;
|
compactionRetryResolve?: () => void;
|
||||||
|
compactionRetryReject?: (reason?: unknown) => void;
|
||||||
compactionRetryPromise: Promise<void> | null;
|
compactionRetryPromise: Promise<void> | null;
|
||||||
|
unsubscribed: boolean;
|
||||||
|
|
||||||
messagingToolSentTexts: string[];
|
messagingToolSentTexts: string[];
|
||||||
messagingToolSentTextsNormalized: string[];
|
messagingToolSentTextsNormalized: string[];
|
||||||
|
|||||||
@@ -97,6 +97,38 @@ describe("subscribeEmbeddedPiSession", () => {
|
|||||||
{ phase: "end", willRetry: false },
|
{ phase: "end", willRetry: false },
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("rejects compaction wait with AbortError when unsubscribed", async () => {
|
||||||
|
const listeners: SessionEventHandler[] = [];
|
||||||
|
const abortCompaction = vi.fn();
|
||||||
|
const session = {
|
||||||
|
isCompacting: true,
|
||||||
|
abortCompaction,
|
||||||
|
subscribe: (listener: SessionEventHandler) => {
|
||||||
|
listeners.push(listener);
|
||||||
|
return () => {};
|
||||||
|
},
|
||||||
|
} as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"];
|
||||||
|
|
||||||
|
const subscription = subscribeEmbeddedPiSession({
|
||||||
|
session,
|
||||||
|
runId: "run-abort-on-unsubscribe",
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const listener of listeners) {
|
||||||
|
listener({ type: "auto_compaction_start" });
|
||||||
|
}
|
||||||
|
|
||||||
|
const waitPromise = subscription.waitForCompactionRetry();
|
||||||
|
subscription.unsubscribe();
|
||||||
|
|
||||||
|
await expect(waitPromise).rejects.toMatchObject({ name: "AbortError" });
|
||||||
|
await expect(subscription.waitForCompactionRetry()).rejects.toMatchObject({
|
||||||
|
name: "AbortError",
|
||||||
|
});
|
||||||
|
expect(abortCompaction).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
it("emits tool summaries at tool start when verbose is on", async () => {
|
it("emits tool summaries at tool start when verbose is on", async () => {
|
||||||
let handler: ((evt: unknown) => void) | undefined;
|
let handler: ((evt: unknown) => void) | undefined;
|
||||||
const session: StubSession = {
|
const session: StubSession = {
|
||||||
|
|||||||
@@ -65,7 +65,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
compactionInFlight: false,
|
compactionInFlight: false,
|
||||||
pendingCompactionRetry: 0,
|
pendingCompactionRetry: 0,
|
||||||
compactionRetryResolve: undefined,
|
compactionRetryResolve: undefined,
|
||||||
|
compactionRetryReject: undefined,
|
||||||
compactionRetryPromise: null,
|
compactionRetryPromise: null,
|
||||||
|
unsubscribed: false,
|
||||||
messagingToolSentTexts: [],
|
messagingToolSentTexts: [],
|
||||||
messagingToolSentTextsNormalized: [],
|
messagingToolSentTextsNormalized: [],
|
||||||
messagingToolSentTargets: [],
|
messagingToolSentTargets: [],
|
||||||
@@ -203,8 +205,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
|
|
||||||
const ensureCompactionPromise = () => {
|
const ensureCompactionPromise = () => {
|
||||||
if (!state.compactionRetryPromise) {
|
if (!state.compactionRetryPromise) {
|
||||||
state.compactionRetryPromise = new Promise((resolve) => {
|
// Create a single promise that resolves when ALL pending compactions complete
|
||||||
|
// (tracked by pendingCompactionRetry counter, decremented in resolveCompactionRetry)
|
||||||
|
state.compactionRetryPromise = new Promise((resolve, reject) => {
|
||||||
state.compactionRetryResolve = resolve;
|
state.compactionRetryResolve = resolve;
|
||||||
|
state.compactionRetryReject = reject;
|
||||||
|
});
|
||||||
|
// Prevent unhandled rejection if rejected after all consumers have resolved
|
||||||
|
state.compactionRetryPromise.catch((err) => {
|
||||||
|
log.debug(`compaction promise rejected (no waiter): ${String(err)}`);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -222,6 +231,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
|
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
|
||||||
state.compactionRetryResolve?.();
|
state.compactionRetryResolve?.();
|
||||||
state.compactionRetryResolve = undefined;
|
state.compactionRetryResolve = undefined;
|
||||||
|
state.compactionRetryReject = undefined;
|
||||||
state.compactionRetryPromise = null;
|
state.compactionRetryPromise = null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -230,6 +240,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
|
if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) {
|
||||||
state.compactionRetryResolve?.();
|
state.compactionRetryResolve?.();
|
||||||
state.compactionRetryResolve = undefined;
|
state.compactionRetryResolve = undefined;
|
||||||
|
state.compactionRetryReject = undefined;
|
||||||
state.compactionRetryPromise = null;
|
state.compactionRetryPromise = null;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -608,13 +619,47 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
getCompactionCount: () => compactionCount,
|
getCompactionCount: () => compactionCount,
|
||||||
};
|
};
|
||||||
|
|
||||||
const unsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx));
|
const sessionUnsubscribe = params.session.subscribe(createEmbeddedPiSessionEventHandler(ctx));
|
||||||
|
|
||||||
|
const unsubscribe = () => {
|
||||||
|
if (state.unsubscribed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Mark as unsubscribed FIRST to prevent waitForCompactionRetry from creating
|
||||||
|
// new un-resolvable promises during teardown.
|
||||||
|
state.unsubscribed = true;
|
||||||
|
// Reject pending compaction wait to unblock awaiting code.
|
||||||
|
// Don't resolve, as that would incorrectly signal "compaction complete" when it's still in-flight.
|
||||||
|
if (state.compactionRetryPromise) {
|
||||||
|
log.debug(`unsubscribe: rejecting compaction wait runId=${params.runId}`);
|
||||||
|
const reject = state.compactionRetryReject;
|
||||||
|
state.compactionRetryResolve = undefined;
|
||||||
|
state.compactionRetryReject = undefined;
|
||||||
|
state.compactionRetryPromise = null;
|
||||||
|
// Reject with AbortError so it's caught by isAbortError() check in cleanup paths
|
||||||
|
const abortErr = new Error("Unsubscribed during compaction");
|
||||||
|
abortErr.name = "AbortError";
|
||||||
|
reject?.(abortErr);
|
||||||
|
}
|
||||||
|
// Cancel any in-flight compaction to prevent resource leaks when unsubscribing.
|
||||||
|
// Only abort if compaction is actually running to avoid unnecessary work.
|
||||||
|
if (params.session.isCompacting) {
|
||||||
|
log.debug(`unsubscribe: aborting in-flight compaction runId=${params.runId}`);
|
||||||
|
try {
|
||||||
|
params.session.abortCompaction();
|
||||||
|
} catch (err) {
|
||||||
|
log.warn(`unsubscribe: compaction abort failed runId=${params.runId} err=${String(err)}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sessionUnsubscribe();
|
||||||
|
};
|
||||||
|
|
||||||
return {
|
return {
|
||||||
assistantTexts,
|
assistantTexts,
|
||||||
toolMetas,
|
toolMetas,
|
||||||
unsubscribe,
|
unsubscribe,
|
||||||
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
|
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
|
||||||
|
isCompactionInFlight: () => state.compactionInFlight,
|
||||||
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
|
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
|
||||||
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
|
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
|
||||||
// Returns true if any messaging tool successfully sent a message.
|
// Returns true if any messaging tool successfully sent a message.
|
||||||
@@ -625,15 +670,27 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
|||||||
getUsageTotals,
|
getUsageTotals,
|
||||||
getCompactionCount: () => compactionCount,
|
getCompactionCount: () => compactionCount,
|
||||||
waitForCompactionRetry: () => {
|
waitForCompactionRetry: () => {
|
||||||
|
// Reject after unsubscribe so callers treat it as cancellation, not success
|
||||||
|
if (state.unsubscribed) {
|
||||||
|
const err = new Error("Unsubscribed during compaction wait");
|
||||||
|
err.name = "AbortError";
|
||||||
|
return Promise.reject(err);
|
||||||
|
}
|
||||||
if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
|
if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
|
||||||
ensureCompactionPromise();
|
ensureCompactionPromise();
|
||||||
return state.compactionRetryPromise ?? Promise.resolve();
|
return state.compactionRetryPromise ?? Promise.resolve();
|
||||||
}
|
}
|
||||||
return new Promise<void>((resolve) => {
|
return new Promise<void>((resolve, reject) => {
|
||||||
queueMicrotask(() => {
|
queueMicrotask(() => {
|
||||||
|
if (state.unsubscribed) {
|
||||||
|
const err = new Error("Unsubscribed during compaction wait");
|
||||||
|
err.name = "AbortError";
|
||||||
|
reject(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
|
if (state.compactionInFlight || state.pendingCompactionRetry > 0) {
|
||||||
ensureCompactionPromise();
|
ensureCompactionPromise();
|
||||||
void (state.compactionRetryPromise ?? Promise.resolve()).then(resolve);
|
void (state.compactionRetryPromise ?? Promise.resolve()).then(resolve, reject);
|
||||||
} else {
|
} else {
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user