feat(agent): queue steering messages

This commit is contained in:
Peter Steinberger
2025-12-20 16:10:46 +01:00
parent 675aadc6a9
commit 44339a6447
4 changed files with 578 additions and 3 deletions

View File

@@ -5,7 +5,6 @@ import {
Agent,
type AgentEvent,
type AppMessage,
ProviderTransport,
type ThinkingLevel,
} from "@mariozechner/pi-agent-core";
import {
@@ -44,6 +43,7 @@ import {
createClawdisCodingTools,
sanitizeContentBlocksImages,
} from "./pi-tools.js";
import { SteerableProviderTransport } from "./steerable-provider-transport.js";
import {
applySkillEnvOverrides,
applySkillEnvOverridesFromSnapshot,
@@ -82,6 +82,24 @@ export type EmbeddedPiRunResult = {
meta: EmbeddedPiRunMeta;
};
type EmbeddedPiQueueHandle = {
queueMessage: (text: string) => Promise<void>;
isStreaming: () => boolean;
};
const ACTIVE_EMBEDDED_RUNS = new Map<string, EmbeddedPiQueueHandle>();
export function queueEmbeddedPiMessage(
sessionId: string,
text: string,
): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) return false;
if (!handle.isStreaming()) return false;
void handle.queueMessage(text);
return true;
}
function mapThinkingLevel(level?: ThinkLevel): ThinkingLevel {
// pi-agent-core supports "xhigh" too; Clawdis doesn't surface it for now.
if (!level) return "off";
@@ -310,7 +328,7 @@ export async function runEmbeddedPiAgent(params: {
},
messageTransformer,
queueMode: settingsManager.getQueueMode(),
transport: new ProviderTransport({
transport: new SteerableProviderTransport({
getApiKey: async (providerName) => {
const key = await getApiKeyForProvider(providerName);
if (!key) {
@@ -338,6 +356,13 @@ export async function runEmbeddedPiAgent(params: {
sessionManager,
settingsManager,
});
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await session.queueMessage(text);
},
isStreaming: () => session.isStreaming,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
const assistantTexts: string[] = [];
const toolDebouncer = createToolDebouncer((toolName, metas) => {
@@ -553,6 +578,9 @@ export async function runEmbeddedPiAgent(params: {
clearTimeout(abortTimer);
unsubscribe();
toolDebouncer.flush();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);
}