diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.ts new file mode 100644 index 00000000000..fa7c46b8bdd --- /dev/null +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.ts @@ -0,0 +1,77 @@ +import type { AgentEvent } from "@mariozechner/pi-agent-core"; +import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; +import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; + +export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { + ctx.state.compactionInFlight = true; + ctx.incrementCompactionCount(); + ctx.ensureCompactionPromise(); + ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); + emitAgentEvent({ + runId: ctx.params.runId, + stream: "compaction", + data: { phase: "start" }, + }); + void ctx.params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "start" }, + }); + + // Run before_compaction plugin hook (fire-and-forget) + const hookRunner = getGlobalHookRunner(); + if (hookRunner?.hasHooks("before_compaction")) { + void hookRunner + .runBeforeCompaction( + { + messageCount: ctx.params.session.messages?.length ?? 0, + }, + {}, + ) + .catch((err) => { + ctx.log.warn(`before_compaction hook failed: ${String(err)}`); + }); + } +} + +export function handleAutoCompactionEnd( + ctx: EmbeddedPiSubscribeContext, + evt: AgentEvent & { willRetry?: unknown }, +) { + ctx.state.compactionInFlight = false; + const willRetry = Boolean(evt.willRetry); + if (willRetry) { + ctx.noteCompactionRetry(); + ctx.resetForCompactionRetry(); + ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`); + } else { + ctx.maybeResolveCompactionWait(); + } + emitAgentEvent({ + runId: ctx.params.runId, + stream: "compaction", + data: { phase: "end", willRetry }, + }); + void ctx.params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry }, + }); + + // Run after_compaction plugin hook (fire-and-forget) + if (!willRetry) { + const hookRunnerEnd = getGlobalHookRunner(); + if (hookRunnerEnd?.hasHooks("after_compaction")) { + void hookRunnerEnd + .runAfterCompaction( + { + messageCount: ctx.params.session.messages?.length ?? 0, + compactedCount: ctx.getCompactionCount(), + }, + {}, + ) + .catch((err) => { + ctx.log.warn(`after_compaction hook failed: ${String(err)}`); + }); + } + } +} diff --git a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts index ffa1eeee98b..49dbd7367b0 100644 --- a/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts +++ b/src/agents/pi-embedded-subscribe.handlers.lifecycle.ts @@ -2,10 +2,14 @@ import type { AgentEvent } from "@mariozechner/pi-agent-core"; import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; -import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { formatAssistantErrorText } from "./pi-embedded-helpers.js"; import { isAssistantMessage } from "./pi-embedded-utils.js"; +export { + handleAutoCompactionEnd, + handleAutoCompactionStart, +} from "./pi-embedded-subscribe.handlers.compaction.js"; + export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) { ctx.log.debug(`embedded run agent start: runId=${ctx.params.runId}`); emitAgentEvent({ @@ -22,79 +26,6 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) { }); } -export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { - ctx.state.compactionInFlight = true; - ctx.incrementCompactionCount(); - ctx.ensureCompactionPromise(); - ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); - emitAgentEvent({ - runId: ctx.params.runId, - stream: "compaction", - data: { phase: "start" }, - }); - void ctx.params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "start" }, - }); - - // Run before_compaction plugin hook (fire-and-forget) - const hookRunner = getGlobalHookRunner(); - if (hookRunner?.hasHooks("before_compaction")) { - void hookRunner - .runBeforeCompaction( - { - messageCount: ctx.params.session.messages?.length ?? 0, - }, - {}, - ) - .catch((err) => { - ctx.log.warn(`before_compaction hook failed: ${String(err)}`); - }); - } -} - -export function handleAutoCompactionEnd( - ctx: EmbeddedPiSubscribeContext, - evt: AgentEvent & { willRetry?: unknown }, -) { - ctx.state.compactionInFlight = false; - const willRetry = Boolean(evt.willRetry); - if (willRetry) { - ctx.noteCompactionRetry(); - ctx.resetForCompactionRetry(); - ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`); - } else { - ctx.maybeResolveCompactionWait(); - } - emitAgentEvent({ - runId: ctx.params.runId, - stream: "compaction", - data: { phase: "end", willRetry }, - }); - void ctx.params.onAgentEvent?.({ - stream: "compaction", - data: { phase: "end", willRetry }, - }); - - // Run after_compaction plugin hook (fire-and-forget) - if (!willRetry) { - const hookRunnerEnd = getGlobalHookRunner(); - if (hookRunnerEnd?.hasHooks("after_compaction")) { - void hookRunnerEnd - .runAfterCompaction( - { - messageCount: ctx.params.session.messages?.length ?? 0, - compactedCount: ctx.getCompactionCount(), - }, - {}, - ) - .catch((err) => { - ctx.log.warn(`after_compaction hook failed: ${String(err)}`); - }); - } - } -} - export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) { const lastAssistant = ctx.state.lastAssistant; const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error"; diff --git a/src/plugins/wired-hooks-compaction.test.ts b/src/plugins/wired-hooks-compaction.test.ts index a298f80d154..fc5b6b83f89 100644 --- a/src/plugins/wired-hooks-compaction.test.ts +++ b/src/plugins/wired-hooks-compaction.test.ts @@ -33,7 +33,7 @@ describe("compaction hook wiring", () => { hookMocks.runner.hasHooks.mockReturnValue(true); const { handleAutoCompactionStart } = - await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js"); + await import("../agents/pi-embedded-subscribe.handlers.compaction.js"); const ctx = { params: { runId: "r1", session: { messages: [1, 2, 3] } }, @@ -45,9 +45,7 @@ describe("compaction hook wiring", () => { handleAutoCompactionStart(ctx as never); - await vi.waitFor(() => { - expect(hookMocks.runner.runBeforeCompaction).toHaveBeenCalledTimes(1); - }); + expect(hookMocks.runner.runBeforeCompaction).toHaveBeenCalledTimes(1); const [event] = hookMocks.runner.runBeforeCompaction.mock.calls[0]; expect(event.messageCount).toBe(3); @@ -57,7 +55,7 @@ describe("compaction hook wiring", () => { hookMocks.runner.hasHooks.mockReturnValue(true); const { handleAutoCompactionEnd } = - await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js"); + await import("../agents/pi-embedded-subscribe.handlers.compaction.js"); const ctx = { params: { runId: "r2", session: { messages: [1, 2] } }, @@ -75,9 +73,7 @@ describe("compaction hook wiring", () => { } as never, ); - await vi.waitFor(() => { - expect(hookMocks.runner.runAfterCompaction).toHaveBeenCalledTimes(1); - }); + expect(hookMocks.runner.runAfterCompaction).toHaveBeenCalledTimes(1); const [event] = hookMocks.runner.runAfterCompaction.mock.calls[0]; expect(event.messageCount).toBe(2); @@ -88,7 +84,7 @@ describe("compaction hook wiring", () => { hookMocks.runner.hasHooks.mockReturnValue(true); const { handleAutoCompactionEnd } = - await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js"); + await import("../agents/pi-embedded-subscribe.handlers.compaction.js"); const ctx = { params: { runId: "r3", session: { messages: [] } }, @@ -107,7 +103,6 @@ describe("compaction hook wiring", () => { } as never, ); - await new Promise((r) => setTimeout(r, 50)); expect(hookMocks.runner.runAfterCompaction).not.toHaveBeenCalled(); }); });