mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-12 04:33:44 +00:00
feat(hooks): emit compaction lifecycle hooks (#16788)
This commit is contained in:
@@ -11,6 +11,7 @@ import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js";
|
||||
import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import { resolveChannelCapabilities } from "../../config/channel-capabilities.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
||||
import { getMachineDisplayName } from "../../infra/machine-name.js";
|
||||
import { generateSecureToken } from "../../infra/secure-random.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
@@ -359,6 +360,7 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
});
|
||||
|
||||
const sessionLabel = params.sessionKey ?? params.sessionId;
|
||||
const resolvedMessageProvider = params.messageChannel ?? params.messageProvider;
|
||||
const { contextFiles } = await resolveBootstrapContextForRun({
|
||||
workspaceDir: effectiveWorkspace,
|
||||
config: params.config,
|
||||
@@ -372,7 +374,7 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
elevated: params.bashElevated,
|
||||
},
|
||||
sandbox,
|
||||
messageProvider: params.messageChannel ?? params.messageProvider,
|
||||
messageProvider: resolvedMessageProvider,
|
||||
agentAccountId: params.agentAccountId,
|
||||
sessionKey: sandboxSessionKey,
|
||||
sessionId: params.sessionId,
|
||||
@@ -577,7 +579,7 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
});
|
||||
|
||||
const { session } = await createAgentSession({
|
||||
cwd: resolvedWorkspace,
|
||||
cwd: effectiveWorkspace,
|
||||
agentDir,
|
||||
authStorage,
|
||||
modelRegistry,
|
||||
@@ -609,10 +611,14 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
const validated = transcriptPolicy.validateAnthropicTurns
|
||||
? validateAnthropicTurns(validatedGemini)
|
||||
: validatedGemini;
|
||||
// Capture full message history BEFORE limiting — plugins need the complete conversation
|
||||
const preCompactionMessages = [...session.messages];
|
||||
// Apply validated transcript to the live session even when no history limit is configured,
|
||||
// so compaction and hook metrics are based on the same message set.
|
||||
session.agent.replaceMessages(validated);
|
||||
// "Original" compaction metrics should describe the validated transcript that enters
|
||||
// limiting/compaction, not the raw on-disk session snapshot.
|
||||
const originalMessages = session.messages.slice();
|
||||
const truncated = limitHistoryTurns(
|
||||
validated,
|
||||
session.messages,
|
||||
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
|
||||
);
|
||||
// Re-run tool_use/tool_result pairing repair after truncation, since
|
||||
@@ -624,34 +630,69 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
if (limited.length > 0) {
|
||||
session.agent.replaceMessages(limited);
|
||||
}
|
||||
// Run before_compaction hooks (fire-and-forget).
|
||||
// The session JSONL already contains all messages on disk, so plugins
|
||||
// can read sessionFile asynchronously and process in parallel with
|
||||
// the compaction LLM call — no need to block or wait for after_compaction.
|
||||
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
|
||||
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
const hookCtx = {
|
||||
agentId: params.sessionKey?.split(":")[0] ?? "main",
|
||||
sessionKey: params.sessionKey,
|
||||
sessionId: params.sessionId,
|
||||
workspaceDir: params.workspaceDir,
|
||||
messageProvider: params.messageChannel ?? params.messageProvider,
|
||||
};
|
||||
if (hookRunner?.hasHooks("before_compaction")) {
|
||||
hookRunner
|
||||
.runBeforeCompaction(
|
||||
{
|
||||
messageCount: preCompactionMessages.length,
|
||||
compactingCount: limited.length,
|
||||
messages: preCompactionMessages,
|
||||
sessionFile: params.sessionFile,
|
||||
},
|
||||
hookCtx,
|
||||
)
|
||||
.catch((hookErr: unknown) => {
|
||||
log.warn(`before_compaction hook failed: ${String(hookErr)}`);
|
||||
});
|
||||
const messageCountOriginal = originalMessages.length;
|
||||
let tokenCountOriginal: number | undefined;
|
||||
try {
|
||||
tokenCountOriginal = 0;
|
||||
for (const message of originalMessages) {
|
||||
tokenCountOriginal += estimateTokens(message);
|
||||
}
|
||||
} catch {
|
||||
tokenCountOriginal = undefined;
|
||||
}
|
||||
const messageCountBefore = session.messages.length;
|
||||
let tokenCountBefore: number | undefined;
|
||||
try {
|
||||
tokenCountBefore = 0;
|
||||
for (const message of session.messages) {
|
||||
tokenCountBefore += estimateTokens(message);
|
||||
}
|
||||
} catch {
|
||||
tokenCountBefore = undefined;
|
||||
}
|
||||
// TODO(#7175): Consider exposing full message snapshots or pre-compaction injection
|
||||
// hooks; current events only report counts/metadata.
|
||||
try {
|
||||
const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, {
|
||||
sessionId: params.sessionId,
|
||||
missingSessionKey,
|
||||
messageCount: messageCountBefore,
|
||||
tokenCount: tokenCountBefore,
|
||||
messageCountOriginal,
|
||||
tokenCountOriginal,
|
||||
});
|
||||
await triggerInternalHook(hookEvent);
|
||||
} catch (err) {
|
||||
log.warn("session:compact:before hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
errorStack: err instanceof Error ? err.stack : undefined,
|
||||
});
|
||||
}
|
||||
if (hookRunner?.hasHooks("before_compaction")) {
|
||||
try {
|
||||
await hookRunner.runBeforeCompaction(
|
||||
{
|
||||
messageCount: messageCountBefore,
|
||||
tokenCount: tokenCountBefore,
|
||||
},
|
||||
{
|
||||
sessionId: params.sessionId,
|
||||
agentId: sessionAgentId,
|
||||
sessionKey: hookSessionKey,
|
||||
workspaceDir: effectiveWorkspace,
|
||||
messageProvider: resolvedMessageProvider,
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn("before_compaction hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
errorStack: err instanceof Error ? err.stack : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const diagEnabled = log.isEnabled("debug");
|
||||
const preMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
|
||||
if (diagEnabled && preMetrics) {
|
||||
@@ -679,6 +720,9 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
}
|
||||
|
||||
const compactStartedAt = Date.now();
|
||||
// Measure compactedCount from the original pre-limiting transcript so compaction
|
||||
// lifecycle metrics represent total reduction through the compaction pipeline.
|
||||
const messageCountCompactionInput = messageCountOriginal;
|
||||
const result = await compactWithSafetyTimeout(() =>
|
||||
session.compact(params.customInstructions),
|
||||
);
|
||||
@@ -697,25 +741,8 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
// If estimation fails, leave tokensAfter undefined
|
||||
tokensAfter = undefined;
|
||||
}
|
||||
// Run after_compaction hooks (fire-and-forget).
|
||||
// Also includes sessionFile for plugins that only need to act after
|
||||
// compaction completes (e.g. analytics, cleanup).
|
||||
if (hookRunner?.hasHooks("after_compaction")) {
|
||||
hookRunner
|
||||
.runAfterCompaction(
|
||||
{
|
||||
messageCount: session.messages.length,
|
||||
tokenCount: tokensAfter,
|
||||
compactedCount: limited.length - session.messages.length,
|
||||
sessionFile: params.sessionFile,
|
||||
},
|
||||
hookCtx,
|
||||
)
|
||||
.catch((hookErr) => {
|
||||
log.warn(`after_compaction hook failed: ${hookErr}`);
|
||||
});
|
||||
}
|
||||
|
||||
const messageCountAfter = session.messages.length;
|
||||
const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter);
|
||||
const postMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
|
||||
if (diagEnabled && preMetrics && postMetrics) {
|
||||
log.debug(
|
||||
@@ -731,6 +758,50 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
`delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`,
|
||||
);
|
||||
}
|
||||
// TODO(#9611): Consider exposing compaction summaries or post-compaction injection;
|
||||
// current events only report summary metadata.
|
||||
try {
|
||||
const hookEvent = createInternalHookEvent("session", "compact:after", hookSessionKey, {
|
||||
sessionId: params.sessionId,
|
||||
missingSessionKey,
|
||||
messageCount: messageCountAfter,
|
||||
tokenCount: tokensAfter,
|
||||
compactedCount,
|
||||
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
|
||||
tokensBefore: result.tokensBefore,
|
||||
tokensAfter,
|
||||
firstKeptEntryId: result.firstKeptEntryId,
|
||||
});
|
||||
await triggerInternalHook(hookEvent);
|
||||
} catch (err) {
|
||||
log.warn("session:compact:after hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
errorStack: err instanceof Error ? err.stack : undefined,
|
||||
});
|
||||
}
|
||||
if (hookRunner?.hasHooks("after_compaction")) {
|
||||
try {
|
||||
await hookRunner.runAfterCompaction(
|
||||
{
|
||||
messageCount: messageCountAfter,
|
||||
tokenCount: tokensAfter,
|
||||
compactedCount,
|
||||
},
|
||||
{
|
||||
sessionId: params.sessionId,
|
||||
agentId: sessionAgentId,
|
||||
sessionKey: hookSessionKey,
|
||||
workspaceDir: effectiveWorkspace,
|
||||
messageProvider: resolvedMessageProvider,
|
||||
},
|
||||
);
|
||||
} catch (err) {
|
||||
log.warn("after_compaction hook failed", {
|
||||
errorMessage: err instanceof Error ? err.message : String(err),
|
||||
errorStack: err instanceof Error ? err.stack : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
return {
|
||||
ok: true,
|
||||
compacted: true,
|
||||
|
||||
Reference in New Issue
Block a user