Revert "Add mesh orchestration gateway methods with DAG execution and retry"

This reverts commit 83990ed542.
This commit is contained in:
Peter Steinberger
2026-02-18 02:10:37 +01:00
parent 01672a8f25
commit 972d1b74d0
7 changed files with 1 additions and 837 deletions

View File

@@ -128,16 +128,6 @@ import {
LogsTailParamsSchema, LogsTailParamsSchema,
type LogsTailResult, type LogsTailResult,
LogsTailResultSchema, LogsTailResultSchema,
type MeshPlanParams,
MeshPlanParamsSchema,
type MeshRetryParams,
MeshRetryParamsSchema,
type MeshRunParams,
MeshRunParamsSchema,
type MeshStatusParams,
MeshStatusParamsSchema,
type MeshWorkflowPlan,
MeshWorkflowPlanSchema,
type ModelsListParams, type ModelsListParams,
ModelsListParamsSchema, ModelsListParamsSchema,
type NodeDescribeParams, type NodeDescribeParams,
@@ -368,10 +358,6 @@ export const validateExecApprovalsNodeSetParams = ajv.compile<ExecApprovalsNodeS
ExecApprovalsNodeSetParamsSchema, ExecApprovalsNodeSetParamsSchema,
); );
export const validateLogsTailParams = ajv.compile<LogsTailParams>(LogsTailParamsSchema); export const validateLogsTailParams = ajv.compile<LogsTailParams>(LogsTailParamsSchema);
export const validateMeshPlanParams = ajv.compile<MeshPlanParams>(MeshPlanParamsSchema);
export const validateMeshRunParams = ajv.compile<MeshRunParams>(MeshRunParamsSchema);
export const validateMeshStatusParams = ajv.compile<MeshStatusParams>(MeshStatusParamsSchema);
export const validateMeshRetryParams = ajv.compile<MeshRetryParams>(MeshRetryParamsSchema);
export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema);
export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); export const validateChatSendParams = ajv.compile(ChatSendParamsSchema);
export const validateChatAbortParams = ajv.compile<ChatAbortParams>(ChatAbortParamsSchema); export const validateChatAbortParams = ajv.compile<ChatAbortParams>(ChatAbortParamsSchema);
@@ -431,11 +417,6 @@ export {
StateVersionSchema, StateVersionSchema,
AgentEventSchema, AgentEventSchema,
ChatEventSchema, ChatEventSchema,
MeshPlanParamsSchema,
MeshWorkflowPlanSchema,
MeshRunParamsSchema,
MeshStatusParamsSchema,
MeshRetryParamsSchema,
SendParamsSchema, SendParamsSchema,
PollParamsSchema, PollParamsSchema,
AgentParamsSchema, AgentParamsSchema,
@@ -535,11 +516,6 @@ export type {
AgentIdentityResult, AgentIdentityResult,
AgentWaitParams, AgentWaitParams,
ChatEvent, ChatEvent,
MeshPlanParams,
MeshWorkflowPlan,
MeshRunParams,
MeshStatusParams,
MeshRetryParams,
TickEvent, TickEvent,
ShutdownEvent, ShutdownEvent,
WakeParams, WakeParams,

View File

@@ -8,7 +8,6 @@ export * from "./schema/exec-approvals.js";
export * from "./schema/devices.js"; export * from "./schema/devices.js";
export * from "./schema/frames.js"; export * from "./schema/frames.js";
export * from "./schema/logs-chat.js"; export * from "./schema/logs-chat.js";
export * from "./schema/mesh.js";
export * from "./schema/nodes.js"; export * from "./schema/nodes.js";
export * from "./schema/protocol-schemas.js"; export * from "./schema/protocol-schemas.js";
export * from "./schema/sessions.js"; export * from "./schema/sessions.js";

View File

@@ -1,83 +0,0 @@
import { Type, type Static } from "@sinclair/typebox";
import { NonEmptyString } from "./primitives.js";
export const MeshPlanStepSchema = Type.Object(
{
id: NonEmptyString,
name: Type.Optional(NonEmptyString),
prompt: NonEmptyString,
dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
agentId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
thinking: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
},
{ additionalProperties: false },
);
export const MeshWorkflowPlanSchema = Type.Object(
{
planId: NonEmptyString,
goal: NonEmptyString,
createdAt: Type.Integer({ minimum: 0 }),
steps: Type.Array(MeshPlanStepSchema, { minItems: 1, maxItems: 128 }),
},
{ additionalProperties: false },
);
export const MeshPlanParamsSchema = Type.Object(
{
goal: NonEmptyString,
steps: Type.Optional(
Type.Array(
Type.Object(
{
id: Type.Optional(NonEmptyString),
name: Type.Optional(NonEmptyString),
prompt: NonEmptyString,
dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
agentId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
thinking: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
},
{ additionalProperties: false },
),
{ minItems: 1, maxItems: 128 },
),
),
},
{ additionalProperties: false },
);
export const MeshRunParamsSchema = Type.Object(
{
plan: MeshWorkflowPlanSchema,
continueOnError: Type.Optional(Type.Boolean()),
maxParallel: Type.Optional(Type.Integer({ minimum: 1, maximum: 16 })),
defaultStepTimeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
lane: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const MeshStatusParamsSchema = Type.Object(
{
runId: NonEmptyString,
},
{ additionalProperties: false },
);
export const MeshRetryParamsSchema = Type.Object(
{
runId: NonEmptyString,
stepIds: Type.Optional(Type.Array(NonEmptyString, { minItems: 1, maxItems: 128 })),
},
{ additionalProperties: false },
);
export type MeshPlanParams = Static<typeof MeshPlanParamsSchema>;
export type MeshWorkflowPlan = Static<typeof MeshWorkflowPlanSchema>;
export type MeshRunParams = Static<typeof MeshRunParamsSchema>;
export type MeshStatusParams = Static<typeof MeshStatusParamsSchema>;
export type MeshRetryParams = Static<typeof MeshRetryParamsSchema>;

View File

@@ -103,13 +103,6 @@ import {
LogsTailParamsSchema, LogsTailParamsSchema,
LogsTailResultSchema, LogsTailResultSchema,
} from "./logs-chat.js"; } from "./logs-chat.js";
import {
MeshPlanParamsSchema,
MeshRetryParamsSchema,
MeshRunParamsSchema,
MeshStatusParamsSchema,
MeshWorkflowPlanSchema,
} from "./mesh.js";
import { import {
NodeDescribeParamsSchema, NodeDescribeParamsSchema,
NodeEventParamsSchema, NodeEventParamsSchema,
@@ -261,11 +254,6 @@ export const ProtocolSchemas: Record<string, TSchema> = {
ChatAbortParams: ChatAbortParamsSchema, ChatAbortParams: ChatAbortParamsSchema,
ChatInjectParams: ChatInjectParamsSchema, ChatInjectParams: ChatInjectParamsSchema,
ChatEvent: ChatEventSchema, ChatEvent: ChatEventSchema,
MeshPlanParams: MeshPlanParamsSchema,
MeshWorkflowPlan: MeshWorkflowPlanSchema,
MeshRunParams: MeshRunParamsSchema,
MeshStatusParams: MeshStatusParamsSchema,
MeshRetryParams: MeshRetryParamsSchema,
UpdateRunParams: UpdateRunParamsSchema, UpdateRunParams: UpdateRunParamsSchema,
TickEvent: TickEventSchema, TickEvent: TickEventSchema,
ShutdownEvent: ShutdownEventSchema, ShutdownEvent: ShutdownEventSchema,

View File

@@ -85,10 +85,6 @@ const BASE_METHODS = [
"agent", "agent",
"agent.identity.get", "agent.identity.get",
"agent.wait", "agent.wait",
"mesh.plan",
"mesh.run",
"mesh.status",
"mesh.retry",
"browser.request", "browser.request",
// WebChat WebSocket-native chat methods // WebChat WebSocket-native chat methods
"chat.history", "chat.history",

View File

@@ -1,3 +1,4 @@
import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js";
import { ErrorCodes, errorShape } from "./protocol/index.js"; import { ErrorCodes, errorShape } from "./protocol/index.js";
import { agentHandlers } from "./server-methods/agent.js"; import { agentHandlers } from "./server-methods/agent.js";
import { agentsHandlers } from "./server-methods/agents.js"; import { agentsHandlers } from "./server-methods/agents.js";
@@ -11,7 +12,6 @@ import { deviceHandlers } from "./server-methods/devices.js";
import { execApprovalsHandlers } from "./server-methods/exec-approvals.js"; import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
import { healthHandlers } from "./server-methods/health.js"; import { healthHandlers } from "./server-methods/health.js";
import { logsHandlers } from "./server-methods/logs.js"; import { logsHandlers } from "./server-methods/logs.js";
import { meshHandlers } from "./server-methods/mesh.js";
import { modelsHandlers } from "./server-methods/models.js"; import { modelsHandlers } from "./server-methods/models.js";
import { nodeHandlers } from "./server-methods/nodes.js"; import { nodeHandlers } from "./server-methods/nodes.js";
import { sendHandlers } from "./server-methods/send.js"; import { sendHandlers } from "./server-methods/send.js";
@@ -20,7 +20,6 @@ import { skillsHandlers } from "./server-methods/skills.js";
import { systemHandlers } from "./server-methods/system.js"; import { systemHandlers } from "./server-methods/system.js";
import { talkHandlers } from "./server-methods/talk.js"; import { talkHandlers } from "./server-methods/talk.js";
import { ttsHandlers } from "./server-methods/tts.js"; import { ttsHandlers } from "./server-methods/tts.js";
import type { GatewayRequestHandlers, GatewayRequestOptions } from "./server-methods/types.js";
import { updateHandlers } from "./server-methods/update.js"; import { updateHandlers } from "./server-methods/update.js";
import { usageHandlers } from "./server-methods/usage.js"; import { usageHandlers } from "./server-methods/usage.js";
import { voicewakeHandlers } from "./server-methods/voicewake.js"; import { voicewakeHandlers } from "./server-methods/voicewake.js";
@@ -79,8 +78,6 @@ const READ_METHODS = new Set([
"chat.history", "chat.history",
"config.get", "config.get",
"talk.config", "talk.config",
"mesh.plan",
"mesh.status",
]); ]);
const WRITE_METHODS = new Set([ const WRITE_METHODS = new Set([
"send", "send",
@@ -97,8 +94,6 @@ const WRITE_METHODS = new Set([
"chat.send", "chat.send",
"chat.abort", "chat.abort",
"browser.request", "browser.request",
"mesh.run",
"mesh.retry",
]); ]);
function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) { function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) {
@@ -176,7 +171,6 @@ function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["c
export const coreGatewayHandlers: GatewayRequestHandlers = { export const coreGatewayHandlers: GatewayRequestHandlers = {
...connectHandlers, ...connectHandlers,
...logsHandlers, ...logsHandlers,
...meshHandlers,
...voicewakeHandlers, ...voicewakeHandlers,
...healthHandlers, ...healthHandlers,
...channelsHandlers, ...channelsHandlers,

View File

@@ -1,706 +0,0 @@
import { randomUUID } from "node:crypto";
import type { GatewayRequestHandlerOptions, GatewayRequestHandlers, RespondFn } from "./types.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateMeshPlanParams,
validateMeshRetryParams,
validateMeshRunParams,
validateMeshStatusParams,
type MeshRunParams,
type MeshWorkflowPlan,
} from "../protocol/index.js";
import { agentHandlers } from "./agent.js";
type MeshStepStatus = "pending" | "running" | "succeeded" | "failed" | "skipped";
type MeshRunStatus = "pending" | "running" | "completed" | "failed";
type MeshStepRuntime = {
id: string;
name?: string;
prompt: string;
dependsOn: string[];
agentId?: string;
sessionKey?: string;
thinking?: string;
timeoutMs?: number;
status: MeshStepStatus;
attempts: number;
startedAt?: number;
endedAt?: number;
agentRunId?: string;
error?: string;
};
type MeshRunRecord = {
runId: string;
plan: MeshWorkflowPlan;
status: MeshRunStatus;
startedAt: number;
endedAt?: number;
continueOnError: boolean;
maxParallel: number;
defaultStepTimeoutMs: number;
lane?: string;
stepOrder: string[];
steps: Record<string, MeshStepRuntime>;
history: Array<{ ts: number; type: string; stepId?: string; data?: Record<string, unknown> }>;
};
const meshRuns = new Map<string, MeshRunRecord>();
const MAX_KEEP_RUNS = 200;
function trimMap() {
if (meshRuns.size <= MAX_KEEP_RUNS) {
return;
}
const sorted = [...meshRuns.values()].sort((a, b) => a.startedAt - b.startedAt);
const overflow = meshRuns.size - MAX_KEEP_RUNS;
for (const stale of sorted.slice(0, overflow)) {
meshRuns.delete(stale.runId);
}
}
function normalizeDependsOn(dependsOn: string[] | undefined): string[] {
if (!Array.isArray(dependsOn)) {
return [];
}
const seen = new Set<string>();
const normalized: string[] = [];
for (const raw of dependsOn) {
const trimmed = String(raw ?? "").trim();
if (!trimmed || seen.has(trimmed)) {
continue;
}
seen.add(trimmed);
normalized.push(trimmed);
}
return normalized;
}
function normalizePlan(plan: MeshWorkflowPlan): MeshWorkflowPlan {
return {
planId: plan.planId.trim(),
goal: plan.goal.trim(),
createdAt: plan.createdAt,
steps: plan.steps.map((step) => ({
id: step.id.trim(),
name: typeof step.name === "string" ? step.name.trim() || undefined : undefined,
prompt: step.prompt.trim(),
dependsOn: normalizeDependsOn(step.dependsOn),
agentId: typeof step.agentId === "string" ? step.agentId.trim() || undefined : undefined,
sessionKey:
typeof step.sessionKey === "string" ? step.sessionKey.trim() || undefined : undefined,
thinking: typeof step.thinking === "string" ? step.thinking : undefined,
timeoutMs:
typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
? Math.max(1_000, Math.floor(step.timeoutMs))
: undefined,
})),
};
}
function createPlanFromParams(params: {
goal: string;
steps?: Array<{
id?: string;
name?: string;
prompt: string;
dependsOn?: string[];
agentId?: string;
sessionKey?: string;
thinking?: string;
timeoutMs?: number;
}>;
}): MeshWorkflowPlan {
const now = Date.now();
const goal = params.goal.trim();
const sourceSteps = params.steps?.length
? params.steps
: [
{
id: "step-1",
name: "Primary Task",
prompt: goal,
},
];
const steps = sourceSteps.map((step, index) => {
const stepId = step.id?.trim() || `step-${index + 1}`;
return {
id: stepId,
name: step.name?.trim() || undefined,
prompt: step.prompt.trim(),
dependsOn: normalizeDependsOn(step.dependsOn),
agentId: step.agentId?.trim() || undefined,
sessionKey: step.sessionKey?.trim() || undefined,
thinking: typeof step.thinking === "string" ? step.thinking : undefined,
timeoutMs:
typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
? Math.max(1_000, Math.floor(step.timeoutMs))
: undefined,
};
});
return {
planId: `mesh-plan-${randomUUID()}`,
goal,
createdAt: now,
steps,
};
}
function validatePlanGraph(plan: MeshWorkflowPlan): { ok: true; order: string[] } | { ok: false; error: string } {
const ids = new Set<string>();
for (const step of plan.steps) {
if (ids.has(step.id)) {
return { ok: false, error: `duplicate step id: ${step.id}` };
}
ids.add(step.id);
}
for (const step of plan.steps) {
for (const depId of step.dependsOn ?? []) {
if (!ids.has(depId)) {
return { ok: false, error: `unknown dependency "${depId}" on step "${step.id}"` };
}
if (depId === step.id) {
return { ok: false, error: `step "${step.id}" cannot depend on itself` };
}
}
}
const inDegree = new Map<string, number>();
const outgoing = new Map<string, string[]>();
for (const step of plan.steps) {
inDegree.set(step.id, 0);
outgoing.set(step.id, []);
}
for (const step of plan.steps) {
for (const dep of step.dependsOn ?? []) {
inDegree.set(step.id, (inDegree.get(step.id) ?? 0) + 1);
const list = outgoing.get(dep);
if (list) {
list.push(step.id);
}
}
}
const queue = plan.steps.filter((step) => (inDegree.get(step.id) ?? 0) === 0).map((s) => s.id);
const order: string[] = [];
while (queue.length > 0) {
const current = queue.shift();
if (!current) {
continue;
}
order.push(current);
const targets = outgoing.get(current) ?? [];
for (const next of targets) {
const degree = (inDegree.get(next) ?? 0) - 1;
inDegree.set(next, degree);
if (degree === 0) {
queue.push(next);
}
}
}
if (order.length !== plan.steps.length) {
return { ok: false, error: "workflow contains a dependency cycle" };
}
return { ok: true, order };
}
async function callGatewayHandler(
handler: (opts: GatewayRequestHandlerOptions) => Promise<void> | void,
opts: GatewayRequestHandlerOptions,
): Promise<{ ok: boolean; payload?: unknown; error?: unknown; meta?: Record<string, unknown> }> {
return await new Promise((resolve) => {
let settled = false;
const settle = (result: { ok: boolean; payload?: unknown; error?: unknown; meta?: Record<string, unknown> }) => {
if (settled) {
return;
}
settled = true;
resolve(result);
};
const respond: RespondFn = (ok, payload, error, meta) => {
settle({ ok, payload, error, meta });
};
void Promise.resolve(
handler({
...opts,
respond,
}),
).catch((err) => {
settle({ ok: false, error: err });
});
});
}
function buildStepPrompt(step: MeshStepRuntime, run: MeshRunRecord): string {
if (step.dependsOn.length === 0) {
return step.prompt;
}
const lines = step.dependsOn.map((depId) => {
const dep = run.steps[depId];
const details = dep.agentRunId ? ` runId=${dep.agentRunId}` : "";
return `- ${depId}: ${dep.status}${details}`;
});
return `${step.prompt}\n\nDependency context:\n${lines.join("\n")}`;
}
function resolveStepTimeoutMs(step: MeshStepRuntime, run: MeshRunRecord): number {
if (typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)) {
return Math.max(1_000, Math.floor(step.timeoutMs));
}
return run.defaultStepTimeoutMs;
}
async function executeStep(params: {
run: MeshRunRecord;
step: MeshStepRuntime;
opts: GatewayRequestHandlerOptions;
}) {
const { run, step, opts } = params;
step.status = "running";
step.startedAt = Date.now();
step.endedAt = undefined;
step.error = undefined;
step.attempts += 1;
run.history.push({ ts: Date.now(), type: "step.start", stepId: step.id });
const agentRequestId = `${run.runId}:${step.id}:${step.attempts}`;
const prompt = buildStepPrompt(step, run);
const timeoutMs = resolveStepTimeoutMs(step, run);
const timeoutSeconds = Math.ceil(timeoutMs / 1000);
const accepted = await callGatewayHandler(agentHandlers.agent, {
...opts,
req: {
type: "req",
id: `${agentRequestId}:agent`,
method: "agent",
params: {},
},
params: {
message: prompt,
idempotencyKey: agentRequestId,
...(step.agentId ? { agentId: step.agentId } : {}),
...(step.sessionKey ? { sessionKey: step.sessionKey } : {}),
...(step.thinking ? { thinking: step.thinking } : {}),
...(run.lane ? { lane: run.lane } : {}),
timeout: timeoutSeconds,
deliver: false,
},
});
if (!accepted.ok) {
step.status = "failed";
step.endedAt = Date.now();
step.error = String(accepted.error ?? "agent request failed");
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { error: step.error },
});
return;
}
const runId = (() => {
const candidate = accepted.payload as { runId?: unknown } | undefined;
return typeof candidate?.runId === "string" ? candidate.runId : undefined;
})();
step.agentRunId = runId;
if (!runId) {
step.status = "failed";
step.endedAt = Date.now();
step.error = "agent did not return runId";
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { error: step.error },
});
return;
}
const waited = await callGatewayHandler(agentHandlers["agent.wait"], {
...opts,
req: {
type: "req",
id: `${agentRequestId}:wait`,
method: "agent.wait",
params: {},
},
params: {
runId,
timeoutMs,
},
});
const waitPayload = waited.payload as { status?: unknown; error?: unknown } | undefined;
const waitStatus = typeof waitPayload?.status === "string" ? waitPayload.status : "error";
if (waited.ok && waitStatus === "ok") {
step.status = "succeeded";
step.endedAt = Date.now();
run.history.push({ ts: Date.now(), type: "step.ok", stepId: step.id, data: { runId } });
return;
}
step.status = "failed";
step.endedAt = Date.now();
step.error =
typeof waitPayload?.error === "string"
? waitPayload.error
: String(waited.error ?? `agent.wait returned status ${waitStatus}`);
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { runId, status: waitStatus, error: step.error },
});
}
function createRunRecord(params: {
runId: string;
plan: MeshWorkflowPlan;
order: string[];
continueOnError: boolean;
maxParallel: number;
defaultStepTimeoutMs: number;
lane?: string;
}): MeshRunRecord {
const steps: Record<string, MeshStepRuntime> = {};
for (const step of params.plan.steps) {
steps[step.id] = {
id: step.id,
name: step.name,
prompt: step.prompt,
dependsOn: step.dependsOn ?? [],
agentId: step.agentId,
sessionKey: step.sessionKey,
thinking: step.thinking,
timeoutMs: step.timeoutMs,
status: "pending",
attempts: 0,
};
}
return {
runId: params.runId,
plan: params.plan,
status: "pending",
startedAt: Date.now(),
continueOnError: params.continueOnError,
maxParallel: params.maxParallel,
defaultStepTimeoutMs: params.defaultStepTimeoutMs,
lane: params.lane,
stepOrder: params.order,
steps,
history: [],
};
}
function findReadySteps(run: MeshRunRecord): MeshStepRuntime[] {
const ready: MeshStepRuntime[] = [];
for (const stepId of run.stepOrder) {
const step = run.steps[stepId];
if (!step || step.status !== "pending") {
continue;
}
const deps = step.dependsOn.map((depId) => run.steps[depId]).filter(Boolean);
if (deps.some((dep) => dep.status === "failed" || dep.status === "skipped")) {
step.status = "skipped";
step.endedAt = Date.now();
step.error = "dependency failed";
continue;
}
if (deps.every((dep) => dep.status === "succeeded")) {
ready.push(step);
}
}
return ready;
}
async function runWorkflow(run: MeshRunRecord, opts: GatewayRequestHandlerOptions) {
run.status = "running";
run.history.push({ ts: Date.now(), type: "run.start" });
const inFlight = new Set<Promise<void>>();
let stopScheduling = false;
while (true) {
const failed = Object.values(run.steps).some((step) => step.status === "failed");
if (failed && !run.continueOnError) {
stopScheduling = true;
}
if (!stopScheduling) {
const ready = findReadySteps(run);
for (const step of ready) {
if (inFlight.size >= run.maxParallel) {
break;
}
const task = executeStep({ run, step, opts }).finally(() => {
inFlight.delete(task);
});
inFlight.add(task);
}
}
if (inFlight.size > 0) {
await Promise.race(inFlight);
continue;
}
const pending = Object.values(run.steps).filter((step) => step.status === "pending");
if (pending.length === 0) {
break;
}
for (const step of pending) {
step.status = "skipped";
step.endedAt = Date.now();
step.error = stopScheduling ? "cancelled after failure" : "unresolvable dependencies";
}
break;
}
const hasFailure = Object.values(run.steps).some((step) => step.status === "failed");
run.status = hasFailure ? "failed" : "completed";
run.endedAt = Date.now();
run.history.push({
ts: Date.now(),
type: "run.end",
data: { status: run.status },
});
}
function resolveStepIdsForRetry(run: MeshRunRecord, requested?: string[]): string[] {
if (Array.isArray(requested) && requested.length > 0) {
return requested.map((stepId) => stepId.trim()).filter(Boolean);
}
return Object.values(run.steps)
.filter((step) => step.status === "failed" || step.status === "skipped")
.map((step) => step.id);
}
function descendantsOf(run: MeshRunRecord, roots: Set<string>): Set<string> {
const descendants = new Set<string>();
const queue = [...roots];
while (queue.length > 0) {
const current = queue.shift();
if (!current) {
continue;
}
for (const step of Object.values(run.steps)) {
if (!step.dependsOn.includes(current) || descendants.has(step.id)) {
continue;
}
descendants.add(step.id);
queue.push(step.id);
}
}
return descendants;
}
function resetStepsForRetry(run: MeshRunRecord, stepIds: string[]) {
const rootSet = new Set(stepIds);
const descendants = descendantsOf(run, rootSet);
const resetIds = new Set([...rootSet, ...descendants]);
for (const stepId of resetIds) {
const step = run.steps[stepId];
if (!step) {
continue;
}
if (step.status === "succeeded" && !rootSet.has(stepId)) {
continue;
}
step.status = "pending";
step.startedAt = undefined;
step.endedAt = undefined;
step.error = undefined;
if (rootSet.has(stepId)) {
step.agentRunId = undefined;
}
}
}
function summarizeRun(run: MeshRunRecord) {
return {
runId: run.runId,
plan: run.plan,
status: run.status,
startedAt: run.startedAt,
endedAt: run.endedAt,
stats: {
total: Object.keys(run.steps).length,
succeeded: Object.values(run.steps).filter((step) => step.status === "succeeded").length,
failed: Object.values(run.steps).filter((step) => step.status === "failed").length,
skipped: Object.values(run.steps).filter((step) => step.status === "skipped").length,
running: Object.values(run.steps).filter((step) => step.status === "running").length,
pending: Object.values(run.steps).filter((step) => step.status === "pending").length,
},
steps: run.stepOrder.map((stepId) => run.steps[stepId]),
history: run.history,
};
}
export const meshHandlers: GatewayRequestHandlers = {
"mesh.plan": ({ params, respond }) => {
if (!validateMeshPlanParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.plan params: ${formatValidationErrors(validateMeshPlanParams.errors)}`,
),
);
return;
}
const p = params;
const plan = normalizePlan(
createPlanFromParams({
goal: p.goal,
steps: p.steps,
}),
);
const graph = validatePlanGraph(plan);
if (!graph.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
return;
}
respond(
true,
{
plan,
order: graph.order,
},
undefined,
);
},
"mesh.run": async (opts) => {
const { params, respond } = opts;
if (!validateMeshRunParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.run params: ${formatValidationErrors(validateMeshRunParams.errors)}`,
),
);
return;
}
const p = params as MeshRunParams;
const plan = normalizePlan(p.plan);
const graph = validatePlanGraph(plan);
if (!graph.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
return;
}
const maxParallel =
typeof p.maxParallel === "number" && Number.isFinite(p.maxParallel)
? Math.min(16, Math.max(1, Math.floor(p.maxParallel)))
: 2;
const defaultStepTimeoutMs =
typeof p.defaultStepTimeoutMs === "number" && Number.isFinite(p.defaultStepTimeoutMs)
? Math.max(1_000, Math.floor(p.defaultStepTimeoutMs))
: 120_000;
const runId = `mesh-run-${randomUUID()}`;
const record = createRunRecord({
runId,
plan,
order: graph.order,
continueOnError: p.continueOnError === true,
maxParallel,
defaultStepTimeoutMs,
lane: typeof p.lane === "string" ? p.lane : undefined,
});
meshRuns.set(runId, record);
trimMap();
await runWorkflow(record, opts);
respond(true, summarizeRun(record), undefined);
},
"mesh.status": ({ params, respond }) => {
if (!validateMeshStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.status params: ${formatValidationErrors(validateMeshStatusParams.errors)}`,
),
);
return;
}
const run = meshRuns.get(params.runId.trim());
if (!run) {
respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
return;
}
respond(true, summarizeRun(run), undefined);
},
"mesh.retry": async (opts) => {
const { params, respond } = opts;
if (!validateMeshRetryParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.retry params: ${formatValidationErrors(validateMeshRetryParams.errors)}`,
),
);
return;
}
const runId = params.runId.trim();
const run = meshRuns.get(runId);
if (!run) {
respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
return;
}
if (run.status === "running") {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "mesh run is currently running"));
return;
}
const stepIds = resolveStepIdsForRetry(run, params.stepIds);
if (stepIds.length === 0) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "no failed or skipped steps available to retry"),
);
return;
}
for (const stepId of stepIds) {
if (!run.steps[stepId]) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `unknown retry step id: ${stepId}`),
);
return;
}
}
resetStepsForRetry(run, stepIds);
run.status = "pending";
run.endedAt = undefined;
run.history.push({
ts: Date.now(),
type: "run.retry",
data: { stepIds },
});
await runWorkflow(run, opts);
respond(true, summarizeRun(run), undefined);
},
};
export function __resetMeshRunsForTest() {
meshRuns.clear();
}