mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 08:47:40 +00:00
perf(test): speed up compaction hook wiring tests
This commit is contained in:
77
src/agents/pi-embedded-subscribe.handlers.compaction.ts
Normal file
77
src/agents/pi-embedded-subscribe.handlers.compaction.ts
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
||||||
|
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
||||||
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||||
|
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||||
|
|
||||||
|
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
|
||||||
|
ctx.state.compactionInFlight = true;
|
||||||
|
ctx.incrementCompactionCount();
|
||||||
|
ctx.ensureCompactionPromise();
|
||||||
|
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
|
||||||
|
emitAgentEvent({
|
||||||
|
runId: ctx.params.runId,
|
||||||
|
stream: "compaction",
|
||||||
|
data: { phase: "start" },
|
||||||
|
});
|
||||||
|
void ctx.params.onAgentEvent?.({
|
||||||
|
stream: "compaction",
|
||||||
|
data: { phase: "start" },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Run before_compaction plugin hook (fire-and-forget)
|
||||||
|
const hookRunner = getGlobalHookRunner();
|
||||||
|
if (hookRunner?.hasHooks("before_compaction")) {
|
||||||
|
void hookRunner
|
||||||
|
.runBeforeCompaction(
|
||||||
|
{
|
||||||
|
messageCount: ctx.params.session.messages?.length ?? 0,
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
)
|
||||||
|
.catch((err) => {
|
||||||
|
ctx.log.warn(`before_compaction hook failed: ${String(err)}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function handleAutoCompactionEnd(
|
||||||
|
ctx: EmbeddedPiSubscribeContext,
|
||||||
|
evt: AgentEvent & { willRetry?: unknown },
|
||||||
|
) {
|
||||||
|
ctx.state.compactionInFlight = false;
|
||||||
|
const willRetry = Boolean(evt.willRetry);
|
||||||
|
if (willRetry) {
|
||||||
|
ctx.noteCompactionRetry();
|
||||||
|
ctx.resetForCompactionRetry();
|
||||||
|
ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`);
|
||||||
|
} else {
|
||||||
|
ctx.maybeResolveCompactionWait();
|
||||||
|
}
|
||||||
|
emitAgentEvent({
|
||||||
|
runId: ctx.params.runId,
|
||||||
|
stream: "compaction",
|
||||||
|
data: { phase: "end", willRetry },
|
||||||
|
});
|
||||||
|
void ctx.params.onAgentEvent?.({
|
||||||
|
stream: "compaction",
|
||||||
|
data: { phase: "end", willRetry },
|
||||||
|
});
|
||||||
|
|
||||||
|
// Run after_compaction plugin hook (fire-and-forget)
|
||||||
|
if (!willRetry) {
|
||||||
|
const hookRunnerEnd = getGlobalHookRunner();
|
||||||
|
if (hookRunnerEnd?.hasHooks("after_compaction")) {
|
||||||
|
void hookRunnerEnd
|
||||||
|
.runAfterCompaction(
|
||||||
|
{
|
||||||
|
messageCount: ctx.params.session.messages?.length ?? 0,
|
||||||
|
compactedCount: ctx.getCompactionCount(),
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
)
|
||||||
|
.catch((err) => {
|
||||||
|
ctx.log.warn(`after_compaction hook failed: ${String(err)}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,10 +2,14 @@ import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
|||||||
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
||||||
import { emitAgentEvent } from "../infra/agent-events.js";
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
||||||
import { createInlineCodeState } from "../markdown/code-spans.js";
|
import { createInlineCodeState } from "../markdown/code-spans.js";
|
||||||
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
|
||||||
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
import { formatAssistantErrorText } from "./pi-embedded-helpers.js";
|
||||||
import { isAssistantMessage } from "./pi-embedded-utils.js";
|
import { isAssistantMessage } from "./pi-embedded-utils.js";
|
||||||
|
|
||||||
|
export {
|
||||||
|
handleAutoCompactionEnd,
|
||||||
|
handleAutoCompactionStart,
|
||||||
|
} from "./pi-embedded-subscribe.handlers.compaction.js";
|
||||||
|
|
||||||
export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
|
export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
|
||||||
ctx.log.debug(`embedded run agent start: runId=${ctx.params.runId}`);
|
ctx.log.debug(`embedded run agent start: runId=${ctx.params.runId}`);
|
||||||
emitAgentEvent({
|
emitAgentEvent({
|
||||||
@@ -22,79 +26,6 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
|
|
||||||
ctx.state.compactionInFlight = true;
|
|
||||||
ctx.incrementCompactionCount();
|
|
||||||
ctx.ensureCompactionPromise();
|
|
||||||
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
|
|
||||||
emitAgentEvent({
|
|
||||||
runId: ctx.params.runId,
|
|
||||||
stream: "compaction",
|
|
||||||
data: { phase: "start" },
|
|
||||||
});
|
|
||||||
void ctx.params.onAgentEvent?.({
|
|
||||||
stream: "compaction",
|
|
||||||
data: { phase: "start" },
|
|
||||||
});
|
|
||||||
|
|
||||||
// Run before_compaction plugin hook (fire-and-forget)
|
|
||||||
const hookRunner = getGlobalHookRunner();
|
|
||||||
if (hookRunner?.hasHooks("before_compaction")) {
|
|
||||||
void hookRunner
|
|
||||||
.runBeforeCompaction(
|
|
||||||
{
|
|
||||||
messageCount: ctx.params.session.messages?.length ?? 0,
|
|
||||||
},
|
|
||||||
{},
|
|
||||||
)
|
|
||||||
.catch((err) => {
|
|
||||||
ctx.log.warn(`before_compaction hook failed: ${String(err)}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function handleAutoCompactionEnd(
|
|
||||||
ctx: EmbeddedPiSubscribeContext,
|
|
||||||
evt: AgentEvent & { willRetry?: unknown },
|
|
||||||
) {
|
|
||||||
ctx.state.compactionInFlight = false;
|
|
||||||
const willRetry = Boolean(evt.willRetry);
|
|
||||||
if (willRetry) {
|
|
||||||
ctx.noteCompactionRetry();
|
|
||||||
ctx.resetForCompactionRetry();
|
|
||||||
ctx.log.debug(`embedded run compaction retry: runId=${ctx.params.runId}`);
|
|
||||||
} else {
|
|
||||||
ctx.maybeResolveCompactionWait();
|
|
||||||
}
|
|
||||||
emitAgentEvent({
|
|
||||||
runId: ctx.params.runId,
|
|
||||||
stream: "compaction",
|
|
||||||
data: { phase: "end", willRetry },
|
|
||||||
});
|
|
||||||
void ctx.params.onAgentEvent?.({
|
|
||||||
stream: "compaction",
|
|
||||||
data: { phase: "end", willRetry },
|
|
||||||
});
|
|
||||||
|
|
||||||
// Run after_compaction plugin hook (fire-and-forget)
|
|
||||||
if (!willRetry) {
|
|
||||||
const hookRunnerEnd = getGlobalHookRunner();
|
|
||||||
if (hookRunnerEnd?.hasHooks("after_compaction")) {
|
|
||||||
void hookRunnerEnd
|
|
||||||
.runAfterCompaction(
|
|
||||||
{
|
|
||||||
messageCount: ctx.params.session.messages?.length ?? 0,
|
|
||||||
compactedCount: ctx.getCompactionCount(),
|
|
||||||
},
|
|
||||||
{},
|
|
||||||
)
|
|
||||||
.catch((err) => {
|
|
||||||
ctx.log.warn(`after_compaction hook failed: ${String(err)}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext) {
|
||||||
const lastAssistant = ctx.state.lastAssistant;
|
const lastAssistant = ctx.state.lastAssistant;
|
||||||
const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error";
|
const isError = isAssistantMessage(lastAssistant) && lastAssistant.stopReason === "error";
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ describe("compaction hook wiring", () => {
|
|||||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||||
|
|
||||||
const { handleAutoCompactionStart } =
|
const { handleAutoCompactionStart } =
|
||||||
await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js");
|
await import("../agents/pi-embedded-subscribe.handlers.compaction.js");
|
||||||
|
|
||||||
const ctx = {
|
const ctx = {
|
||||||
params: { runId: "r1", session: { messages: [1, 2, 3] } },
|
params: { runId: "r1", session: { messages: [1, 2, 3] } },
|
||||||
@@ -45,9 +45,7 @@ describe("compaction hook wiring", () => {
|
|||||||
|
|
||||||
handleAutoCompactionStart(ctx as never);
|
handleAutoCompactionStart(ctx as never);
|
||||||
|
|
||||||
await vi.waitFor(() => {
|
expect(hookMocks.runner.runBeforeCompaction).toHaveBeenCalledTimes(1);
|
||||||
expect(hookMocks.runner.runBeforeCompaction).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
const [event] = hookMocks.runner.runBeforeCompaction.mock.calls[0];
|
const [event] = hookMocks.runner.runBeforeCompaction.mock.calls[0];
|
||||||
expect(event.messageCount).toBe(3);
|
expect(event.messageCount).toBe(3);
|
||||||
@@ -57,7 +55,7 @@ describe("compaction hook wiring", () => {
|
|||||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||||
|
|
||||||
const { handleAutoCompactionEnd } =
|
const { handleAutoCompactionEnd } =
|
||||||
await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js");
|
await import("../agents/pi-embedded-subscribe.handlers.compaction.js");
|
||||||
|
|
||||||
const ctx = {
|
const ctx = {
|
||||||
params: { runId: "r2", session: { messages: [1, 2] } },
|
params: { runId: "r2", session: { messages: [1, 2] } },
|
||||||
@@ -75,9 +73,7 @@ describe("compaction hook wiring", () => {
|
|||||||
} as never,
|
} as never,
|
||||||
);
|
);
|
||||||
|
|
||||||
await vi.waitFor(() => {
|
expect(hookMocks.runner.runAfterCompaction).toHaveBeenCalledTimes(1);
|
||||||
expect(hookMocks.runner.runAfterCompaction).toHaveBeenCalledTimes(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
const [event] = hookMocks.runner.runAfterCompaction.mock.calls[0];
|
const [event] = hookMocks.runner.runAfterCompaction.mock.calls[0];
|
||||||
expect(event.messageCount).toBe(2);
|
expect(event.messageCount).toBe(2);
|
||||||
@@ -88,7 +84,7 @@ describe("compaction hook wiring", () => {
|
|||||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||||
|
|
||||||
const { handleAutoCompactionEnd } =
|
const { handleAutoCompactionEnd } =
|
||||||
await import("../agents/pi-embedded-subscribe.handlers.lifecycle.js");
|
await import("../agents/pi-embedded-subscribe.handlers.compaction.js");
|
||||||
|
|
||||||
const ctx = {
|
const ctx = {
|
||||||
params: { runId: "r3", session: { messages: [] } },
|
params: { runId: "r3", session: { messages: [] } },
|
||||||
@@ -107,7 +103,6 @@ describe("compaction hook wiring", () => {
|
|||||||
} as never,
|
} as never,
|
||||||
);
|
);
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 50));
|
|
||||||
expect(hookMocks.runner.runAfterCompaction).not.toHaveBeenCalled();
|
expect(hookMocks.runner.runAfterCompaction).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user