mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 04:07:39 +00:00
Agents: add nested subagent orchestration controls and reduce subagent token waste (#14447)
* Agents: add subagent orchestration controls
* Agents: add subagent orchestration controls (WIP uncommitted changes)
* feat(subagents): add depth-based spawn gating for sub-sub-agents
* feat(subagents): tool policy, registry, and announce chain for nested agents
* feat(subagents): system prompt, docs, changelog for nested sub-agents
* fix(subagents): prevent model fallback override, show model during active runs, and block context overflow fallback
Bug 1: When a session has an explicit model override (e.g., gpt/openai-codex),
the fallback candidate logic in resolveFallbackCandidates silently appended the
global primary model (opus) as a backstop. On reinjection/steer with a transient
error, the session could fall back to opus which has a smaller context window
and crash. Fix: when storedModelOverride is set, pass fallbacksOverride ?? []
instead of undefined, preventing the implicit primary backstop.
Bug 2: Active subagents showed 'model n/a' in /subagents list because
resolveModelDisplay only read entry.model/modelProvider (populated after run
completes). Fix: fall back to modelOverride/providerOverride fields which are
populated at spawn time via sessions.patch.
Bug 3: Context overflow errors (prompt too long, context_length_exceeded) could
theoretically escape runEmbeddedPiAgent and be treated as failover candidates
in runWithModelFallback, causing a switch to a model with a smaller context
window. Fix: in runWithModelFallback, detect context overflow errors via
isLikelyContextOverflowError and rethrow them immediately instead of trying the
next model candidate.
* fix(subagents): track spawn depth in session store and fix announce routing for nested agents
* Fix compaction status tracking and dedupe overflow compaction triggers
* fix(subagents): enforce depth block via session store and implement cascade kill
* fix: inject group chat context into system prompt
* fix(subagents): always write model to session store at spawn time
* Preserve spawnDepth when agent handler rewrites session entry
* fix(subagents): suppress announce on steer-restart
* fix(subagents): fallback spawned session model to runtime default
* fix(subagents): enforce spawn depth when caller key resolves by sessionId
* feat(subagents): implement active-first ordering for numeric targets and enhance task display
- Added a test to verify that subagents with numeric targets follow an active-first list ordering.
- Updated `resolveSubagentTarget` to sort subagent runs based on active status and recent activity.
- Enhanced task display in command responses to prevent truncation of long task descriptions.
- Introduced new utility functions for compacting task text and managing subagent run states.
* fix(subagents): show model for active runs via run record fallback
When the spawned model matches the agent's default model, the session
store's override fields are intentionally cleared (isDefault: true).
The model/modelProvider fields are only populated after the run
completes. This left active subagents showing 'model n/a'.
Fix: store the resolved model on SubagentRunRecord at registration
time, and use it as a fallback in both display paths (subagents tool
and /subagents command) when the session store entry has no model info.
Changes:
- SubagentRunRecord: add optional model field
- registerSubagentRun: accept and persist model param
- sessions-spawn-tool: pass resolvedModel to registerSubagentRun
- subagents-tool: pass run record model as fallback to resolveModelDisplay
- commands-subagents: pass run record model as fallback to resolveModelDisplay
* feat(chat): implement session key resolution and reset on sidebar navigation
- Added functions to resolve the main session key and reset chat state when switching sessions from the sidebar.
- Updated the `renderTab` function to handle session key changes when navigating to the chat tab.
- Introduced a test to verify that the session resets to "main" when opening chat from the sidebar navigation.
* fix: subagent timeout=0 passthrough and fallback prompt duplication
Bug 1: runTimeoutSeconds=0 now means 'no timeout' instead of applying 600s default
- sessions-spawn-tool: default to undefined (not 0) when neither timeout param
is provided; use != null check so explicit 0 passes through to gateway
- agent.ts: accept 0 as valid timeout (resolveAgentTimeoutMs already handles
0 → MAX_SAFE_TIMEOUT_MS)
Bug 2: model fallback no longer re-injects the original prompt as a duplicate
- agent.ts: track fallback attempt index; on retries use a short continuation
message instead of the full original prompt since the session file already
contains it from the first attempt
- Also skip re-sending images on fallback retries (already in session)
* feat(subagents): truncate long task descriptions in subagents command output
- Introduced a new utility function to format task previews, limiting their length to improve readability.
- Updated the command handler to use the new formatting function, ensuring task descriptions are truncated appropriately.
- Adjusted related tests to verify that long task descriptions are now truncated in the output.
* refactor(subagents): update subagent registry path resolution and improve command output formatting
- Replaced direct import of STATE_DIR with a utility function to resolve the state directory dynamically.
- Enhanced the formatting of command output for active and recent subagents, adding separators for better readability.
- Updated related tests to reflect changes in command output structure.
* fix(subagent): default sessions_spawn to no timeout when runTimeoutSeconds omitted
The previous fix (75a791106) correctly handled the case where
runTimeoutSeconds was explicitly set to 0 ("no timeout"). However,
when models omit the parameter entirely (which is common since the
schema marks it as optional), runTimeoutSeconds resolved to undefined.
undefined flowed through the chain as:
sessions_spawn → timeout: undefined (since undefined != null is false)
→ gateway agent handler → agentCommand opts.timeout: undefined
→ resolveAgentTimeoutMs({ overrideSeconds: undefined })
→ DEFAULT_AGENT_TIMEOUT_SECONDS (600s = 10 minutes)
This caused subagents to be killed at exactly 10 minutes even though
the user's intent (via TOOLS.md) was for subagents to run without a
timeout.
Fix: default runTimeoutSeconds to 0 (no timeout) when neither
runTimeoutSeconds nor timeoutSeconds is provided by the caller.
Subagent spawns are long-running by design and should not inherit the
600s agent-command default timeout.
* fix(subagent): accept timeout=0 in agent-via-gateway path (second 600s default)
* fix: thread timeout override through getReplyFromConfig dispatch path
getReplyFromConfig called resolveAgentTimeoutMs({ cfg }) with no override,
always falling back to the config default (600s). Add timeoutOverrideSeconds
to GetReplyOptions and pass it through as overrideSeconds so callers of the
dispatch chain can specify a custom timeout (0 = no timeout).
This complements the existing timeout threading in agentCommand and the
cron isolated-agent runner, which already pass overrideSeconds correctly.
* feat(model-fallback): normalize OpenAI Codex model references and enhance fallback handling
- Added normalization for OpenAI Codex model references, specifically converting "gpt-5.3-codex" to "openai-codex" before execution.
- Updated the `resolveFallbackCandidates` function to utilize the new normalization logic.
- Enhanced tests to verify the correct behavior of model normalization and fallback mechanisms.
- Introduced a new test case to ensure that the normalization process works as expected for various input formats.
* feat(tests): add unit tests for steer failure behavior in openclaw-tools
- Introduced a new test file to validate the behavior of subagents when steer replacement dispatch fails.
- Implemented tests to ensure that the announce behavior is restored correctly and that the suppression reason is cleared as expected.
- Enhanced the subagent registry with a new function to clear steer restart suppression.
- Updated related components to support the new test scenarios.
* fix(subagents): replace stop command with kill in slash commands and documentation
- Updated the `/subagents` command to replace `stop` with `kill` for consistency in controlling sub-agent runs.
- Modified related documentation to reflect the change in command usage.
- Removed legacy timeoutSeconds references from the sessions-spawn-tool schema and tests to streamline timeout handling.
- Enhanced tests to ensure correct behavior of the updated commands and their interactions.
* feat(tests): add unit tests for readLatestAssistantReply function
- Introduced a new test file for the `readLatestAssistantReply` function to validate its behavior with various message scenarios.
- Implemented tests to ensure the function correctly retrieves the latest assistant message and handles cases where the latest message has no text.
- Mocked the gateway call to simulate different message histories for comprehensive testing.
* feat(tests): enhance subagent kill-all cascade tests and announce formatting
- Added a new test to verify that the `kill-all` command cascades through ended parents to active descendants in subagents.
- Updated the subagent announce formatting tests to reflect changes in message structure, including the replacement of "Findings:" with "Result:" and the addition of new expectations for message content.
- Improved the handling of long findings and stats in the announce formatting logic to ensure concise output.
- Refactored related functions to enhance clarity and maintainability in the subagent registry and tools.
* refactor(subagent): update announce formatting and remove unused constants
- Modified the subagent announce formatting to replace "Findings:" with "Result:" and adjusted related expectations in tests.
- Removed constants for maximum announce findings characters and summary words, simplifying the announcement logic.
- Updated the handling of findings to retain full content instead of truncating, ensuring more informative outputs.
- Cleaned up unused imports in the commands-subagents file to enhance code clarity.
* feat(tests): enhance billing error handling in user-facing text
- Added tests to ensure that normal text mentioning billing plans is not rewritten, preserving user context.
- Updated the `isBillingErrorMessage` and `sanitizeUserFacingText` functions to improve handling of billing-related messages.
- Introduced new test cases for various scenarios involving billing messages to ensure accurate processing and output.
- Enhanced the subagent announce flow to correctly manage active descendant runs, preventing premature announcements.
* feat(subagent): enhance workflow guidance and auto-announcement clarity
- Added a new guideline in the subagent system prompt to emphasize trust in push-based completion, discouraging busy polling for status updates.
- Updated documentation to clarify that sub-agents will automatically announce their results, improving user understanding of the workflow.
- Enhanced tests to verify the new guidance on avoiding polling loops and to ensure the accuracy of the updated prompts.
* fix(cron): avoid announcing interim subagent spawn acks
* chore: clean post-rebase imports
* fix(cron): fall back to child replies when parent stays interim
* fix(subagents): make active-run guidance advisory
* fix(subagents): update announce flow to handle active descendants and enhance test coverage
- Modified the announce flow to defer announcements when active descendant runs are present, ensuring accurate status reporting.
- Updated tests to verify the new behavior, including scenarios where no fallback requester is available and ensuring proper handling of finished subagents.
- Enhanced the announce formatting to include an `expectFinal` flag for better clarity in the announcement process.
* fix(subagents): enhance announce flow and formatting for user updates
- Updated the announce flow to provide clearer instructions for user updates based on active subagent runs and requester context.
- Refactored the announcement logic to improve clarity and ensure internal context remains private.
- Enhanced tests to verify the new message expectations and formatting, including updated prompts for user-facing updates.
- Introduced a new function to build reply instructions based on session context, improving the overall announcement process.
* fix: resolve prep blockers and changelog placement (#14447) (thanks @tyler6204)
* fix: restore cron delivery-plan import after rebase (#14447) (thanks @tyler6204)
* fix: resolve test failures from rebase conflicts (#14447) (thanks @tyler6204)
* fix: apply formatting after rebase (#14447) (thanks @tyler6204)
This commit is contained in:
49
src/agents/tools/agent-step.test.ts
Normal file
49
src/agents/tools/agent-step.test.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const callGatewayMock = vi.fn();
|
||||
vi.mock("../../gateway/call.js", () => ({
|
||||
callGateway: (opts: unknown) => callGatewayMock(opts),
|
||||
}));
|
||||
|
||||
import { readLatestAssistantReply } from "./agent-step.js";
|
||||
|
||||
describe("readLatestAssistantReply", () => {
|
||||
beforeEach(() => {
|
||||
callGatewayMock.mockReset();
|
||||
});
|
||||
|
||||
it("returns the most recent assistant message when compaction markers trail history", async () => {
|
||||
callGatewayMock.mockResolvedValue({
|
||||
messages: [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "All checks passed and changes were pushed." }],
|
||||
},
|
||||
{ role: "toolResult", content: [{ type: "text", text: "tool output" }] },
|
||||
{ role: "system", content: [{ type: "text", text: "Compaction" }] },
|
||||
],
|
||||
});
|
||||
|
||||
const result = await readLatestAssistantReply({ sessionKey: "agent:main:child" });
|
||||
|
||||
expect(result).toBe("All checks passed and changes were pushed.");
|
||||
expect(callGatewayMock).toHaveBeenCalledWith({
|
||||
method: "chat.history",
|
||||
params: { sessionKey: "agent:main:child", limit: 50 },
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to older assistant text when latest assistant has no text", async () => {
|
||||
callGatewayMock.mockResolvedValue({
|
||||
messages: [
|
||||
{ role: "assistant", content: [{ type: "text", text: "older output" }] },
|
||||
{ role: "assistant", content: [] },
|
||||
{ role: "system", content: [{ type: "text", text: "Compaction" }] },
|
||||
],
|
||||
});
|
||||
|
||||
const result = await readLatestAssistantReply({ sessionKey: "agent:main:child" });
|
||||
|
||||
expect(result).toBe("older output");
|
||||
});
|
||||
});
|
||||
@@ -13,8 +13,21 @@ export async function readLatestAssistantReply(params: {
|
||||
params: { sessionKey: params.sessionKey, limit: params.limit ?? 50 },
|
||||
});
|
||||
const filtered = stripToolMessages(Array.isArray(history?.messages) ? history.messages : []);
|
||||
const last = filtered.length > 0 ? filtered[filtered.length - 1] : undefined;
|
||||
return last ? extractAssistantText(last) : undefined;
|
||||
for (let i = filtered.length - 1; i >= 0; i -= 1) {
|
||||
const candidate = filtered[i];
|
||||
if (!candidate || typeof candidate !== "object") {
|
||||
continue;
|
||||
}
|
||||
if ((candidate as { role?: unknown }).role !== "assistant") {
|
||||
continue;
|
||||
}
|
||||
const text = extractAssistantText(candidate);
|
||||
if (!text?.trim()) {
|
||||
continue;
|
||||
}
|
||||
return text;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export async function runAgentStep(params: {
|
||||
|
||||
@@ -40,4 +40,19 @@ describe("extractAssistantText", () => {
|
||||
};
|
||||
expect(extractAssistantText(message)).toBe("HTTP 500: Internal Server Error");
|
||||
});
|
||||
|
||||
it("keeps normal status text that mentions billing", () => {
|
||||
const message = {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "text",
|
||||
text: "Firebase downgraded us to the free Spark plan. Check whether billing should be re-enabled.",
|
||||
},
|
||||
],
|
||||
};
|
||||
expect(extractAssistantText(message)).toBe(
|
||||
"Firebase downgraded us to the free Spark plan. Check whether billing should be re-enabled.",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,17 +5,15 @@ import type { AnyAgentTool } from "./common.js";
|
||||
import { formatThinkingLevels, normalizeThinkLevel } from "../../auto-reply/thinking.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { callGateway } from "../../gateway/call.js";
|
||||
import {
|
||||
isSubagentSessionKey,
|
||||
normalizeAgentId,
|
||||
parseAgentSessionKey,
|
||||
} from "../../routing/session-key.js";
|
||||
import { normalizeAgentId, parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
import { normalizeDeliveryContext } from "../../utils/delivery-context.js";
|
||||
import { resolveAgentConfig } from "../agent-scope.js";
|
||||
import { AGENT_LANE_SUBAGENT } from "../lanes.js";
|
||||
import { resolveDefaultModelForAgent } from "../model-selection.js";
|
||||
import { optionalStringEnum } from "../schema/typebox.js";
|
||||
import { buildSubagentSystemPrompt } from "../subagent-announce.js";
|
||||
import { registerSubagentRun } from "../subagent-registry.js";
|
||||
import { getSubagentDepthFromSessionStore } from "../subagent-depth.js";
|
||||
import { countActiveRunsForSession, registerSubagentRun } from "../subagent-registry.js";
|
||||
import { jsonResult, readStringParam } from "./common.js";
|
||||
import {
|
||||
resolveDisplaySessionKey,
|
||||
@@ -30,8 +28,6 @@ const SessionsSpawnToolSchema = Type.Object({
|
||||
model: Type.Optional(Type.String()),
|
||||
thinking: Type.Optional(Type.String()),
|
||||
runTimeoutSeconds: Type.Optional(Type.Number({ minimum: 0 })),
|
||||
// Back-compat alias. Prefer runTimeoutSeconds.
|
||||
timeoutSeconds: Type.Optional(Type.Number({ minimum: 0 })),
|
||||
cleanup: optionalStringEnum(["delete", "keep"] as const),
|
||||
});
|
||||
|
||||
@@ -99,32 +95,18 @@ export function createSessionsSpawnTool(opts?: {
|
||||
to: opts?.agentTo,
|
||||
threadId: opts?.agentThreadId,
|
||||
});
|
||||
const runTimeoutSeconds = (() => {
|
||||
const explicit =
|
||||
typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds)
|
||||
? Math.max(0, Math.floor(params.runTimeoutSeconds))
|
||||
: undefined;
|
||||
if (explicit !== undefined) {
|
||||
return explicit;
|
||||
}
|
||||
const legacy =
|
||||
typeof params.timeoutSeconds === "number" && Number.isFinite(params.timeoutSeconds)
|
||||
? Math.max(0, Math.floor(params.timeoutSeconds))
|
||||
: undefined;
|
||||
return legacy ?? 0;
|
||||
})();
|
||||
// Default to 0 (no timeout) when omitted. Sub-agent runs are long-lived
|
||||
// by default and should not inherit the main agent 600s timeout.
|
||||
const runTimeoutSeconds =
|
||||
typeof params.runTimeoutSeconds === "number" && Number.isFinite(params.runTimeoutSeconds)
|
||||
? Math.max(0, Math.floor(params.runTimeoutSeconds))
|
||||
: 0;
|
||||
let modelWarning: string | undefined;
|
||||
let modelApplied = false;
|
||||
|
||||
const cfg = loadConfig();
|
||||
const { mainKey, alias } = resolveMainSessionAlias(cfg);
|
||||
const requesterSessionKey = opts?.agentSessionKey;
|
||||
if (typeof requesterSessionKey === "string" && isSubagentSessionKey(requesterSessionKey)) {
|
||||
return jsonResult({
|
||||
status: "forbidden",
|
||||
error: "sessions_spawn is not allowed from sub-agent sessions",
|
||||
});
|
||||
}
|
||||
const requesterInternalKey = requesterSessionKey
|
||||
? resolveInternalSessionKey({
|
||||
key: requesterSessionKey,
|
||||
@@ -138,6 +120,24 @@ export function createSessionsSpawnTool(opts?: {
|
||||
mainKey,
|
||||
});
|
||||
|
||||
const callerDepth = getSubagentDepthFromSessionStore(requesterInternalKey, { cfg });
|
||||
const maxSpawnDepth = cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
|
||||
if (callerDepth >= maxSpawnDepth) {
|
||||
return jsonResult({
|
||||
status: "forbidden",
|
||||
error: `sessions_spawn is not allowed at this depth (current depth: ${callerDepth}, max: ${maxSpawnDepth})`,
|
||||
});
|
||||
}
|
||||
|
||||
const maxChildren = cfg.agents?.defaults?.subagents?.maxChildrenPerAgent ?? 5;
|
||||
const activeChildren = countActiveRunsForSession(requesterInternalKey);
|
||||
if (activeChildren >= maxChildren) {
|
||||
return jsonResult({
|
||||
status: "forbidden",
|
||||
error: `sessions_spawn has reached max active children for this session (${activeChildren}/${maxChildren})`,
|
||||
});
|
||||
}
|
||||
|
||||
const requesterAgentId = normalizeAgentId(
|
||||
opts?.requesterAgentIdOverride ?? parseAgentSessionKey(requesterInternalKey)?.agentId,
|
||||
);
|
||||
@@ -166,12 +166,19 @@ export function createSessionsSpawnTool(opts?: {
|
||||
}
|
||||
}
|
||||
const childSessionKey = `agent:${targetAgentId}:subagent:${crypto.randomUUID()}`;
|
||||
const childDepth = callerDepth + 1;
|
||||
const spawnedByKey = requesterInternalKey;
|
||||
const targetAgentConfig = resolveAgentConfig(cfg, targetAgentId);
|
||||
const runtimeDefaultModel = resolveDefaultModelForAgent({
|
||||
cfg,
|
||||
agentId: targetAgentId,
|
||||
});
|
||||
const resolvedModel =
|
||||
normalizeModelSelection(modelOverride) ??
|
||||
normalizeModelSelection(targetAgentConfig?.subagents?.model) ??
|
||||
normalizeModelSelection(cfg.agents?.defaults?.subagents?.model);
|
||||
normalizeModelSelection(cfg.agents?.defaults?.subagents?.model) ??
|
||||
normalizeModelSelection(cfg.agents?.defaults?.model?.primary) ??
|
||||
normalizeModelSelection(`${runtimeDefaultModel.provider}/${runtimeDefaultModel.model}`);
|
||||
|
||||
const resolvedThinkingDefaultRaw =
|
||||
readStringParam(targetAgentConfig?.subagents ?? {}, "thinking") ??
|
||||
@@ -191,6 +198,22 @@ export function createSessionsSpawnTool(opts?: {
|
||||
}
|
||||
thinkingOverride = normalized;
|
||||
}
|
||||
try {
|
||||
await callGateway({
|
||||
method: "sessions.patch",
|
||||
params: { key: childSessionKey, spawnDepth: childDepth },
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
} catch (err) {
|
||||
const messageText =
|
||||
err instanceof Error ? err.message : typeof err === "string" ? err : "error";
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
error: messageText,
|
||||
childSessionKey,
|
||||
});
|
||||
}
|
||||
|
||||
if (resolvedModel) {
|
||||
try {
|
||||
await callGateway({
|
||||
@@ -240,6 +263,8 @@ export function createSessionsSpawnTool(opts?: {
|
||||
childSessionKey,
|
||||
label: label || undefined,
|
||||
task,
|
||||
childDepth,
|
||||
maxSpawnDepth,
|
||||
});
|
||||
|
||||
const childIdem = crypto.randomUUID();
|
||||
@@ -260,7 +285,7 @@ export function createSessionsSpawnTool(opts?: {
|
||||
lane: AGENT_LANE_SUBAGENT,
|
||||
extraSystemPrompt: childSystemPrompt,
|
||||
thinking: thinkingOverride,
|
||||
timeout: runTimeoutSeconds > 0 ? runTimeoutSeconds : undefined,
|
||||
timeout: runTimeoutSeconds,
|
||||
label: label || undefined,
|
||||
spawnedBy: spawnedByKey,
|
||||
groupId: opts?.agentGroupId ?? undefined,
|
||||
@@ -292,6 +317,7 @@ export function createSessionsSpawnTool(opts?: {
|
||||
task,
|
||||
cleanup,
|
||||
label: label || undefined,
|
||||
model: resolvedModel,
|
||||
runTimeoutSeconds,
|
||||
});
|
||||
|
||||
|
||||
840
src/agents/tools/subagents-tool.ts
Normal file
840
src/agents/tools/subagents-tool.ts
Normal file
@@ -0,0 +1,840 @@
|
||||
import { Type } from "@sinclair/typebox";
|
||||
import crypto from "node:crypto";
|
||||
import type { SessionEntry } from "../../config/sessions.js";
|
||||
import type { AnyAgentTool } from "./common.js";
|
||||
import { clearSessionQueues } from "../../auto-reply/reply/queue.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { loadSessionStore, resolveStorePath, updateSessionStore } from "../../config/sessions.js";
|
||||
import { callGateway } from "../../gateway/call.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import {
|
||||
isSubagentSessionKey,
|
||||
parseAgentSessionKey,
|
||||
type ParsedAgentSessionKey,
|
||||
} from "../../routing/session-key.js";
|
||||
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
|
||||
import { AGENT_LANE_SUBAGENT } from "../lanes.js";
|
||||
import { abortEmbeddedPiRun } from "../pi-embedded.js";
|
||||
import { optionalStringEnum } from "../schema/typebox.js";
|
||||
import { getSubagentDepthFromSessionStore } from "../subagent-depth.js";
|
||||
import {
|
||||
clearSubagentRunSteerRestart,
|
||||
listSubagentRunsForRequester,
|
||||
markSubagentRunTerminated,
|
||||
markSubagentRunForSteerRestart,
|
||||
replaceSubagentRunAfterSteer,
|
||||
type SubagentRunRecord,
|
||||
} from "../subagent-registry.js";
|
||||
import { jsonResult, readNumberParam, readStringParam } from "./common.js";
|
||||
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-helpers.js";
|
||||
|
||||
const SUBAGENT_ACTIONS = ["list", "kill", "steer"] as const;
|
||||
type SubagentAction = (typeof SUBAGENT_ACTIONS)[number];
|
||||
|
||||
const DEFAULT_RECENT_MINUTES = 30;
|
||||
const MAX_RECENT_MINUTES = 24 * 60;
|
||||
const MAX_STEER_MESSAGE_CHARS = 4_000;
|
||||
const STEER_RATE_LIMIT_MS = 2_000;
|
||||
const STEER_ABORT_SETTLE_TIMEOUT_MS = 5_000;
|
||||
|
||||
const steerRateLimit = new Map<string, number>();
|
||||
|
||||
const SubagentsToolSchema = Type.Object({
|
||||
action: optionalStringEnum(SUBAGENT_ACTIONS),
|
||||
target: Type.Optional(Type.String()),
|
||||
message: Type.Optional(Type.String()),
|
||||
recentMinutes: Type.Optional(Type.Number({ minimum: 1 })),
|
||||
});
|
||||
|
||||
type SessionEntryResolution = {
|
||||
storePath: string;
|
||||
entry: SessionEntry | undefined;
|
||||
};
|
||||
|
||||
type ResolvedRequesterKey = {
|
||||
requesterSessionKey: string;
|
||||
callerSessionKey: string;
|
||||
callerIsSubagent: boolean;
|
||||
};
|
||||
|
||||
type TargetResolution = {
|
||||
entry?: SubagentRunRecord;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
function formatDurationCompact(valueMs?: number) {
|
||||
if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) {
|
||||
return "n/a";
|
||||
}
|
||||
const minutes = Math.max(1, Math.round(valueMs / 60_000));
|
||||
if (minutes < 60) {
|
||||
return `${minutes}m`;
|
||||
}
|
||||
const hours = Math.floor(minutes / 60);
|
||||
const minutesRemainder = minutes % 60;
|
||||
if (hours < 24) {
|
||||
return minutesRemainder > 0 ? `${hours}h${minutesRemainder}m` : `${hours}h`;
|
||||
}
|
||||
const days = Math.floor(hours / 24);
|
||||
const hoursRemainder = hours % 24;
|
||||
return hoursRemainder > 0 ? `${days}d${hoursRemainder}h` : `${days}d`;
|
||||
}
|
||||
|
||||
function formatTokenShort(value?: number) {
|
||||
if (!value || !Number.isFinite(value) || value <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
const n = Math.floor(value);
|
||||
if (n < 1_000) {
|
||||
return `${n}`;
|
||||
}
|
||||
if (n < 10_000) {
|
||||
return `${(n / 1_000).toFixed(1).replace(/\.0$/, "")}k`;
|
||||
}
|
||||
if (n < 1_000_000) {
|
||||
return `${Math.round(n / 1_000)}k`;
|
||||
}
|
||||
return `${(n / 1_000_000).toFixed(1).replace(/\.0$/, "")}m`;
|
||||
}
|
||||
|
||||
function truncate(text: string, maxLength: number) {
|
||||
if (text.length <= maxLength) {
|
||||
return text;
|
||||
}
|
||||
return `${text.slice(0, maxLength).trimEnd()}...`;
|
||||
}
|
||||
|
||||
function resolveRunLabel(entry: SubagentRunRecord, fallback = "subagent") {
|
||||
const raw = entry.label?.trim() || entry.task?.trim() || "";
|
||||
return raw || fallback;
|
||||
}
|
||||
|
||||
function resolveRunStatus(entry: SubagentRunRecord) {
|
||||
if (!entry.endedAt) {
|
||||
return "running";
|
||||
}
|
||||
const status = entry.outcome?.status ?? "done";
|
||||
if (status === "ok") {
|
||||
return "done";
|
||||
}
|
||||
if (status === "error") {
|
||||
return "failed";
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
function sortRuns(runs: SubagentRunRecord[]) {
|
||||
return [...runs].toSorted((a, b) => {
|
||||
const aTime = a.startedAt ?? a.createdAt ?? 0;
|
||||
const bTime = b.startedAt ?? b.createdAt ?? 0;
|
||||
return bTime - aTime;
|
||||
});
|
||||
}
|
||||
|
||||
function resolveModelRef(entry?: SessionEntry) {
|
||||
const model = typeof entry?.model === "string" ? entry.model.trim() : "";
|
||||
const provider = typeof entry?.modelProvider === "string" ? entry.modelProvider.trim() : "";
|
||||
if (model.includes("/")) {
|
||||
return model;
|
||||
}
|
||||
if (model && provider) {
|
||||
return `${provider}/${model}`;
|
||||
}
|
||||
if (model) {
|
||||
return model;
|
||||
}
|
||||
if (provider) {
|
||||
return provider;
|
||||
}
|
||||
// Fall back to override fields which are populated at spawn time,
|
||||
// before the first run completes and writes model/modelProvider.
|
||||
const overrideModel = typeof entry?.modelOverride === "string" ? entry.modelOverride.trim() : "";
|
||||
const overrideProvider =
|
||||
typeof entry?.providerOverride === "string" ? entry.providerOverride.trim() : "";
|
||||
if (overrideModel.includes("/")) {
|
||||
return overrideModel;
|
||||
}
|
||||
if (overrideModel && overrideProvider) {
|
||||
return `${overrideProvider}/${overrideModel}`;
|
||||
}
|
||||
if (overrideModel) {
|
||||
return overrideModel;
|
||||
}
|
||||
return overrideProvider || undefined;
|
||||
}
|
||||
|
||||
function resolveModelDisplay(entry?: SessionEntry, fallbackModel?: string) {
|
||||
const modelRef = resolveModelRef(entry) || fallbackModel || undefined;
|
||||
if (!modelRef) {
|
||||
return "model n/a";
|
||||
}
|
||||
const slash = modelRef.lastIndexOf("/");
|
||||
if (slash >= 0 && slash < modelRef.length - 1) {
|
||||
return modelRef.slice(slash + 1);
|
||||
}
|
||||
return modelRef;
|
||||
}
|
||||
|
||||
function resolveTotalTokens(entry?: SessionEntry) {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
if (typeof entry.totalTokens === "number" && Number.isFinite(entry.totalTokens)) {
|
||||
return entry.totalTokens;
|
||||
}
|
||||
const input = typeof entry.inputTokens === "number" ? entry.inputTokens : 0;
|
||||
const output = typeof entry.outputTokens === "number" ? entry.outputTokens : 0;
|
||||
const total = input + output;
|
||||
return total > 0 ? total : undefined;
|
||||
}
|
||||
|
||||
function resolveIoTokens(entry?: SessionEntry) {
|
||||
if (!entry) {
|
||||
return undefined;
|
||||
}
|
||||
const input =
|
||||
typeof entry.inputTokens === "number" && Number.isFinite(entry.inputTokens)
|
||||
? entry.inputTokens
|
||||
: 0;
|
||||
const output =
|
||||
typeof entry.outputTokens === "number" && Number.isFinite(entry.outputTokens)
|
||||
? entry.outputTokens
|
||||
: 0;
|
||||
const total = input + output;
|
||||
if (total <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
return { input, output, total };
|
||||
}
|
||||
|
||||
function resolveUsageDisplay(entry?: SessionEntry) {
|
||||
const io = resolveIoTokens(entry);
|
||||
const promptCache = resolveTotalTokens(entry);
|
||||
const parts: string[] = [];
|
||||
if (io) {
|
||||
const input = formatTokenShort(io.input) ?? "0";
|
||||
const output = formatTokenShort(io.output) ?? "0";
|
||||
parts.push(`tokens ${formatTokenShort(io.total)} (in ${input} / out ${output})`);
|
||||
} else if (typeof promptCache === "number" && promptCache > 0) {
|
||||
parts.push(`tokens ${formatTokenShort(promptCache)} prompt/cache`);
|
||||
}
|
||||
if (typeof promptCache === "number" && io && promptCache > io.total) {
|
||||
parts.push(`prompt/cache ${formatTokenShort(promptCache)}`);
|
||||
}
|
||||
return parts.join(", ");
|
||||
}
|
||||
|
||||
function resolveSubagentTarget(
|
||||
runs: SubagentRunRecord[],
|
||||
token: string | undefined,
|
||||
options?: { recentMinutes?: number },
|
||||
): TargetResolution {
|
||||
const trimmed = token?.trim();
|
||||
if (!trimmed) {
|
||||
return { error: "Missing subagent target." };
|
||||
}
|
||||
const sorted = sortRuns(runs);
|
||||
const recentMinutes = options?.recentMinutes ?? DEFAULT_RECENT_MINUTES;
|
||||
const recentCutoff = Date.now() - recentMinutes * 60_000;
|
||||
const numericOrder = [
|
||||
...sorted.filter((entry) => !entry.endedAt),
|
||||
...sorted.filter((entry) => !!entry.endedAt && (entry.endedAt ?? 0) >= recentCutoff),
|
||||
];
|
||||
if (trimmed === "last") {
|
||||
return { entry: sorted[0] };
|
||||
}
|
||||
if (/^\d+$/.test(trimmed)) {
|
||||
const idx = Number.parseInt(trimmed, 10);
|
||||
if (!Number.isFinite(idx) || idx <= 0 || idx > numericOrder.length) {
|
||||
return { error: `Invalid subagent index: ${trimmed}` };
|
||||
}
|
||||
return { entry: numericOrder[idx - 1] };
|
||||
}
|
||||
if (trimmed.includes(":")) {
|
||||
const bySessionKey = sorted.find((entry) => entry.childSessionKey === trimmed);
|
||||
return bySessionKey
|
||||
? { entry: bySessionKey }
|
||||
: { error: `Unknown subagent session: ${trimmed}` };
|
||||
}
|
||||
const lowered = trimmed.toLowerCase();
|
||||
const byExactLabel = sorted.filter((entry) => resolveRunLabel(entry).toLowerCase() === lowered);
|
||||
if (byExactLabel.length === 1) {
|
||||
return { entry: byExactLabel[0] };
|
||||
}
|
||||
if (byExactLabel.length > 1) {
|
||||
return { error: `Ambiguous subagent label: ${trimmed}` };
|
||||
}
|
||||
const byLabelPrefix = sorted.filter((entry) =>
|
||||
resolveRunLabel(entry).toLowerCase().startsWith(lowered),
|
||||
);
|
||||
if (byLabelPrefix.length === 1) {
|
||||
return { entry: byLabelPrefix[0] };
|
||||
}
|
||||
if (byLabelPrefix.length > 1) {
|
||||
return { error: `Ambiguous subagent label prefix: ${trimmed}` };
|
||||
}
|
||||
const byRunIdPrefix = sorted.filter((entry) => entry.runId.startsWith(trimmed));
|
||||
if (byRunIdPrefix.length === 1) {
|
||||
return { entry: byRunIdPrefix[0] };
|
||||
}
|
||||
if (byRunIdPrefix.length > 1) {
|
||||
return { error: `Ambiguous subagent run id prefix: ${trimmed}` };
|
||||
}
|
||||
return { error: `Unknown subagent target: ${trimmed}` };
|
||||
}
|
||||
|
||||
function resolveStorePathForKey(
|
||||
cfg: ReturnType<typeof loadConfig>,
|
||||
key: string,
|
||||
parsed?: ParsedAgentSessionKey | null,
|
||||
) {
|
||||
return resolveStorePath(cfg.session?.store, {
|
||||
agentId: parsed?.agentId,
|
||||
});
|
||||
}
|
||||
|
||||
function resolveSessionEntryForKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
key: string;
|
||||
cache: Map<string, Record<string, SessionEntry>>;
|
||||
}): SessionEntryResolution {
|
||||
const parsed = parseAgentSessionKey(params.key);
|
||||
const storePath = resolveStorePathForKey(params.cfg, params.key, parsed);
|
||||
let store = params.cache.get(storePath);
|
||||
if (!store) {
|
||||
store = loadSessionStore(storePath);
|
||||
params.cache.set(storePath, store);
|
||||
}
|
||||
return {
|
||||
storePath,
|
||||
entry: store[params.key],
|
||||
};
|
||||
}
|
||||
|
||||
function resolveRequesterKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
agentSessionKey?: string;
|
||||
}): ResolvedRequesterKey {
|
||||
const { mainKey, alias } = resolveMainSessionAlias(params.cfg);
|
||||
const callerRaw = params.agentSessionKey?.trim() || alias;
|
||||
const callerSessionKey = resolveInternalSessionKey({
|
||||
key: callerRaw,
|
||||
alias,
|
||||
mainKey,
|
||||
});
|
||||
if (!isSubagentSessionKey(callerSessionKey)) {
|
||||
return {
|
||||
requesterSessionKey: callerSessionKey,
|
||||
callerSessionKey,
|
||||
callerIsSubagent: false,
|
||||
};
|
||||
}
|
||||
|
||||
// Check if this sub-agent can spawn children (orchestrator).
|
||||
// If so, it should see its own children, not its parent's children.
|
||||
const callerDepth = getSubagentDepthFromSessionStore(callerSessionKey, { cfg: params.cfg });
|
||||
const maxSpawnDepth = params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
|
||||
if (callerDepth < maxSpawnDepth) {
|
||||
// Orchestrator sub-agent: use its own session key as requester
|
||||
// so it sees children it spawned.
|
||||
return {
|
||||
requesterSessionKey: callerSessionKey,
|
||||
callerSessionKey,
|
||||
callerIsSubagent: true,
|
||||
};
|
||||
}
|
||||
|
||||
// Leaf sub-agent: walk up to its parent so it can see sibling runs.
|
||||
const cache = new Map<string, Record<string, SessionEntry>>();
|
||||
const callerEntry = resolveSessionEntryForKey({
|
||||
cfg: params.cfg,
|
||||
key: callerSessionKey,
|
||||
cache,
|
||||
}).entry;
|
||||
const spawnedBy = typeof callerEntry?.spawnedBy === "string" ? callerEntry.spawnedBy.trim() : "";
|
||||
return {
|
||||
requesterSessionKey: spawnedBy || callerSessionKey,
|
||||
callerSessionKey,
|
||||
callerIsSubagent: true,
|
||||
};
|
||||
}
|
||||
|
||||
async function killSubagentRun(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
entry: SubagentRunRecord;
|
||||
cache: Map<string, Record<string, SessionEntry>>;
|
||||
}): Promise<{ killed: boolean; sessionId?: string }> {
|
||||
if (params.entry.endedAt) {
|
||||
return { killed: false };
|
||||
}
|
||||
const childSessionKey = params.entry.childSessionKey;
|
||||
const resolved = resolveSessionEntryForKey({
|
||||
cfg: params.cfg,
|
||||
key: childSessionKey,
|
||||
cache: params.cache,
|
||||
});
|
||||
const sessionId = resolved.entry?.sessionId;
|
||||
const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false;
|
||||
const cleared = clearSessionQueues([childSessionKey, sessionId]);
|
||||
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
|
||||
logVerbose(
|
||||
`subagents tool kill: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
|
||||
);
|
||||
}
|
||||
if (resolved.entry) {
|
||||
await updateSessionStore(resolved.storePath, (store) => {
|
||||
const current = store[childSessionKey];
|
||||
if (!current) {
|
||||
return;
|
||||
}
|
||||
current.abortedLastRun = true;
|
||||
current.updatedAt = Date.now();
|
||||
store[childSessionKey] = current;
|
||||
});
|
||||
}
|
||||
const marked = markSubagentRunTerminated({
|
||||
runId: params.entry.runId,
|
||||
childSessionKey,
|
||||
reason: "killed",
|
||||
});
|
||||
const killed = marked > 0 || aborted || cleared.followupCleared > 0 || cleared.laneCleared > 0;
|
||||
return { killed, sessionId };
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively kill all descendant subagent runs spawned by a given parent session key.
|
||||
* This ensures that when a subagent is killed, all of its children (and their children) are also killed.
|
||||
*/
|
||||
async function cascadeKillChildren(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
parentChildSessionKey: string;
|
||||
cache: Map<string, Record<string, SessionEntry>>;
|
||||
seenChildSessionKeys?: Set<string>;
|
||||
}): Promise<{ killed: number; labels: string[] }> {
|
||||
const childRuns = listSubagentRunsForRequester(params.parentChildSessionKey);
|
||||
const seenChildSessionKeys = params.seenChildSessionKeys ?? new Set<string>();
|
||||
let killed = 0;
|
||||
const labels: string[] = [];
|
||||
|
||||
for (const run of childRuns) {
|
||||
const childKey = run.childSessionKey?.trim();
|
||||
if (!childKey || seenChildSessionKeys.has(childKey)) {
|
||||
continue;
|
||||
}
|
||||
seenChildSessionKeys.add(childKey);
|
||||
|
||||
if (!run.endedAt) {
|
||||
const stopResult = await killSubagentRun({
|
||||
cfg: params.cfg,
|
||||
entry: run,
|
||||
cache: params.cache,
|
||||
});
|
||||
if (stopResult.killed) {
|
||||
killed += 1;
|
||||
labels.push(resolveRunLabel(run));
|
||||
}
|
||||
}
|
||||
|
||||
// Recurse for grandchildren even if this parent already ended.
|
||||
const cascade = await cascadeKillChildren({
|
||||
cfg: params.cfg,
|
||||
parentChildSessionKey: childKey,
|
||||
cache: params.cache,
|
||||
seenChildSessionKeys,
|
||||
});
|
||||
killed += cascade.killed;
|
||||
labels.push(...cascade.labels);
|
||||
}
|
||||
|
||||
return { killed, labels };
|
||||
}
|
||||
|
||||
function buildListText(params: {
|
||||
active: Array<{ line: string }>;
|
||||
recent: Array<{ line: string }>;
|
||||
recentMinutes: number;
|
||||
}) {
|
||||
const lines: string[] = [];
|
||||
lines.push("active subagents:");
|
||||
if (params.active.length === 0) {
|
||||
lines.push("(none)");
|
||||
} else {
|
||||
lines.push(...params.active.map((entry) => entry.line));
|
||||
}
|
||||
lines.push("");
|
||||
lines.push(`recent (last ${params.recentMinutes}m):`);
|
||||
if (params.recent.length === 0) {
|
||||
lines.push("(none)");
|
||||
} else {
|
||||
lines.push(...params.recent.map((entry) => entry.line));
|
||||
}
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
export function createSubagentsTool(opts?: { agentSessionKey?: string }): AnyAgentTool {
|
||||
return {
|
||||
label: "Subagents",
|
||||
name: "subagents",
|
||||
description:
|
||||
"List, kill, or steer spawned sub-agents for this requester session. Use this for sub-agent orchestration.",
|
||||
parameters: SubagentsToolSchema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
const params = args as Record<string, unknown>;
|
||||
const action = (readStringParam(params, "action") ?? "list") as SubagentAction;
|
||||
const cfg = loadConfig();
|
||||
const requester = resolveRequesterKey({
|
||||
cfg,
|
||||
agentSessionKey: opts?.agentSessionKey,
|
||||
});
|
||||
const runs = sortRuns(listSubagentRunsForRequester(requester.requesterSessionKey));
|
||||
const recentMinutesRaw = readNumberParam(params, "recentMinutes");
|
||||
const recentMinutes = recentMinutesRaw
|
||||
? Math.max(1, Math.min(MAX_RECENT_MINUTES, Math.floor(recentMinutesRaw)))
|
||||
: DEFAULT_RECENT_MINUTES;
|
||||
|
||||
if (action === "list") {
|
||||
const now = Date.now();
|
||||
const recentCutoff = now - recentMinutes * 60_000;
|
||||
const cache = new Map<string, Record<string, SessionEntry>>();
|
||||
|
||||
let index = 1;
|
||||
const active = runs
|
||||
.filter((entry) => !entry.endedAt)
|
||||
.map((entry) => {
|
||||
const sessionEntry = resolveSessionEntryForKey({
|
||||
cfg,
|
||||
key: entry.childSessionKey,
|
||||
cache,
|
||||
}).entry;
|
||||
const totalTokens = resolveTotalTokens(sessionEntry);
|
||||
const usageText = resolveUsageDisplay(sessionEntry);
|
||||
const status = resolveRunStatus(entry);
|
||||
const runtime = formatDurationCompact(now - (entry.startedAt ?? entry.createdAt));
|
||||
const label = truncate(resolveRunLabel(entry), 48);
|
||||
const task = truncate(entry.task.trim(), 72);
|
||||
const line = `${index}. ${label} (${resolveModelDisplay(sessionEntry, entry.model)}, ${runtime}${usageText ? `, ${usageText}` : ""}) ${status}${task.toLowerCase() !== label.toLowerCase() ? ` - ${task}` : ""}`;
|
||||
const view = {
|
||||
index,
|
||||
runId: entry.runId,
|
||||
sessionKey: entry.childSessionKey,
|
||||
label,
|
||||
task,
|
||||
status,
|
||||
runtime,
|
||||
runtimeMs: now - (entry.startedAt ?? entry.createdAt),
|
||||
model: resolveModelRef(sessionEntry) || entry.model,
|
||||
totalTokens,
|
||||
startedAt: entry.startedAt,
|
||||
};
|
||||
index += 1;
|
||||
return { line, view };
|
||||
});
|
||||
const recent = runs
|
||||
.filter((entry) => !!entry.endedAt && (entry.endedAt ?? 0) >= recentCutoff)
|
||||
.map((entry) => {
|
||||
const sessionEntry = resolveSessionEntryForKey({
|
||||
cfg,
|
||||
key: entry.childSessionKey,
|
||||
cache,
|
||||
}).entry;
|
||||
const totalTokens = resolveTotalTokens(sessionEntry);
|
||||
const usageText = resolveUsageDisplay(sessionEntry);
|
||||
const status = resolveRunStatus(entry);
|
||||
const runtime = formatDurationCompact(
|
||||
(entry.endedAt ?? now) - (entry.startedAt ?? entry.createdAt),
|
||||
);
|
||||
const label = truncate(resolveRunLabel(entry), 48);
|
||||
const task = truncate(entry.task.trim(), 72);
|
||||
const line = `${index}. ${label} (${resolveModelDisplay(sessionEntry, entry.model)}, ${runtime}${usageText ? `, ${usageText}` : ""}) ${status}${task.toLowerCase() !== label.toLowerCase() ? ` - ${task}` : ""}`;
|
||||
const view = {
|
||||
index,
|
||||
runId: entry.runId,
|
||||
sessionKey: entry.childSessionKey,
|
||||
label,
|
||||
task,
|
||||
status,
|
||||
runtime,
|
||||
runtimeMs: (entry.endedAt ?? now) - (entry.startedAt ?? entry.createdAt),
|
||||
model: resolveModelRef(sessionEntry) || entry.model,
|
||||
totalTokens,
|
||||
startedAt: entry.startedAt,
|
||||
endedAt: entry.endedAt,
|
||||
};
|
||||
index += 1;
|
||||
return { line, view };
|
||||
});
|
||||
|
||||
const text = buildListText({ active, recent, recentMinutes });
|
||||
return jsonResult({
|
||||
status: "ok",
|
||||
action: "list",
|
||||
requesterSessionKey: requester.requesterSessionKey,
|
||||
callerSessionKey: requester.callerSessionKey,
|
||||
callerIsSubagent: requester.callerIsSubagent,
|
||||
total: runs.length,
|
||||
active: active.map((entry) => entry.view),
|
||||
recent: recent.map((entry) => entry.view),
|
||||
text,
|
||||
});
|
||||
}
|
||||
|
||||
if (action === "kill") {
|
||||
const target = readStringParam(params, "target", { required: true });
|
||||
if (target === "all" || target === "*") {
|
||||
const cache = new Map<string, Record<string, SessionEntry>>();
|
||||
const seenChildSessionKeys = new Set<string>();
|
||||
const killedLabels: string[] = [];
|
||||
let killed = 0;
|
||||
for (const entry of runs) {
|
||||
const childKey = entry.childSessionKey?.trim();
|
||||
if (!childKey || seenChildSessionKeys.has(childKey)) {
|
||||
continue;
|
||||
}
|
||||
seenChildSessionKeys.add(childKey);
|
||||
|
||||
if (!entry.endedAt) {
|
||||
const stopResult = await killSubagentRun({ cfg, entry, cache });
|
||||
if (stopResult.killed) {
|
||||
killed += 1;
|
||||
killedLabels.push(resolveRunLabel(entry));
|
||||
}
|
||||
}
|
||||
|
||||
// Traverse descendants even when the direct run is already finished.
|
||||
const cascade = await cascadeKillChildren({
|
||||
cfg,
|
||||
parentChildSessionKey: childKey,
|
||||
cache,
|
||||
seenChildSessionKeys,
|
||||
});
|
||||
killed += cascade.killed;
|
||||
killedLabels.push(...cascade.labels);
|
||||
}
|
||||
return jsonResult({
|
||||
status: "ok",
|
||||
action: "kill",
|
||||
target: "all",
|
||||
killed,
|
||||
labels: killedLabels,
|
||||
text:
|
||||
killed > 0
|
||||
? `killed ${killed} subagent${killed === 1 ? "" : "s"}.`
|
||||
: "no running subagents to kill.",
|
||||
});
|
||||
}
|
||||
const resolved = resolveSubagentTarget(runs, target, { recentMinutes });
|
||||
if (!resolved.entry) {
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
action: "kill",
|
||||
target,
|
||||
error: resolved.error ?? "Unknown subagent target.",
|
||||
});
|
||||
}
|
||||
const killCache = new Map<string, Record<string, SessionEntry>>();
|
||||
const stopResult = await killSubagentRun({
|
||||
cfg,
|
||||
entry: resolved.entry,
|
||||
cache: killCache,
|
||||
});
|
||||
const seenChildSessionKeys = new Set<string>();
|
||||
const targetChildKey = resolved.entry.childSessionKey?.trim();
|
||||
if (targetChildKey) {
|
||||
seenChildSessionKeys.add(targetChildKey);
|
||||
}
|
||||
// Traverse descendants even when the selected run is already finished.
|
||||
const cascade = await cascadeKillChildren({
|
||||
cfg,
|
||||
parentChildSessionKey: resolved.entry.childSessionKey,
|
||||
cache: killCache,
|
||||
seenChildSessionKeys,
|
||||
});
|
||||
if (!stopResult.killed && cascade.killed === 0) {
|
||||
return jsonResult({
|
||||
status: "done",
|
||||
action: "kill",
|
||||
target,
|
||||
runId: resolved.entry.runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
text: `${resolveRunLabel(resolved.entry)} is already finished.`,
|
||||
});
|
||||
}
|
||||
const cascadeText =
|
||||
cascade.killed > 0
|
||||
? ` (+ ${cascade.killed} descendant${cascade.killed === 1 ? "" : "s"})`
|
||||
: "";
|
||||
return jsonResult({
|
||||
status: "ok",
|
||||
action: "kill",
|
||||
target,
|
||||
runId: resolved.entry.runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
label: resolveRunLabel(resolved.entry),
|
||||
cascadeKilled: cascade.killed,
|
||||
cascadeLabels: cascade.killed > 0 ? cascade.labels : undefined,
|
||||
text: stopResult.killed
|
||||
? `killed ${resolveRunLabel(resolved.entry)}${cascadeText}.`
|
||||
: `killed ${cascade.killed} descendant${cascade.killed === 1 ? "" : "s"} of ${resolveRunLabel(resolved.entry)}.`,
|
||||
});
|
||||
}
|
||||
if (action === "steer") {
|
||||
const target = readStringParam(params, "target", { required: true });
|
||||
const message = readStringParam(params, "message", { required: true });
|
||||
if (message.length > MAX_STEER_MESSAGE_CHARS) {
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
action: "steer",
|
||||
target,
|
||||
error: `Message too long (${message.length} chars, max ${MAX_STEER_MESSAGE_CHARS}).`,
|
||||
});
|
||||
}
|
||||
const resolved = resolveSubagentTarget(runs, target, { recentMinutes });
|
||||
if (!resolved.entry) {
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
action: "steer",
|
||||
target,
|
||||
error: resolved.error ?? "Unknown subagent target.",
|
||||
});
|
||||
}
|
||||
if (resolved.entry.endedAt) {
|
||||
return jsonResult({
|
||||
status: "done",
|
||||
action: "steer",
|
||||
target,
|
||||
runId: resolved.entry.runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
text: `${resolveRunLabel(resolved.entry)} is already finished.`,
|
||||
});
|
||||
}
|
||||
if (
|
||||
requester.callerIsSubagent &&
|
||||
requester.callerSessionKey === resolved.entry.childSessionKey
|
||||
) {
|
||||
return jsonResult({
|
||||
status: "forbidden",
|
||||
action: "steer",
|
||||
target,
|
||||
runId: resolved.entry.runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
error: "Subagents cannot steer themselves.",
|
||||
});
|
||||
}
|
||||
|
||||
const rateKey = `${requester.callerSessionKey}:${resolved.entry.childSessionKey}`;
|
||||
const now = Date.now();
|
||||
const lastSentAt = steerRateLimit.get(rateKey) ?? 0;
|
||||
if (now - lastSentAt < STEER_RATE_LIMIT_MS) {
|
||||
return jsonResult({
|
||||
status: "rate_limited",
|
||||
action: "steer",
|
||||
target,
|
||||
runId: resolved.entry.runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
error: "Steer rate limit exceeded. Wait a moment before sending another steer.",
|
||||
});
|
||||
}
|
||||
steerRateLimit.set(rateKey, now);
|
||||
|
||||
// Suppress announce for the interrupted run before aborting so we don't
|
||||
// emit stale pre-steer findings if the run exits immediately.
|
||||
markSubagentRunForSteerRestart(resolved.entry.runId);
|
||||
|
||||
const targetSession = resolveSessionEntryForKey({
|
||||
cfg,
|
||||
key: resolved.entry.childSessionKey,
|
||||
cache: new Map<string, Record<string, SessionEntry>>(),
|
||||
});
|
||||
const sessionId =
|
||||
typeof targetSession.entry?.sessionId === "string" && targetSession.entry.sessionId.trim()
|
||||
? targetSession.entry.sessionId.trim()
|
||||
: undefined;
|
||||
|
||||
// Interrupt current work first so steer takes precedence immediately.
|
||||
if (sessionId) {
|
||||
abortEmbeddedPiRun(sessionId);
|
||||
}
|
||||
const cleared = clearSessionQueues([resolved.entry.childSessionKey, sessionId]);
|
||||
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
|
||||
logVerbose(
|
||||
`subagents tool steer: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
|
||||
);
|
||||
}
|
||||
|
||||
// Best effort: wait for the interrupted run to settle so the steer
|
||||
// message appends onto the existing conversation context.
|
||||
try {
|
||||
await callGateway({
|
||||
method: "agent.wait",
|
||||
params: {
|
||||
runId: resolved.entry.runId,
|
||||
timeoutMs: STEER_ABORT_SETTLE_TIMEOUT_MS,
|
||||
},
|
||||
timeoutMs: STEER_ABORT_SETTLE_TIMEOUT_MS + 2_000,
|
||||
});
|
||||
} catch {
|
||||
// Continue even if wait fails; steer should still be attempted.
|
||||
}
|
||||
|
||||
const idempotencyKey = crypto.randomUUID();
|
||||
let runId: string = idempotencyKey;
|
||||
try {
|
||||
const response = await callGateway<{ runId: string }>({
|
||||
method: "agent",
|
||||
params: {
|
||||
message,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
sessionId,
|
||||
idempotencyKey,
|
||||
deliver: false,
|
||||
channel: INTERNAL_MESSAGE_CHANNEL,
|
||||
lane: AGENT_LANE_SUBAGENT,
|
||||
timeout: 0,
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
if (typeof response?.runId === "string" && response.runId) {
|
||||
runId = response.runId;
|
||||
}
|
||||
} catch (err) {
|
||||
// Replacement launch failed; restore normal announce behavior for the
|
||||
// original run so completion is not silently suppressed.
|
||||
clearSubagentRunSteerRestart(resolved.entry.runId);
|
||||
const error = err instanceof Error ? err.message : String(err);
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
action: "steer",
|
||||
target,
|
||||
runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
sessionId,
|
||||
error,
|
||||
});
|
||||
}
|
||||
|
||||
replaceSubagentRunAfterSteer({
|
||||
previousRunId: resolved.entry.runId,
|
||||
nextRunId: runId,
|
||||
fallback: resolved.entry,
|
||||
runTimeoutSeconds: resolved.entry.runTimeoutSeconds ?? 0,
|
||||
});
|
||||
|
||||
return jsonResult({
|
||||
status: "accepted",
|
||||
action: "steer",
|
||||
target,
|
||||
runId,
|
||||
sessionKey: resolved.entry.childSessionKey,
|
||||
sessionId,
|
||||
mode: "restart",
|
||||
label: resolveRunLabel(resolved.entry),
|
||||
text: `steered ${resolveRunLabel(resolved.entry)}.`,
|
||||
});
|
||||
}
|
||||
return jsonResult({
|
||||
status: "error",
|
||||
error: "Unsupported action.",
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user