Files
openclaw/src/agents/pi-embedded-runner/run/attempt.ts
Yaroslav Boiko 838259331f fix(discord): add media dedup production code for messaging tool pipeline
Wire media URL tracking through the embedded agent pipeline so that
media already sent via messaging tools is not delivered again by the
reply dispatcher.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 23:51:51 +01:00

1214 lines
46 KiB
TypeScript

import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import { streamSimple } from "@mariozechner/pi-ai";
import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent";
import fs from "node:fs/promises";
import os from "node:os";
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import {
isCronSessionKey,
isSubagentSessionKey,
normalizeAgentId,
} from "../../../routing/session-key.js";
import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js";
import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js";
import { resolveTelegramReactionLevel } from "../../../telegram/reaction-level.js";
import { buildTtsSystemPromptHint } from "../../../tts/tts.js";
import { resolveUserPath } from "../../../utils.js";
import { normalizeMessageChannel } from "../../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
import { resolveOpenClawAgentDir } from "../../agent-paths.js";
import { resolveSessionAgentIds } from "../../agent-scope.js";
import { createAnthropicPayloadLogger } from "../../anthropic-payload-log.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js";
import { createCacheTrace } from "../../cache-trace.js";
import {
listChannelSupportedActions,
resolveChannelMessageToolHints,
} from "../../channel-tools.js";
import { resolveOpenClawDocsPath } from "../../docs-path.js";
import { isTimeoutError } from "../../failover-error.js";
import { resolveModelAuthMode } from "../../model-auth.js";
import { resolveDefaultModelForAgent } from "../../model-selection.js";
import { createOllamaStreamFn, OLLAMA_NATIVE_BASE_URL } from "../../ollama-stream.js";
import {
isCloudCodeAssistFormatError,
resolveBootstrapMaxChars,
resolveBootstrapTotalMaxChars,
validateAnthropicTurns,
validateGeminiTurns,
} from "../../pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
import {
ensurePiCompactionReserveTokens,
resolveCompactionReserveTokensFloor,
} from "../../pi-settings.js";
import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js";
import { createOpenClawCodingTools } from "../../pi-tools.js";
import { resolveSandboxContext } from "../../sandbox.js";
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
import { repairSessionFileIfNeeded } from "../../session-file-repair.js";
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
import { sanitizeToolUseResultPairing } from "../../session-transcript-repair.js";
import { acquireSessionWriteLock } from "../../session-write-lock.js";
import { detectRuntimeShell } from "../../shell-utils.js";
import {
applySkillEnvOverrides,
applySkillEnvOverridesFromSnapshot,
loadWorkspaceSkillEntries,
resolveSkillsPromptForRun,
} from "../../skills.js";
import { buildSystemPromptParams } from "../../system-prompt-params.js";
import { buildSystemPromptReport } from "../../system-prompt-report.js";
import { resolveTranscriptPolicy } from "../../transcript-policy.js";
import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js";
import { isRunnerAbortError } from "../abort.js";
import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js";
import { buildEmbeddedExtensionPaths } from "../extensions.js";
import { applyExtraParamsToAgent } from "../extra-params.js";
import {
logToolSchemasForGoogle,
sanitizeAntigravityThinkingBlocks,
sanitizeSessionHistory,
sanitizeToolsForGoogle,
} from "../google.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
import { log } from "../logger.js";
import { buildModelAliasLines } from "../model.js";
import {
clearActiveEmbeddedRun,
type EmbeddedPiQueueHandle,
setActiveEmbeddedRun,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
import { prepareSessionManagerForRun } from "../session-manager-init.js";
import {
applySystemPromptOverrideToSession,
buildEmbeddedSystemPrompt,
createSystemPromptOverride,
} from "../system-prompt.js";
import { splitSdkTools } from "../tool-split.js";
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
import {
selectCompactionTimeoutSnapshot,
shouldFlagCompactionTimeout,
} from "./compaction-timeout.js";
import { detectAndLoadPromptImages } from "./images.js";
export function injectHistoryImagesIntoMessages(
messages: AgentMessage[],
historyImagesByIndex: Map<number, ImageContent[]>,
): boolean {
if (historyImagesByIndex.size === 0) {
return false;
}
let didMutate = false;
for (const [msgIndex, images] of historyImagesByIndex) {
// Bounds check: ensure index is valid before accessing
if (msgIndex < 0 || msgIndex >= messages.length) {
continue;
}
const msg = messages[msgIndex];
if (msg && msg.role === "user") {
// Convert string content to array format if needed
if (typeof msg.content === "string") {
msg.content = [{ type: "text", text: msg.content }];
didMutate = true;
}
if (Array.isArray(msg.content)) {
// Check for existing image content to avoid duplicates across turns
const existingImageData = new Set(
msg.content
.filter(
(c): c is ImageContent =>
c != null &&
typeof c === "object" &&
c.type === "image" &&
typeof c.data === "string",
)
.map((c) => c.data),
);
for (const img of images) {
// Only add if this image isn't already in the message
if (!existingImageData.has(img.data)) {
msg.content.push(img);
didMutate = true;
}
}
}
}
}
return didMutate;
}
function summarizeMessagePayload(msg: AgentMessage): { textChars: number; imageBlocks: number } {
const content = (msg as { content?: unknown }).content;
if (typeof content === "string") {
return { textChars: content.length, imageBlocks: 0 };
}
if (!Array.isArray(content)) {
return { textChars: 0, imageBlocks: 0 };
}
let textChars = 0;
let imageBlocks = 0;
for (const block of content) {
if (!block || typeof block !== "object") {
continue;
}
const typedBlock = block as { type?: unknown; text?: unknown };
if (typedBlock.type === "image") {
imageBlocks++;
continue;
}
if (typeof typedBlock.text === "string") {
textChars += typedBlock.text.length;
}
}
return { textChars, imageBlocks };
}
function summarizeSessionContext(messages: AgentMessage[]): {
roleCounts: string;
totalTextChars: number;
totalImageBlocks: number;
maxMessageTextChars: number;
} {
const roleCounts = new Map<string, number>();
let totalTextChars = 0;
let totalImageBlocks = 0;
let maxMessageTextChars = 0;
for (const msg of messages) {
const role = typeof msg.role === "string" ? msg.role : "unknown";
roleCounts.set(role, (roleCounts.get(role) ?? 0) + 1);
const payload = summarizeMessagePayload(msg);
totalTextChars += payload.textChars;
totalImageBlocks += payload.imageBlocks;
if (payload.textChars > maxMessageTextChars) {
maxMessageTextChars = payload.textChars;
}
}
return {
roleCounts:
[...roleCounts.entries()]
.toSorted((a, b) => a[0].localeCompare(b[0]))
.map(([role, count]) => `${role}:${count}`)
.join(",") || "none",
totalTextChars,
totalImageBlocks,
maxMessageTextChars,
};
}
export async function runEmbeddedAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const runAbortController = new AbortController();
log.debug(
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
);
await fs.mkdir(resolvedWorkspace, { recursive: true });
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
const effectiveWorkspace = sandbox?.enabled
? sandbox.workspaceAccess === "rw"
? resolvedWorkspace
: sandbox.workspaceDir
: resolvedWorkspace;
await fs.mkdir(effectiveWorkspace, { recursive: true });
let restoreSkillEnv: (() => void) | undefined;
process.chdir(effectiveWorkspace);
try {
const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(effectiveWorkspace)
: [];
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const skillsPrompt = resolveSkillsPromptForRun({
skillsSnapshot: params.skillsSnapshot,
entries: shouldLoadSkillEntries ? skillEntries : undefined,
config: params.config,
workspaceDir: effectiveWorkspace,
});
const sessionLabel = params.sessionKey ?? params.sessionId;
const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } =
await resolveBootstrapContextForRun({
workspaceDir: effectiveWorkspace,
config: params.config,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }),
});
const workspaceNotes = hookAdjustedBootstrapFiles.some(
(file) => file.name === DEFAULT_BOOTSTRAP_FILENAME && !file.missing,
)
? ["Reminder: commit your changes in this workspace after edits."]
: undefined;
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
// Check if the model supports native image input
const modelHasVision = params.model.input?.includes("image") ?? false;
const toolsRaw = params.disableTools
? []
: createOpenClawCodingTools({
exec: {
...params.execOverrides,
elevated: params.bashElevated,
},
sandbox,
messageProvider: params.messageChannel ?? params.messageProvider,
agentAccountId: params.agentAccountId,
messageTo: params.messageTo,
messageThreadId: params.messageThreadId,
groupId: params.groupId,
groupChannel: params.groupChannel,
groupSpace: params.groupSpace,
spawnedBy: params.spawnedBy,
senderId: params.senderId,
senderName: params.senderName,
senderUsername: params.senderUsername,
senderE164: params.senderE164,
senderIsOwner: params.senderIsOwner,
sessionKey: params.sessionKey ?? params.sessionId,
agentDir,
workspaceDir: effectiveWorkspace,
config: params.config,
abortSignal: runAbortController.signal,
modelProvider: params.model.provider,
modelId: params.modelId,
modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
replyToMode: params.replyToMode,
hasRepliedRef: params.hasRepliedRef,
modelHasVision,
requireExplicitMessageTarget:
params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey),
disableMessageTool: params.disableMessageTool,
});
const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider: params.provider });
logToolSchemasForGoogle({ tools, provider: params.provider });
const machineName = await getMachineDisplayName();
const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
let runtimeCapabilities = runtimeChannel
? (resolveChannelCapabilities({
cfg: params.config,
channel: runtimeChannel,
accountId: params.agentAccountId,
}) ?? [])
: undefined;
if (runtimeChannel === "telegram" && params.config) {
const inlineButtonsScope = resolveTelegramInlineButtonsScope({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
if (inlineButtonsScope !== "off") {
if (!runtimeCapabilities) {
runtimeCapabilities = [];
}
if (
!runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons")
) {
runtimeCapabilities.push("inlineButtons");
}
}
}
const reactionGuidance =
runtimeChannel && params.config
? (() => {
if (runtimeChannel === "telegram") {
const resolved = resolveTelegramReactionLevel({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
const level = resolved.agentReactionGuidance;
return level ? { level, channel: "Telegram" } : undefined;
}
if (runtimeChannel === "signal") {
const resolved = resolveSignalReactionLevel({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
const level = resolved.agentReactionGuidance;
return level ? { level, channel: "Signal" } : undefined;
}
return undefined;
})()
: undefined;
const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
});
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
const reasoningTagHint = isReasoningTagProvider(params.provider);
// Resolve channel-specific message actions for system prompt
const channelActions = runtimeChannel
? listChannelSupportedActions({
cfg: params.config,
channel: runtimeChannel,
})
: undefined;
const messageToolHints = runtimeChannel
? resolveChannelMessageToolHints({
cfg: params.config,
channel: runtimeChannel,
accountId: params.agentAccountId,
})
: undefined;
const defaultModelRef = resolveDefaultModelForAgent({
cfg: params.config ?? {},
agentId: sessionAgentId,
});
const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`;
const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({
config: params.config,
agentId: sessionAgentId,
workspaceDir: effectiveWorkspace,
cwd: process.cwd(),
runtime: {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${params.provider}/${params.modelId}`,
defaultModel: defaultModelLabel,
shell: detectRuntimeShell(),
channel: runtimeChannel,
capabilities: runtimeCapabilities,
channelActions,
},
});
const isDefaultAgent = sessionAgentId === defaultAgentId;
const promptMode =
isSubagentSessionKey(params.sessionKey) || isCronSessionKey(params.sessionKey)
? "minimal"
: "full";
const docsPath = await resolveOpenClawDocsPath({
workspaceDir: effectiveWorkspace,
argv1: process.argv[1],
cwd: process.cwd(),
moduleUrl: import.meta.url,
});
const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
const appendPrompt = buildEmbeddedSystemPrompt({
workspaceDir: effectiveWorkspace,
defaultThinkLevel: params.thinkLevel,
reasoningLevel: params.reasoningLevel ?? "off",
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
heartbeatPrompt: isDefaultAgent
? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
: undefined,
skillsPrompt,
docsPath: docsPath ?? undefined,
ttsHint,
workspaceNotes,
reactionGuidance,
promptMode,
runtimeInfo,
messageToolHints,
sandboxInfo,
tools,
modelAliasLines: buildModelAliasLines(params.config),
userTimezone,
userTime,
userTimeFormat,
contextFiles,
memoryCitationsMode: params.config?.memory?.citations,
});
const systemPromptReport = buildSystemPromptReport({
source: "run",
generatedAt: Date.now(),
sessionId: params.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
model: params.modelId,
workspaceDir: effectiveWorkspace,
bootstrapMaxChars: resolveBootstrapMaxChars(params.config),
bootstrapTotalMaxChars: resolveBootstrapTotalMaxChars(params.config),
sandbox: (() => {
const runtime = resolveSandboxRuntimeStatus({
cfg: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
});
return { mode: runtime.mode, sandboxed: runtime.sandboxed };
})(),
systemPrompt: appendPrompt,
bootstrapFiles: hookAdjustedBootstrapFiles,
injectedFiles: contextFiles,
skillsPrompt,
tools,
});
const systemPromptOverride = createSystemPromptOverride(appendPrompt);
const systemPromptText = systemPromptOverride();
const sessionLock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
});
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
try {
await repairSessionFileIfNeeded({
sessionFile: params.sessionFile,
warn: (message) => log.warn(message),
});
const hadSessionFile = await fs
.stat(params.sessionFile)
.then(() => true)
.catch(() => false);
const transcriptPolicy = resolveTranscriptPolicy({
modelApi: params.model?.api,
provider: params.provider,
modelId: params.modelId,
});
await prewarmSessionFile(params.sessionFile);
sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
agentId: sessionAgentId,
sessionKey: params.sessionKey,
inputProvenance: params.inputProvenance,
allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults,
});
trackSessionManagerAccess(params.sessionFile);
await prepareSessionManagerForRun({
sessionManager,
sessionFile: params.sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
});
const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
ensurePiCompactionReserveTokens({
settingsManager,
minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
});
// Call for side effects (sets compaction/pruning runtime state)
buildEmbeddedExtensionPaths({
cfg: params.config,
sessionManager,
provider: params.provider,
modelId: params.modelId,
model: params.model,
});
// Get hook runner early so it's available when creating tools
const hookRunner = getGlobalHookRunner();
const { builtInTools, customTools } = splitSdkTools({
tools,
sandboxEnabled: !!sandbox?.enabled,
});
// Add client tools (OpenResponses hosted tools) to customTools
let clientToolCallDetected: { name: string; params: Record<string, unknown> } | null = null;
const clientToolDefs = params.clientTools
? toClientToolDefinitions(
params.clientTools,
(toolName, toolParams) => {
clientToolCallDetected = { name: toolName, params: toolParams };
},
{
agentId: sessionAgentId,
sessionKey: params.sessionKey,
},
)
: [];
const allCustomTools = [...customTools, ...clientToolDefs];
({ session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage: params.authStorage,
modelRegistry: params.modelRegistry,
model: params.model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
tools: builtInTools,
customTools: allCustomTools,
sessionManager,
settingsManager,
}));
applySystemPromptOverrideToSession(session, systemPromptText);
if (!session) {
throw new Error("Embedded agent session missing");
}
const activeSession = session;
const cacheTrace = createCacheTrace({
cfg: params.config,
env: process.env,
runId: params.runId,
sessionId: activeSession.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
modelId: params.modelId,
modelApi: params.model.api,
workspaceDir: params.workspaceDir,
});
const anthropicPayloadLogger = createAnthropicPayloadLogger({
env: process.env,
runId: params.runId,
sessionId: activeSession.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
modelId: params.modelId,
modelApi: params.model.api,
workspaceDir: params.workspaceDir,
});
// Ollama native API: bypass SDK's streamSimple and use direct /api/chat calls
// for reliable streaming + tool calling support (#11828).
if (params.model.api === "ollama") {
// Use the resolved model baseUrl first so custom provider aliases work.
const providerConfig = params.config?.models?.providers?.[params.model.provider];
const modelBaseUrl =
typeof params.model.baseUrl === "string" ? params.model.baseUrl.trim() : "";
const providerBaseUrl =
typeof providerConfig?.baseUrl === "string" ? providerConfig.baseUrl.trim() : "";
const ollamaBaseUrl = modelBaseUrl || providerBaseUrl || OLLAMA_NATIVE_BASE_URL;
activeSession.agent.streamFn = createOllamaStreamFn(ollamaBaseUrl);
} else {
// Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
activeSession.agent.streamFn = streamSimple;
}
applyExtraParamsToAgent(
activeSession.agent,
params.config,
params.provider,
params.modelId,
params.streamParams,
);
if (cacheTrace) {
cacheTrace.recordStage("session:loaded", {
messages: activeSession.messages,
system: systemPromptText,
note: "after session create",
});
activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn);
}
if (anthropicPayloadLogger) {
activeSession.agent.streamFn = anthropicPayloadLogger.wrapStreamFn(
activeSession.agent.streamFn,
);
}
try {
const prior = await sanitizeSessionHistory({
messages: activeSession.messages,
modelApi: params.model.api,
modelId: params.modelId,
provider: params.provider,
sessionManager,
sessionId: params.sessionId,
policy: transcriptPolicy,
});
cacheTrace?.recordStage("session:sanitized", { messages: prior });
const validatedGemini = transcriptPolicy.validateGeminiTurns
? validateGeminiTurns(prior)
: prior;
const validated = transcriptPolicy.validateAnthropicTurns
? validateAnthropicTurns(validatedGemini)
: validatedGemini;
const truncated = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
);
// Re-run tool_use/tool_result pairing repair after truncation, since
// limitHistoryTurns can orphan tool_result blocks by removing the
// assistant message that contained the matching tool_use.
const limited = transcriptPolicy.repairToolUseResultPairing
? sanitizeToolUseResultPairing(truncated)
: truncated;
cacheTrace?.recordStage("session:limited", { messages: limited });
if (limited.length > 0) {
activeSession.agent.replaceMessages(limited);
}
} catch (err) {
await flushPendingToolResultsAfterIdle({
agent: activeSession?.agent,
sessionManager,
});
activeSession.dispose();
throw err;
}
let aborted = Boolean(params.abortSignal?.aborted);
let timedOut = false;
let timedOutDuringCompaction = false;
const getAbortReason = (signal: AbortSignal): unknown =>
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
const makeTimeoutAbortReason = (): Error => {
const err = new Error("request timed out");
err.name = "TimeoutError";
return err;
};
const makeAbortError = (signal: AbortSignal): Error => {
const reason = getAbortReason(signal);
const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
err.name = "AbortError";
return err;
};
const abortRun = (isTimeout = false, reason?: unknown) => {
aborted = true;
if (isTimeout) {
timedOut = true;
}
if (isTimeout) {
runAbortController.abort(reason ?? makeTimeoutAbortReason());
} else {
runAbortController.abort(reason);
}
void activeSession.abort();
};
const abortable = <T>(promise: Promise<T>): Promise<T> => {
const signal = runAbortController.signal;
if (signal.aborted) {
return Promise.reject(makeAbortError(signal));
}
return new Promise<T>((resolve, reject) => {
const onAbort = () => {
signal.removeEventListener("abort", onAbort);
reject(makeAbortError(signal));
};
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(value) => {
signal.removeEventListener("abort", onAbort);
resolve(value);
},
(err) => {
signal.removeEventListener("abort", onAbort);
reject(err);
},
);
});
};
const subscription = subscribeEmbeddedPiSession({
session: activeSession,
runId: params.runId,
hookRunner: getGlobalHookRunner() ?? undefined,
verboseLevel: params.verboseLevel,
reasoningMode: params.reasoningLevel ?? "off",
toolResultFormat: params.toolResultFormat,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onReasoningEnd: params.onReasoningEnd,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
config: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
});
const {
assistantTexts,
toolMetas,
unsubscribe,
waitForCompactionRetry,
getMessagingToolSentTexts,
getMessagingToolSentMediaUrls,
getMessagingToolSentTargets,
getSuccessfulCronAdds,
didSendViaMessagingTool,
getLastToolError,
getUsageTotals,
getCompactionCount,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await activeSession.steer(text);
},
isStreaming: () => activeSession.isStreaming,
isCompacting: () => subscription.isCompacting(),
abort: abortRun,
};
setActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
let abortWarnTimer: NodeJS.Timeout | undefined;
const isProbeSession = params.sessionId?.startsWith("probe-") ?? false;
const abortTimer = setTimeout(
() => {
if (!isProbeSession) {
log.warn(
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
);
}
if (
shouldFlagCompactionTimeout({
isTimeout: true,
isCompactionPendingOrRetrying: subscription.isCompacting(),
isCompactionInFlight: activeSession.isCompacting,
})
) {
timedOutDuringCompaction = true;
}
abortRun(true);
if (!abortWarnTimer) {
abortWarnTimer = setTimeout(() => {
if (!activeSession.isStreaming) {
return;
}
if (!isProbeSession) {
log.warn(
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
);
}
}, 10_000);
}
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = activeSession.sessionId;
const onAbort = () => {
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
const timeout = reason ? isTimeoutError(reason) : false;
if (
shouldFlagCompactionTimeout({
isTimeout: timeout,
isCompactionPendingOrRetrying: subscription.isCompacting(),
isCompactionInFlight: activeSession.isCompacting,
})
) {
timedOutDuringCompaction = true;
}
abortRun(timeout, reason);
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
}
// Hook runner was already obtained earlier before tool creation
const hookAgentId =
typeof params.agentId === "string" && params.agentId.trim()
? normalizeAgentId(params.agentId)
: resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
}).sessionAgentId;
let promptError: unknown = null;
try {
const promptStartedAt = Date.now();
// Run before_agent_start hooks to allow plugins to inject context.
// If run.ts already fired the hook (for model override), reuse its result.
let effectivePrompt = params.prompt;
const hookResult =
params.earlyHookResult ??
(hookRunner?.hasHooks("before_agent_start")
? await hookRunner
.runBeforeAgentStart(
{
prompt: params.prompt,
messages: activeSession.messages,
},
{
agentId: hookAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
)
.catch((hookErr: unknown) => {
log.warn(`before_agent_start hook failed: ${String(hookErr)}`);
return undefined;
})
: undefined);
{
if (hookResult?.prependContext) {
effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`;
log.debug(
`hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`,
);
}
}
log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);
cacheTrace?.recordStage("prompt:before", {
prompt: effectivePrompt,
messages: activeSession.messages,
});
// Repair orphaned trailing user messages so new prompts don't violate role ordering.
const leafEntry = sessionManager.getLeafEntry();
if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
if (leafEntry.parentId) {
sessionManager.branch(leafEntry.parentId);
} else {
sessionManager.resetLeaf();
}
const sessionContext = sessionManager.buildSessionContext();
const sanitizedOrphan = transcriptPolicy.normalizeAntigravityThinkingBlocks
? sanitizeAntigravityThinkingBlocks(sessionContext.messages)
: sessionContext.messages;
activeSession.agent.replaceMessages(sanitizedOrphan);
log.warn(
`Removed orphaned user message to prevent consecutive user turns. ` +
`runId=${params.runId} sessionId=${params.sessionId}`,
);
}
try {
// Detect and load images referenced in the prompt for vision-capable models.
// This eliminates the need for an explicit "view" tool call by injecting
// images directly into the prompt when the model supports it.
// Also scans conversation history to enable follow-up questions about earlier images.
const imageResult = await detectAndLoadPromptImages({
prompt: effectivePrompt,
workspaceDir: effectiveWorkspace,
model: params.model,
existingImages: params.images,
historyMessages: activeSession.messages,
maxBytes: MAX_IMAGE_BYTES,
// Enforce sandbox path restrictions when sandbox is enabled
sandbox:
sandbox?.enabled && sandbox?.fsBridge
? { root: sandbox.workspaceDir, bridge: sandbox.fsBridge }
: undefined,
});
// Inject history images into their original message positions.
// This ensures the model sees images in context (e.g., "compare to the first image").
const didMutate = injectHistoryImagesIntoMessages(
activeSession.messages,
imageResult.historyImagesByIndex,
);
if (didMutate) {
// Persist message mutations (e.g., injected history images) so we don't re-scan/reload.
activeSession.agent.replaceMessages(activeSession.messages);
}
cacheTrace?.recordStage("prompt:images", {
prompt: effectivePrompt,
messages: activeSession.messages,
note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`,
});
// Diagnostic: log context sizes before prompt to help debug early overflow errors.
if (log.isEnabled("debug")) {
const msgCount = activeSession.messages.length;
const systemLen = systemPromptText?.length ?? 0;
const promptLen = effectivePrompt.length;
const sessionSummary = summarizeSessionContext(activeSession.messages);
log.debug(
`[context-diag] pre-prompt: sessionKey=${params.sessionKey ?? params.sessionId} ` +
`messages=${msgCount} roleCounts=${sessionSummary.roleCounts} ` +
`historyTextChars=${sessionSummary.totalTextChars} ` +
`maxMessageTextChars=${sessionSummary.maxMessageTextChars} ` +
`historyImageBlocks=${sessionSummary.totalImageBlocks} ` +
`systemPromptChars=${systemLen} promptChars=${promptLen} ` +
`promptImages=${imageResult.images.length} ` +
`historyImageMessages=${imageResult.historyImagesByIndex.size} ` +
`provider=${params.provider}/${params.modelId} sessionFile=${params.sessionFile}`,
);
}
if (hookRunner?.hasHooks("llm_input")) {
hookRunner
.runLlmInput(
{
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: params.modelId,
systemPrompt: systemPromptText,
prompt: effectivePrompt,
historyMessages: activeSession.messages,
imagesCount: imageResult.images.length,
},
{
agentId: hookAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
)
.catch((err) => {
log.warn(`llm_input hook failed: ${String(err)}`);
});
}
// Only pass images option if there are actually images to pass
// This avoids potential issues with models that don't expect the images parameter
if (imageResult.images.length > 0) {
await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images }));
} else {
await abortable(activeSession.prompt(effectivePrompt));
}
} catch (err) {
promptError = err;
} finally {
log.debug(
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
);
}
// Capture snapshot before compaction wait so we have complete messages if timeout occurs
// Check compaction state before and after to avoid race condition where compaction starts during capture
// Use session state (not subscription) for snapshot decisions - need instantaneous compaction status
const wasCompactingBefore = activeSession.isCompacting;
const snapshot = activeSession.messages.slice();
const wasCompactingAfter = activeSession.isCompacting;
// Only trust snapshot if compaction wasn't running before or after capture
const preCompactionSnapshot = wasCompactingBefore || wasCompactingAfter ? null : snapshot;
const preCompactionSessionId = activeSession.sessionId;
try {
await abortable(waitForCompactionRetry());
} catch (err) {
if (isRunnerAbortError(err)) {
if (!promptError) {
promptError = err;
}
if (!isProbeSession) {
log.debug(
`compaction wait aborted: runId=${params.runId} sessionId=${params.sessionId}`,
);
}
} else {
throw err;
}
}
// Append cache-TTL timestamp AFTER prompt + compaction retry completes.
// Previously this was before the prompt, which caused a custom entry to be
// inserted between compaction and the next prompt — breaking the
// prepareCompaction() guard that checks the last entry type, leading to
// double-compaction. See: https://github.com/openclaw/openclaw/issues/9282
// Skip when timed out during compaction — session state may be inconsistent.
if (!timedOutDuringCompaction) {
const shouldTrackCacheTtl =
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
isCacheTtlEligibleProvider(params.provider, params.modelId);
if (shouldTrackCacheTtl) {
appendCacheTtlTimestamp(sessionManager, {
timestamp: Date.now(),
provider: params.provider,
modelId: params.modelId,
});
}
}
// If timeout occurred during compaction, use pre-compaction snapshot when available
// (compaction restructures messages but does not add user/assistant turns).
const snapshotSelection = selectCompactionTimeoutSnapshot({
timedOutDuringCompaction,
preCompactionSnapshot,
preCompactionSessionId,
currentSnapshot: activeSession.messages.slice(),
currentSessionId: activeSession.sessionId,
});
if (timedOutDuringCompaction) {
if (!isProbeSession) {
log.warn(
`using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
);
}
}
messagesSnapshot = snapshotSelection.messagesSnapshot;
sessionIdUsed = snapshotSelection.sessionIdUsed;
cacheTrace?.recordStage("session:after", {
messages: messagesSnapshot,
note: timedOutDuringCompaction
? "compaction timeout"
: promptError
? "prompt error"
: undefined,
});
anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError);
// Run agent_end hooks to allow plugins to analyze the conversation
// This is fire-and-forget, so we don't await
// Run even on compaction timeout so plugins can log/cleanup
if (hookRunner?.hasHooks("agent_end")) {
hookRunner
.runAgentEnd(
{
messages: messagesSnapshot,
success: !aborted && !promptError,
error: promptError ? describeUnknownError(promptError) : undefined,
durationMs: Date.now() - promptStartedAt,
},
{
agentId: hookAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
)
.catch((err) => {
log.warn(`agent_end hook failed: ${err}`);
});
}
} finally {
clearTimeout(abortTimer);
if (abortWarnTimer) {
clearTimeout(abortWarnTimer);
}
if (!isProbeSession && (aborted || timedOut) && !timedOutDuringCompaction) {
log.debug(
`run cleanup: runId=${params.runId} sessionId=${params.sessionId} aborted=${aborted} timedOut=${timedOut}`,
);
}
try {
unsubscribe();
} catch (err) {
// unsubscribe() should never throw; if it does, it indicates a serious bug.
// Log at error level to ensure visibility, but don't rethrow in finally block
// as it would mask any exception from the try block above.
log.error(
`CRITICAL: unsubscribe failed, possible resource leak: runId=${params.runId} ${String(err)}`,
);
}
clearActiveEmbeddedRun(params.sessionId, queueHandle, params.sessionKey);
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
const lastAssistant = messagesSnapshot
.slice()
.toReversed()
.find((m) => m.role === "assistant");
const toolMetasNormalized = toolMetas
.filter(
(entry): entry is { toolName: string; meta?: string } =>
typeof entry.toolName === "string" && entry.toolName.trim().length > 0,
)
.map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));
if (hookRunner?.hasHooks("llm_output")) {
hookRunner
.runLlmOutput(
{
runId: params.runId,
sessionId: params.sessionId,
provider: params.provider,
model: params.modelId,
assistantTexts,
lastAssistant,
usage: getUsageTotals(),
},
{
agentId: hookAgentId,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
)
.catch((err) => {
log.warn(`llm_output hook failed: ${String(err)}`);
});
}
return {
aborted,
timedOut,
timedOutDuringCompaction,
promptError,
sessionIdUsed,
systemPromptReport,
messagesSnapshot,
assistantTexts,
toolMetas: toolMetasNormalized,
lastAssistant,
lastToolError: getLastToolError?.(),
didSendViaMessagingTool: didSendViaMessagingTool(),
messagingToolSentTexts: getMessagingToolSentTexts(),
messagingToolSentMediaUrls: getMessagingToolSentMediaUrls(),
messagingToolSentTargets: getMessagingToolSentTargets(),
successfulCronAdds: getSuccessfulCronAdds(),
cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
),
attemptUsage: getUsageTotals(),
compactionCount: getCompactionCount(),
// Client tool call detected (OpenResponses hosted tools)
clientToolCall: clientToolCallDetected ?? undefined,
};
} finally {
// Always tear down the session (and release the lock) before we leave this attempt.
//
// BUGFIX: Wait for the agent to be truly idle before flushing pending tool results.
// pi-agent-core's auto-retry resolves waitForRetry() on assistant message receipt,
// *before* tool execution completes in the retried agent loop. Without this wait,
// flushPendingToolResults() fires while tools are still executing, inserting
// synthetic "missing tool result" errors and causing silent agent failures.
// See: https://github.com/openclaw/openclaw/issues/8643
await flushPendingToolResultsAfterIdle({
agent: session?.agent,
sessionManager,
});
session?.dispose();
await sessionLock.release();
}
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}