From 50cc375c110d9d9d49b02af9ee0079753a6e9cb6 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Thu, 12 Mar 2026 12:43:36 -0700 Subject: [PATCH] feat(context-engine): plumb sessionKey into all ContextEngine methods (#44157) Merged via squash. Prepared head SHA: 0b341f6f4ce487055d8bc0c0d335c42577941592 Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/compact.ts | 1 + src/agents/pi-embedded-runner/run.ts | 1 + .../run/attempt.spawn-workspace.test.ts | 269 +++++++++++++++++- src/agents/pi-embedded-runner/run/attempt.ts | 5 + src/context-engine/context-engine.test.ts | 3 + src/context-engine/legacy.ts | 4 + src/context-engine/types.ts | 11 +- src/shared/global-singleton.ts | 3 +- 9 files changed, 285 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ad84201479..5a3ddf9af09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ Docs: https://docs.openclaw.ai - Doctor/gateway service audit: canonicalize service entrypoint paths before comparing them so symlink-vs-realpath installs no longer trigger false "entrypoint does not match the current install" repair prompts. (#43882) Thanks @ngutman. - Doctor/gateway service audit: earlier groundwork for this fix landed in the superseded #28338 branch. Thanks @realriphub. - Gateway/session stores: regenerate the Swift push-test protocol models and align Windows native session-store realpath handling so protocol checks and sync session discovery stop drifting on Windows. (#44266) thanks @jalehman. +- Context engine/session routing: forward optional `sessionKey` through context-engine lifecycle calls so plugins can see structured routing metadata during bootstrap, assembly, post-turn ingestion, and compaction. (#44157) thanks @jalehman. ## 2026.3.11 diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index a62ed2eecb0..f1eea72165a 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -991,6 +991,7 @@ export async function compactEmbeddedPiSession( } const result = await contextEngine.compact({ sessionId: params.sessionId, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, tokenBudget: ceCtxInfo.tokens, currentTokenCount: params.currentTokenCount, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 5111fc6d9f9..7db6e2f61c8 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1053,6 +1053,7 @@ export async function runEmbeddedPiAgent( try { compactResult = await contextEngine.compact({ sessionId: params.sessionId, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, tokenBudget: ctxInfo.tokens, ...(observedOverflowTokens !== undefined diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts index 3801231f1f2..2d0e8900b8c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; +import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { Api, Model } from "@mariozechner/pi-ai"; import type { AuthStorage, @@ -9,6 +10,14 @@ import type { ToolDefinition, } from "@mariozechner/pi-coding-agent"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { + AssembleResult, + BootstrapResult, + CompactResult, + ContextEngineInfo, + IngestBatchResult, + IngestResult, +} from "../../../context-engine/types.js"; import { createHostSandboxFsBridge } from "../../test-helpers/host-sandbox-fs-bridge.js"; import { createPiToolsSandboxContext } from "../../test-helpers/pi-tools-sandbox-context.js"; @@ -23,7 +32,7 @@ const hoisted = vi.hoisted(() => { getLeafEntry: vi.fn(() => null), branch: vi.fn(), resetLeaf: vi.fn(), - buildSessionContext: vi.fn(() => ({ messages: [] })), + buildSessionContext: vi.fn<() => { messages: AgentMessage[] }>(() => ({ messages: [] })), appendCustomEntry: vi.fn(), }; return { @@ -240,6 +249,14 @@ function createSubscriptionMock() { }; } +const testModel = { + api: "openai-completions", + provider: "openai", + compat: {}, + contextWindow: 8192, + input: ["text"], +} as unknown as Model; + describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { const tempPaths: string[] = []; @@ -326,14 +343,6 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { }, ); - const model = { - api: "openai-completions", - provider: "openai", - compat: {}, - contextWindow: 8192, - input: ["text"], - } as unknown as Model; - const result = await runEmbeddedAttempt({ sessionId: "embedded-session", sessionKey: "agent:main:main", @@ -346,7 +355,7 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { runId: "run-1", provider: "openai", modelId: "gpt-test", - model, + model: testModel, authStorage: {} as AuthStorage, modelRegistry: {} as ModelRegistry, thinkLevel: "off", @@ -372,3 +381,243 @@ describe("runEmbeddedAttempt sessions_spawn workspace inheritance", () => { ); }); }); + +describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { + const tempPaths: string[] = []; + const sessionKey = "agent:main:discord:channel:test-ctx-engine"; + + beforeEach(() => { + hoisted.createAgentSessionMock.mockReset(); + hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager); + hoisted.resolveSandboxContextMock.mockReset(); + hoisted.subscribeEmbeddedPiSessionMock.mockReset().mockImplementation(createSubscriptionMock); + hoisted.acquireSessionWriteLockMock.mockReset().mockResolvedValue({ + release: async () => {}, + }); + hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null); + hoisted.sessionManager.branch.mockReset(); + hoisted.sessionManager.resetLeaf.mockReset(); + hoisted.sessionManager.appendCustomEntry.mockReset(); + }); + + afterEach(async () => { + while (tempPaths.length > 0) { + const target = tempPaths.pop(); + if (target) { + await fs.rm(target, { recursive: true, force: true }); + } + } + }); + + // Build a minimal real attempt harness so lifecycle hooks run against + // the actual runner flow instead of a hand-written wrapper. + async function runAttemptWithContextEngine(contextEngine: { + bootstrap?: (params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + }) => Promise; + assemble: (params: { + sessionId: string; + sessionKey?: string; + messages: AgentMessage[]; + tokenBudget?: number; + }) => Promise; + afterTurn?: (params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + messages: AgentMessage[]; + prePromptMessageCount: number; + tokenBudget?: number; + runtimeContext?: Record; + }) => Promise; + ingestBatch?: (params: { + sessionId: string; + sessionKey?: string; + messages: AgentMessage[]; + }) => Promise; + ingest?: (params: { + sessionId: string; + sessionKey?: string; + message: AgentMessage; + }) => Promise; + compact?: (params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + tokenBudget?: number; + }) => Promise; + info?: Partial; + }) { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-workspace-")); + const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-agent-")); + const sessionFile = path.join(workspaceDir, "session.jsonl"); + tempPaths.push(workspaceDir, agentDir); + await fs.writeFile(sessionFile, "", "utf8"); + const seedMessages: AgentMessage[] = [ + { role: "user", content: "seed", timestamp: 1 } as AgentMessage, + ]; + const infoId = contextEngine.info?.id ?? "test-context-engine"; + const infoName = contextEngine.info?.name ?? "Test Context Engine"; + const infoVersion = contextEngine.info?.version ?? "0.0.1"; + + hoisted.sessionManager.buildSessionContext + .mockReset() + .mockReturnValue({ messages: seedMessages }); + + hoisted.createAgentSessionMock.mockImplementation(async () => { + const session: MutableSession = { + sessionId: "embedded-session", + messages: [], + isCompacting: false, + isStreaming: false, + agent: { + replaceMessages: (messages: unknown[]) => { + session.messages = [...messages]; + }, + }, + prompt: async () => { + session.messages = [ + ...session.messages, + { role: "assistant", content: "done", timestamp: 2 }, + ]; + }, + abort: async () => {}, + dispose: () => {}, + steer: async () => {}, + }; + + return { session }; + }); + + return await runEmbeddedAttempt({ + sessionId: "embedded-session", + sessionKey, + sessionFile, + workspaceDir, + agentDir, + config: {}, + prompt: "hello", + timeoutMs: 10_000, + runId: "run-context-engine-forwarding", + provider: "openai", + modelId: "gpt-test", + model: testModel, + authStorage: {} as AuthStorage, + modelRegistry: {} as ModelRegistry, + thinkLevel: "off", + senderIsOwner: true, + disableMessageTool: true, + contextTokenBudget: 2048, + contextEngine: { + ...contextEngine, + ingest: + contextEngine.ingest ?? + (async () => ({ + ingested: true, + })), + compact: + contextEngine.compact ?? + (async () => ({ + ok: false, + compacted: false, + reason: "not used in this test", + })), + info: { + id: infoId, + name: infoName, + version: infoVersion, + }, + }, + }); + } + + it("forwards sessionKey to bootstrap, assemble, and afterTurn", async () => { + const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true })); + const assemble = vi.fn( + async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({ + messages, + estimatedTokens: 1, + }), + ); + const afterTurn = vi.fn(async (_params: { sessionKey?: string }) => {}); + + const result = await runAttemptWithContextEngine({ + bootstrap, + assemble, + afterTurn, + }); + + expect(result.promptError).toBeNull(); + expect(bootstrap).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + }), + ); + expect(assemble).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + }), + ); + expect(afterTurn).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + }), + ); + }); + + it("forwards sessionKey to ingestBatch when afterTurn is absent", async () => { + const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true })); + const assemble = vi.fn( + async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({ + messages, + estimatedTokens: 1, + }), + ); + const ingestBatch = vi.fn( + async (_params: { sessionKey?: string; messages: AgentMessage[] }) => ({ ingestedCount: 1 }), + ); + + const result = await runAttemptWithContextEngine({ + bootstrap, + assemble, + ingestBatch, + }); + + expect(result.promptError).toBeNull(); + expect(ingestBatch).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + }), + ); + }); + + it("forwards sessionKey to per-message ingest when ingestBatch is absent", async () => { + const bootstrap = vi.fn(async (_params: { sessionKey?: string }) => ({ bootstrapped: true })); + const assemble = vi.fn( + async ({ messages }: { messages: AgentMessage[]; sessionKey?: string }) => ({ + messages, + estimatedTokens: 1, + }), + ); + const ingest = vi.fn(async (_params: { sessionKey?: string; message: AgentMessage }) => ({ + ingested: true, + })); + + const result = await runAttemptWithContextEngine({ + bootstrap, + assemble, + ingest, + }); + + expect(result.promptError).toBeNull(); + expect(ingest).toHaveBeenCalled(); + expect( + ingest.mock.calls.every((call) => { + const params = call[0]; + return params.sessionKey === sessionKey; + }), + ).toBe(true); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 6b3751c16ee..08c9b26f6f4 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1741,6 +1741,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.bootstrap({ sessionId: params.sessionId, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, }); } catch (bootstrapErr) { @@ -2093,6 +2094,7 @@ export async function runEmbeddedAttempt( try { const assembled = await params.contextEngine.assemble({ sessionId: params.sessionId, + sessionKey: params.sessionKey, messages: activeSession.messages, tokenBudget: params.contextTokenBudget, }); @@ -2608,6 +2610,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.afterTurn({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, sessionFile: params.sessionFile, messages: messagesSnapshot, prePromptMessageCount, @@ -2625,6 +2628,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.ingestBatch({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, messages: newMessages, }); } catch (ingestErr) { @@ -2635,6 +2639,7 @@ export async function runEmbeddedAttempt( try { await params.contextEngine.ingest({ sessionId: sessionIdUsed, + sessionKey: params.sessionKey, message: msg, }); } catch (ingestErr) { diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts index 3d9b7dc4fc1..cd0f2f50439 100644 --- a/src/context-engine/context-engine.test.ts +++ b/src/context-engine/context-engine.test.ts @@ -61,6 +61,7 @@ class MockContextEngine implements ContextEngine { async ingest(_params: { sessionId: string; + sessionKey?: string; message: AgentMessage; isHeartbeat?: boolean; }): Promise { @@ -69,6 +70,7 @@ class MockContextEngine implements ContextEngine { async assemble(params: { sessionId: string; + sessionKey?: string; messages: AgentMessage[]; tokenBudget?: number; }): Promise { @@ -81,6 +83,7 @@ class MockContextEngine implements ContextEngine { async compact(_params: { sessionId: string; + sessionKey?: string; sessionFile: string; tokenBudget?: number; compactionTarget?: "budget" | "threshold"; diff --git a/src/context-engine/legacy.ts b/src/context-engine/legacy.ts index ffeb5cab9bd..0485a4feae4 100644 --- a/src/context-engine/legacy.ts +++ b/src/context-engine/legacy.ts @@ -26,6 +26,7 @@ export class LegacyContextEngine implements ContextEngine { async ingest(_params: { sessionId: string; + sessionKey?: string; message: AgentMessage; isHeartbeat?: boolean; }): Promise { @@ -35,6 +36,7 @@ export class LegacyContextEngine implements ContextEngine { async assemble(params: { sessionId: string; + sessionKey?: string; messages: AgentMessage[]; tokenBudget?: number; }): Promise { @@ -49,6 +51,7 @@ export class LegacyContextEngine implements ContextEngine { async afterTurn(_params: { sessionId: string; + sessionKey?: string; sessionFile: string; messages: AgentMessage[]; prePromptMessageCount: number; @@ -62,6 +65,7 @@ export class LegacyContextEngine implements ContextEngine { async compact(params: { sessionId: string; + sessionKey?: string; sessionFile: string; tokenBudget?: number; force?: boolean; diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts index b886190a1e0..7ddd695b5b6 100644 --- a/src/context-engine/types.ts +++ b/src/context-engine/types.ts @@ -72,13 +72,18 @@ export interface ContextEngine { /** * Initialize engine state for a session, optionally importing historical context. */ - bootstrap?(params: { sessionId: string; sessionFile: string }): Promise; + bootstrap?(params: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + }): Promise; /** * Ingest a single message into the engine's store. */ ingest(params: { sessionId: string; + sessionKey?: string; message: AgentMessage; /** True when the message belongs to a heartbeat run. */ isHeartbeat?: boolean; @@ -89,6 +94,7 @@ export interface ContextEngine { */ ingestBatch?(params: { sessionId: string; + sessionKey?: string; messages: AgentMessage[]; /** True when the batch belongs to a heartbeat run. */ isHeartbeat?: boolean; @@ -101,6 +107,7 @@ export interface ContextEngine { */ afterTurn?(params: { sessionId: string; + sessionKey?: string; sessionFile: string; messages: AgentMessage[]; /** Number of messages that existed before the prompt was sent. */ @@ -121,6 +128,7 @@ export interface ContextEngine { */ assemble(params: { sessionId: string; + sessionKey?: string; messages: AgentMessage[]; tokenBudget?: number; }): Promise; @@ -131,6 +139,7 @@ export interface ContextEngine { */ compact(params: { sessionId: string; + sessionKey?: string; sessionFile: string; tokenBudget?: number; /** Force compaction even below the default trigger threshold. */ diff --git a/src/shared/global-singleton.ts b/src/shared/global-singleton.ts index 2d3ea38204f..3e896429fa5 100644 --- a/src/shared/global-singleton.ts +++ b/src/shared/global-singleton.ts @@ -1,8 +1,7 @@ export function resolveGlobalSingleton(key: symbol, create: () => T): T { const globalStore = globalThis as Record; - const existing = globalStore[key] as T | undefined; if (Object.prototype.hasOwnProperty.call(globalStore, key)) { - return existing; + return globalStore[key] as T; } const created = create(); globalStore[key] = created;