feat(context-engine): plumb sessionKey into all ContextEngine methods (#44157)

Merged via squash.

Prepared head SHA: 0b341f6f4c
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
Josh Lehman
2026-03-12 12:43:36 -07:00
committed by GitHub
parent e525957b4f
commit 50cc375c11
9 changed files with 285 additions and 13 deletions

View File

@@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai
- Doctor/gateway service audit: canonicalize service entrypoint paths before comparing them so symlink-vs-realpath installs no longer trigger false "entrypoint does not match the current install" repair prompts. (#43882) Thanks @ngutman.
- Doctor/gateway service audit: earlier groundwork for this fix landed in the superseded #28338 branch. Thanks @realriphub.
- Gateway/session stores: regenerate the Swift push-test protocol models and align Windows native session-store realpath handling so protocol checks and sync session discovery stop drifting on Windows. (#44266) thanks @jalehman.
- Context engine/session routing: forward optional `sessionKey` through context-engine lifecycle calls so plugins can see structured routing metadata during bootstrap, assembly, post-turn ingestion, and compaction. (#44157) thanks @jalehman.
## 2026.3.11

View File

@@ -991,6 +991,7 @@ export async function compactEmbeddedPiSession(
}
const result = await contextEngine.compact({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
tokenBudget: ceCtxInfo.tokens,
currentTokenCount: params.currentTokenCount,

View File

@@ -1053,6 +1053,7 @@ export async function runEmbeddedPiAgent(
try {
compactResult = await contextEngine.compact({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
tokenBudget: ctxInfo.tokens,
...(observedOverflowTokens !== undefined

View File

@@ -1,6 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { Api, Model } from "@mariozechner/pi-ai";
import type {
AuthStorage,
@@ -9,6 +10,14 @@ import type {
ToolDefinition,
} from "@mariozechner/pi-coding-agent";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type {
AssembleResult,
BootstrapResult,
CompactResult,
ContextEngineInfo,
IngestBatchResult,
IngestResult,
} from "../../../context-engine/types.js";
import { createHostSandboxFsBridge } from "../../test-helpers/host-sandbox-fs-bridge.js";
import { createPiToolsSandboxContext } from "../../test-helpers/pi-tools-sandbox-context.js";
@@ -23,7 +32,7 @@ const hoisted = vi.hoisted(() => {
getLeafEntry: vi.fn(() => null),
branch: vi.fn(),
resetLeaf: vi.fn(),
buildSessionContext: vi.fn(() => ({ messages: [] })),
buildSessionContext: vi.fn<() => { messages: AgentMessage[] }>(() => ({ messages: [] })),
appendCustomEntry: vi.fn(),
};
return {
@@ -240,6 +249,14 @@ function createSubscriptionMock() {
};
}
const testModel = {
api: "openai-completions",
provider: "openai",
compat: {},
contextWindow: 8192,
input: ["text"],
} as unknown as Model<Api>;
describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
const tempPaths: string[] = [];
@@ -326,14 +343,6 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
},
);
const model = {
api: "openai-completions",
provider: "openai",
compat: {},
contextWindow: 8192,
input: ["text"],
} as unknown as Model<Api>;
const result = await runEmbeddedAttempt({
sessionId: "embedded-session",
sessionKey: "agent:main:main",
@@ -346,7 +355,7 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
runId: "run-1",
provider: "openai",
modelId: "gpt-test",
model,
model: testModel,
authStorage: {} as AuthStorage,
modelRegistry: {} as ModelRegistry,
thinkLevel: "off",
@@ -372,3 +381,243 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => {
);
});
});
describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
const tempPaths: string[] = [];
const sessionKey = "agent:main:discord:channel:test-ctx-engine";
beforeEach(() => {
hoisted.createAgentSessionMock.mockReset();
hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager);
hoisted.resolveSandboxContextMock.mockReset();
hoisted.subscribeEmbeddedPiSessionMock.mockReset().mockImplementation(createSubscriptionMock);
hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({
release: async () => {},
});
hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null);
hoisted.sessionManager.branch.mockReset();
hoisted.sessionManager.resetLeaf.mockReset();
hoisted.sessionManager.appendCustomEntry.mockReset();
});
afterEach(async () => {
while (tempPaths.length > 0) {
const target = tempPaths.pop();
if (target) {
await fs.rm(target, { recursive: true, force: true });
}
}
});
// Build a minimal real attempt harness so lifecycle hooks run against
// the actual runner flow instead of a hand-written wrapper.
async function runAttemptWithContextEngine(contextEngine: {
bootstrap?: (params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
}) => Promise<BootstrapResult>;
assemble: (params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
tokenBudget?: number;
}) => Promise<AssembleResult>;
afterTurn?: (params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
messages: AgentMessage[];
prePromptMessageCount: number;
tokenBudget?: number;
runtimeContext?: Record<string, unknown>;
}) => Promise<void>;
ingestBatch?: (params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
}) => Promise<IngestBatchResult>;
ingest?: (params: {
sessionId: string;
sessionKey?: string;
message: AgentMessage;
}) => Promise<IngestResult>;
compact?: (params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
tokenBudget?: number;
}) => Promise<CompactResult>;
info?: Partial<ContextEngineInfo>;
}) {
const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-workspace-"));
const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-agent-"));
const sessionFile = path.join(workspaceDir, "session.jsonl");
tempPaths.push(workspaceDir, agentDir);
await fs.writeFile(sessionFile, "", "utf8");
const seedMessages: AgentMessage[] = [
{ role: "user", content: "seed", timestamp: 1 } as AgentMessage,
];
const infoId = contextEngine.info?.id ?? "test-context-engine";
const infoName = contextEngine.info?.name ?? "Test Context Engine";
const infoVersion = contextEngine.info?.version ?? "0.0.1";
hoisted.sessionManager.buildSessionContext
.mockReset()
.mockReturnValue({ messages: seedMessages });
hoisted.createAgentSessionMock.mockImplementation(async () => {
const session: MutableSession = {
sessionId: "embedded-session",
messages: [],
isCompacting: false,
isStreaming: false,
agent: {
replaceMessages: (messages: unknown[]) => {
session.messages = [...messages];
},
},
prompt: async () => {
session.messages = [
...session.messages,
{ role: "assistant", content: "done", timestamp: 2 },
];
},
abort: async () => {},
dispose: () => {},
steer: async () => {},
};
return { session };
});
return await runEmbeddedAttempt({
sessionId: "embedded-session",
sessionKey,
sessionFile,
workspaceDir,
agentDir,
config: {},
prompt: "hello",
timeoutMs: 10_000,
runId: "run-context-engine-forwarding",
provider: "openai",
modelId: "gpt-test",
model: testModel,
authStorage: {} as AuthStorage,
modelRegistry: {} as ModelRegistry,
thinkLevel: "off",
senderIsOwner: true,
disableMessageTool: true,
contextTokenBudget: 2048,
contextEngine: {
...contextEngine,
ingest:
contextEngine.ingest ??
(async () => ({
ingested: true,
})),
compact:
contextEngine.compact ??
(async () => ({
ok: false,
compacted: false,
reason: "not used in this test",
})),
info: {
id: infoId,
name: infoName,
version: infoVersion,
},
},
});
}
it("forwards sessionKey to bootstrap, assemble, and afterTurn", async () => {
const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true }));
const assemble = vi.fn(
async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({
messages,
estimatedTokens: 1,
}),
);
const afterTurn = vi.fn(async (_params: { sessionKey?: string }) => {});
const result = await runAttemptWithContextEngine({
bootstrap,
assemble,
afterTurn,
});
expect(result.promptError).toBeNull();
expect(bootstrap).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
}),
);
expect(assemble).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
}),
);
expect(afterTurn).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
}),
);
});
it("forwards sessionKey to ingestBatch when afterTurn is absent", async () => {
const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true }));
const assemble = vi.fn(
async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({
messages,
estimatedTokens: 1,
}),
);
const ingestBatch = vi.fn(
async (_params: { sessionKey?: string; messages: AgentMessage[] }) => ({ ingestedCount: 1 }),
);
const result = await runAttemptWithContextEngine({
bootstrap,
assemble,
ingestBatch,
});
expect(result.promptError).toBeNull();
expect(ingestBatch).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
}),
);
});
it("forwards sessionKey to per-message ingest when ingestBatch is absent", async () => {
const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true }));
const assemble = vi.fn(
async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({
messages,
estimatedTokens: 1,
}),
);
const ingest = vi.fn(async (_params: { sessionKey?: string; message: AgentMessage }) => ({
ingested: true,
}));
const result = await runAttemptWithContextEngine({
bootstrap,
assemble,
ingest,
});
expect(result.promptError).toBeNull();
expect(ingest).toHaveBeenCalled();
expect(
ingest.mock.calls.every((call) => {
const params = call[0];
return params.sessionKey === sessionKey;
}),
).toBe(true);
});
});

