fix: preserve inter-session input provenance (thanks @anbecker)

This commit is contained in:
Peter Steinberger
2026-02-13 02:01:53 +01:00
parent 7081dee1af
commit 85409e401b
25 changed files with 415 additions and 12 deletions

View File

@@ -475,6 +475,7 @@ describe("sessions tools", () => {
expect(call.params).toMatchObject({
lane: "nested",
channel: "webchat",
inputProvenance: { kind: "inter_session" },
});
}
expect(
@@ -652,6 +653,7 @@ describe("sessions tools", () => {
expect(call.params).toMatchObject({
lane: "nested",
channel: "webchat",
inputProvenance: { kind: "inter_session" },
});
}

View File

@@ -112,6 +112,36 @@ describe("sanitizeSessionHistory", () => {
);
});
it("annotates inter-session user messages before context sanitization", async () => {
vi.mocked(helpers.isGoogleModelApi).mockReturnValue(false);
const messages: AgentMessage[] = [
{
role: "user",
content: "forwarded instruction",
provenance: {
kind: "inter_session",
sourceSessionKey: "agent:main:req",
sourceTool: "sessions_send",
},
} as unknown as AgentMessage,
];
const result = await sanitizeSessionHistory({
messages,
modelApi: "openai-responses",
provider: "openai",
sessionManager: mockSessionManager,
sessionId: "test-session",
});
const first = result[0] as Extract<AgentMessage, { role: "user" }>;
expect(first.role).toBe("user");
expect(typeof first.content).toBe("string");
expect(first.content as string).toContain("[Inter-session message]");
expect(first.content as string).toContain("sourceSession=agent:main:req");
});
it("keeps reasoning-only assistant messages for openai-responses", async () => {
vi.mocked(helpers.isGoogleModelApi).mockReturnValue(false);

View File

@@ -4,6 +4,10 @@ import type { TSchema } from "@sinclair/typebox";
import { EventEmitter } from "node:events";
import type { TranscriptPolicy } from "../transcript-policy.js";
import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js";
import {
hasInterSessionUserProvenance,
normalizeInputProvenance,
} from "../../sessions/input-provenance.js";
import {
downgradeOpenAIReasoningBlocks,
isCompactionFailureError,
@@ -44,6 +48,7 @@ const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([
"maxProperties",
]);
const ANTIGRAVITY_SIGNATURE_RE = /^[A-Za-z0-9+/]+={0,2}$/;
const INTER_SESSION_PREFIX_BASE = "[Inter-session message]";
function isValidAntigravitySignature(value: unknown): value is string {
if (typeof value !== "string") {
@@ -119,6 +124,85 @@ export function sanitizeAntigravityThinkingBlocks(messages: AgentMessage[]): Age
return touched ? out : messages;
}
/**
 * Builds the marker prefix prepended to inter-session user messages.
 *
 * When the message carries normalized provenance, the prefix is extended with
 * `sourceSession=` / `sourceChannel=` / `sourceTool=` details (only those that
 * are present); otherwise the bare base marker is returned.
 */
function buildInterSessionPrefix(message: AgentMessage): string {
  const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance);
  if (!provenance) {
    return INTER_SESSION_PREFIX_BASE;
  }
  const details: string[] = [];
  if (provenance.sourceSessionKey) {
    details.push(`sourceSession=${provenance.sourceSessionKey}`);
  }
  if (provenance.sourceChannel) {
    details.push(`sourceChannel=${provenance.sourceChannel}`);
  }
  if (provenance.sourceTool) {
    details.push(`sourceTool=${provenance.sourceTool}`);
  }
  return details.length > 0
    ? `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`
    : INTER_SESSION_PREFIX_BASE;
}
/**
 * Prepends an inter-session provenance marker to user messages that were
 * forwarded from another session, so downstream sanitization (and the model)
 * can distinguish them from direct user input.
 *
 * Idempotent: messages whose text already starts with the computed prefix are
 * left alone. Returns the original array instance when nothing was annotated.
 */
function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] {
  let changed = false;
  // Produces an annotated copy of the message, or undefined when no change is
  // needed (already prefixed, or content has no annotatable shape).
  const annotate = (msg: AgentMessage, prefix: string): AgentMessage | undefined => {
    const user = msg as Extract<AgentMessage, { role: "user" }>;
    const content = user.content;
    if (typeof content === "string") {
      if (content.startsWith(prefix)) {
        return undefined;
      }
      return {
        ...(msg as unknown as Record<string, unknown>),
        content: `${prefix}\n${content}`,
      } as AgentMessage;
    }
    if (!Array.isArray(content)) {
      return undefined;
    }
    // Prefix the first text block when one exists; otherwise prepend a new one.
    const textIdx = content.findIndex(
      (block) =>
        block &&
        typeof block === "object" &&
        (block as { type?: unknown }).type === "text" &&
        typeof (block as { text?: unknown }).text === "string",
    );
    if (textIdx < 0) {
      return {
        ...(msg as unknown as Record<string, unknown>),
        content: [{ type: "text", text: prefix }, ...content],
      } as AgentMessage;
    }
    const textBlock = content[textIdx] as { type: "text"; text: string };
    if (textBlock.text.startsWith(prefix)) {
      return undefined;
    }
    const nextContent = [...content];
    nextContent[textIdx] = { ...textBlock, text: `${prefix}\n${textBlock.text}` };
    return {
      ...(msg as unknown as Record<string, unknown>),
      content: nextContent,
    } as AgentMessage;
  };
  const result = messages.map((msg) => {
    if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) {
      return msg;
    }
    const annotated = annotate(msg, buildInterSessionPrefix(msg));
    if (!annotated) {
      return msg;
    }
    changed = true;
    return annotated;
  });
  // Preserve reference identity for callers that compare arrays when untouched.
  return changed ? result : messages;
}
function findUnsupportedSchemaKeywords(schema: unknown, path: string): string[] {
if (!schema || typeof schema !== "object") {
return [];
@@ -358,13 +442,18 @@ export async function sanitizeSessionHistory(params: {
provider: params.provider,
modelId: params.modelId,
});
const sanitizedImages = await sanitizeSessionMessagesImages(params.messages, "session:history", {
sanitizeMode: policy.sanitizeMode,
sanitizeToolCallIds: policy.sanitizeToolCallIds,
toolCallIdMode: policy.toolCallIdMode,
preserveSignatures: policy.preserveSignatures,
sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures,
});
const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages);
const sanitizedImages = await sanitizeSessionMessagesImages(
withInterSessionMarkers,
"session:history",
{
sanitizeMode: policy.sanitizeMode,
sanitizeToolCallIds: policy.sanitizeToolCallIds,
toolCallIdMode: policy.toolCallIdMode,
preserveSignatures: policy.preserveSignatures,
sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures,
},
);
const sanitizedThinking = policy.normalizeAntigravityThinkingBlocks
? sanitizeAntigravityThinkingBlocks(sanitizedImages)
: sanitizedImages;

View File

@@ -470,6 +470,7 @@ export async function runEmbeddedPiAgent(
onToolResult: params.onToolResult,
onAgentEvent: params.onAgentEvent,
extraSystemPrompt: params.extraSystemPrompt,
inputProvenance: params.inputProvenance,
streamParams: params.streamParams,
ownerNumbers: params.ownerNumbers,
enforceFinalTag: params.enforceFinalTag,

View File

@@ -428,6 +428,7 @@ export async function runEmbeddedAttempt(
sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
agentId: sessionAgentId,
sessionKey: params.sessionKey,
inputProvenance: params.inputProvenance,
allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults,
});
trackSessionManagerAccess(params.sessionFile);

View File

@@ -3,6 +3,7 @@ import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-rep
import type { AgentStreamParams } from "../../../commands/agent/types.js";
import type { OpenClawConfig } from "../../../config/config.js";
import type { enqueueCommand } from "../../../process/command-queue.js";
import type { InputProvenance } from "../../../sessions/input-provenance.js";
import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js";
import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js";
import type { SkillSnapshot } from "../../skills.js";
@@ -99,6 +100,7 @@ export type RunEmbeddedPiAgentParams = {
lane?: string;
enqueue?: typeof enqueueCommand;
extraSystemPrompt?: string;
inputProvenance?: InputProvenance;
streamParams?: AgentStreamParams;
ownerNumbers?: string[];
enforceFinalTag?: boolean;

View File

@@ -4,6 +4,7 @@ import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-rep
import type { AgentStreamParams } from "../../../commands/agent/types.js";
import type { OpenClawConfig } from "../../../config/config.js";
import type { SessionSystemPromptReport } from "../../../config/sessions/types.js";
import type { InputProvenance } from "../../../sessions/input-provenance.js";
import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js";
import type { MessagingToolSend } from "../../pi-embedded-messaging.js";
import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js";
@@ -87,6 +88,7 @@ export type EmbeddedRunAttemptParams = {
/** If true, omit the message tool from the tool list. */
disableMessageTool?: boolean;
extraSystemPrompt?: string;
inputProvenance?: InputProvenance;
streamParams?: AgentStreamParams;
ownerNumbers?: string[];
enforceFinalTag?: boolean;

View File

@@ -1,5 +1,9 @@
import type { SessionManager } from "@mariozechner/pi-coding-agent";
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
import {
applyInputProvenanceToUserMessage,
type InputProvenance,
} from "../sessions/input-provenance.js";
import { installSessionToolResultGuard } from "./session-tool-result-guard.js";
export type GuardedSessionManager = SessionManager & {
@@ -16,6 +20,7 @@ export function guardSessionManager(
opts?: {
agentId?: string;
sessionKey?: string;
inputProvenance?: InputProvenance;
allowSyntheticToolResults?: boolean;
},
): GuardedSessionManager {
@@ -46,6 +51,8 @@ export function guardSessionManager(
: undefined;
const guard = installSessionToolResultGuard(sessionManager, {
transformMessageForPersistence: (message) =>
applyInputProvenanceToUserMessage(message, opts?.inputProvenance),
transformToolResultForPersistence: transform,
allowSyntheticToolResults: opts?.allowSyntheticToolResults,
});

View File

@@ -269,4 +269,34 @@ describe("installSessionToolResultGuard", () => {
};
expect(textBlock.text).toBe(originalText);
});
it("applies message persistence transform to user messages", () => {
const sm = SessionManager.inMemory();
installSessionToolResultGuard(sm, {
transformMessageForPersistence: (message) =>
(message as { role?: string }).role === "user"
? ({
...(message as unknown as Record<string, unknown>),
provenance: { kind: "inter_session", sourceTool: "sessions_send" },
} as AgentMessage)
: message,
});
sm.appendMessage(
asAppendMessage({
role: "user",
content: "forwarded",
timestamp: Date.now(),
}),
);
const persisted = sm.getEntries().find((e) => e.type === "message") as
| { message?: Record<string, unknown> }
| undefined;
expect(persisted?.message?.role).toBe("user");
expect(persisted?.message?.provenance).toEqual({
kind: "inter_session",
sourceTool: "sessions_send",
});
});
});

View File

@@ -113,6 +113,10 @@ function extractToolResultId(msg: Extract<AgentMessage, { role: "toolResult" }>)
export function installSessionToolResultGuard(
sessionManager: SessionManager,
opts?: {
/**
* Optional transform applied to any message before persistence.
*/
transformMessageForPersistence?: (message: AgentMessage) => AgentMessage;
/**
* Optional, synchronous transform applied to toolResult messages *before* they are
* persisted to the session transcript.
@@ -133,6 +137,10 @@ export function installSessionToolResultGuard(
} {
const originalAppend = sessionManager.appendMessage.bind(sessionManager);
const pending = new Map<string, string | undefined>();
const persistMessage = (message: AgentMessage) => {
const transformer = opts?.transformMessageForPersistence;
return transformer ? transformer(message) : message;
};
const persistToolResult = (
message: AgentMessage,
@@ -152,7 +160,7 @@ export function installSessionToolResultGuard(
for (const [id, name] of pending.entries()) {
const synthetic = makeMissingToolResult({ toolCallId: id, toolName: name });
originalAppend(
persistToolResult(synthetic, {
persistToolResult(persistMessage(synthetic), {
toolCallId: id,
toolName: name,
isSynthetic: true,
@@ -186,7 +194,7 @@ export function installSessionToolResultGuard(
}
// Apply hard size cap before persistence to prevent oversized tool results
// from consuming the entire context window on subsequent LLM calls.
const capped = capToolResultSize(nextMessage);
const capped = capToolResultSize(persistMessage(nextMessage));
return originalAppend(
persistToolResult(capped, {
toolCallId: id ?? undefined,
@@ -212,7 +220,7 @@ export function installSessionToolResultGuard(
}
}
const result = originalAppend(nextMessage as never);
const result = originalAppend(persistMessage(nextMessage) as never);
const sessionFile = (
sessionManager as { getSessionFile?: () => string | null }

View File

@@ -24,6 +24,9 @@ export async function runAgentStep(params: {
timeoutMs: number;
channel?: string;
lane?: string;
sourceSessionKey?: string;
sourceChannel?: string;
sourceTool?: string;
}): Promise<string | undefined> {
const stepIdem = crypto.randomUUID();
const response = await callGateway<{ runId?: string }>({
@@ -36,6 +39,12 @@ export async function runAgentStep(params: {
channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL,
lane: params.lane ?? AGENT_LANE_NESTED,
extraSystemPrompt: params.extraSystemPrompt,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.sourceSessionKey,
sourceChannel: params.sourceChannel,
sourceTool: params.sourceTool ?? "sessions_send",
},
},
timeoutMs: 10_000,
});

View File

@@ -83,6 +83,10 @@ export async function runSessionsSendA2AFlow(params: {
extraSystemPrompt: replyPrompt,
timeoutMs: params.announceTimeoutMs,
lane: AGENT_LANE_NESTED,
sourceSessionKey: nextSessionKey,
sourceChannel:
nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel,
sourceTool: "sessions_send",
});
if (!replyText || isReplySkip(replyText)) {
break;
@@ -110,6 +114,9 @@ export async function runSessionsSendA2AFlow(params: {
extraSystemPrompt: announcePrompt,
timeoutMs: params.announceTimeoutMs,
lane: AGENT_LANE_NESTED,
sourceSessionKey: params.requesterSessionKey,
sourceChannel: params.requesterChannel,
sourceTool: "sessions_send",
});
if (announceTarget && announceReply && announceReply.trim() && !isAnnounceSkip(announceReply)) {
try {

View File

@@ -260,6 +260,12 @@ export function createSessionsSendTool(opts?: {
channel: INTERNAL_MESSAGE_CHANNEL,
lane: AGENT_LANE_NESTED,
extraSystemPrompt: agentMessageContext,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: opts?.agentSessionKey,
sourceChannel: opts?.agentChannel,
sourceTool: "sessions_send",
},
};
const requesterSessionKey = opts?.agentSessionKey;
const requesterChannel = opts?.agentChannel;