diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 9ef2a3efe76..7d09cf89759 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -160,7 +160,7 @@ vi.mock("../transcript-policy.js", () => ({ })); vi.mock("./extensions.js", () => ({ - buildEmbeddedExtensionFactories: vi.fn(() => []), + buildEmbeddedExtensionFactories: vi.fn(() => ({ factories: [] })), })); vi.mock("./history.js", () => ({ diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 91f99571db4..64426e5f783 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -936,6 +936,40 @@ export async function compactEmbeddedPiSession( modelContextWindow: ceModel?.contextWindow, defaultTokens: DEFAULT_CONTEXT_TOKENS, }); + // When the context engine owns compaction, its compact() implementation + // bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally). + // Fire before_compaction / after_compaction hooks here so plugin subscribers + // are notified regardless of which engine is active. + const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; + const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null; + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; + const { sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + }); + const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; + const hookCtx = { + sessionId: params.sessionId, + agentId: sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: params.workspaceDir, + messageProvider: resolvedMessageProvider, + }; + if (hookRunner?.hasHooks("before_compaction")) { + try { + await hookRunner.runBeforeCompaction( + { + messageCount: 0, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + } const result = await contextEngine.compact({ sessionId: params.sessionId, sessionFile: params.sessionFile, @@ -944,6 +978,23 @@ export async function compactEmbeddedPiSession( force: params.trigger === "manual", runtimeContext: params as Record, }); + if (hookRunner?.hasHooks("after_compaction")) { + try { + await hookRunner.runAfterCompaction( + { + messageCount: 0, + compactedCount: 0, + tokenCount: result.result?.tokensAfter, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + }); + } + } return { ok: result.ok, compacted: result.compacted, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index a28d74bf71e..6695f174801 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1028,37 +1028,45 @@ export async function runEmbeddedPiAgent( log.warn( `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); - const compactResult = await contextEngine.compact({ - sessionId: params.sessionId, - sessionFile: params.sessionFile, - tokenBudget: ctxInfo.tokens, - force: true, - compactionTarget: "budget", - runtimeContext: { - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - authProfileId: lastProfileId, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - provider, - model: modelId, - runId: params.runId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - trigger: "overflow", - diagId: overflowDiagId, - attempt: overflowCompactionAttempts, - maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, - }, - }); + let compactResult: Awaited>; + try { + compactResult = await contextEngine.compact({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: ctxInfo.tokens, + force: true, + compactionTarget: "budget", + runtimeContext: { + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + provider, + model: modelId, + runId: params.runId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + trigger: "overflow", + diagId: overflowDiagId, + attempt: overflowCompactionAttempts, + maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + }, + }); + } catch (compactErr) { + log.warn( + `contextEngine.compact() threw during overflow recovery for ${provider}/${modelId}: ${String(compactErr)}`, + ); + compactResult = { ok: false, compacted: false, reason: String(compactErr) }; + } if (compactResult.compacted) { autoCompactionCount += 1; log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);