View File

@@ -1741,6 +1741,7 @@ export async function runEmbeddedAttempt(
try {
await params.contextEngine.bootstrap({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
} catch (bootstrapErr) {
@@ -2093,6 +2094,7 @@ export async function runEmbeddedAttempt(
try {
const assembled = await params.contextEngine.assemble({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
messages: activeSession.messages,
tokenBudget: params.contextTokenBudget,
});
@@ -2608,6 +2610,7 @@ export async function runEmbeddedAttempt(
try {
await params.contextEngine.afterTurn({
sessionId: sessionIdUsed,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
messages: messagesSnapshot,
prePromptMessageCount,
@@ -2625,6 +2628,7 @@ export async function runEmbeddedAttempt(
try {
await params.contextEngine.ingestBatch({
sessionId: sessionIdUsed,
sessionKey: params.sessionKey,
messages: newMessages,
});
} catch (ingestErr) {
@@ -2635,6 +2639,7 @@ export async function runEmbeddedAttempt(
try {
await params.contextEngine.ingest({
sessionId: sessionIdUsed,
sessionKey: params.sessionKey,
message: msg,
});
} catch (ingestErr) {

View File

@@ -61,6 +61,7 @@ class MockContextEngine implements ContextEngine {
async ingest(_params: {
sessionId: string;
sessionKey?: string;
message: AgentMessage;
isHeartbeat?: boolean;
}): Promise<IngestResult> {
@@ -69,6 +70,7 @@ class MockContextEngine implements ContextEngine {
async assemble(params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult> {
@@ -81,6 +83,7 @@ class MockContextEngine implements ContextEngine {
async compact(_params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
tokenBudget?: number;
compactionTarget?: "budget" | "threshold";

View File

@@ -26,6 +26,7 @@ export class LegacyContextEngine implements ContextEngine {
async ingest(_params: {
sessionId: string;
sessionKey?: string;
message: AgentMessage;
isHeartbeat?: boolean;
}): Promise<IngestResult> {
@@ -35,6 +36,7 @@ export class LegacyContextEngine implements ContextEngine {
async assemble(params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult> {
@@ -49,6 +51,7 @@ export class LegacyContextEngine implements ContextEngine {
async afterTurn(_params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
messages: AgentMessage[];
prePromptMessageCount: number;
@@ -62,6 +65,7 @@ export class LegacyContextEngine implements ContextEngine {
async compact(params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
tokenBudget?: number;
force?: boolean;

View File

@@ -72,13 +72,18 @@ export interface ContextEngine {
/**
* Initialize engine state for a session, optionally importing historical context.
*/
bootstrap?(params: { sessionId: string; sessionFile: string }): Promise<BootstrapResult>;
bootstrap?(params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
}): Promise<BootstrapResult>;
/**
* Ingest a single message into the engine's store.
*/
ingest(params: {
sessionId: string;
sessionKey?: string;
message: AgentMessage;
/** True when the message belongs to a heartbeat run. */
isHeartbeat?: boolean;
@@ -89,6 +94,7 @@ export interface ContextEngine {
*/
ingestBatch?(params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
/** True when the batch belongs to a heartbeat run. */
isHeartbeat?: boolean;
@@ -101,6 +107,7 @@ export interface ContextEngine {
*/
afterTurn?(params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
messages: AgentMessage[];
/** Number of messages that existed before the prompt was sent. */
@@ -121,6 +128,7 @@ export interface ContextEngine {
*/
assemble(params: {
sessionId: string;
sessionKey?: string;
messages: AgentMessage[];
tokenBudget?: number;
}): Promise<AssembleResult>;
@@ -131,6 +139,7 @@ export interface ContextEngine {
*/
compact(params: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
tokenBudget?: number;
/** Force compaction even below the default trigger threshold. */

View File

@@ -1,8 +1,7 @@
export function resolveGlobalSingleton<T>(key: symbol, create: () => T): T {
const globalStore = globalThis as Record<PropertyKey, unknown>;
const existing = globalStore[key] as T | undefined;
if (Object.prototype.hasOwnProperty.call(globalStore, key)) {
return existing;
return globalStore[key] as T;
}
const created = create();
globalStore[key] = created;