Add mesh orchestration gateway methods with DAG execution and retry

This commit is contained in:
ranausmanai
2026-02-16 19:02:40 +05:00
committed by Peter Steinberger
parent 15fe87e6b7
commit 83990ed542
8 changed files with 974 additions and 0 deletions

View File

@@ -128,6 +128,16 @@ import {
LogsTailParamsSchema, LogsTailParamsSchema,
type LogsTailResult, type LogsTailResult,
LogsTailResultSchema, LogsTailResultSchema,
type MeshPlanParams,
MeshPlanParamsSchema,
type MeshRetryParams,
MeshRetryParamsSchema,
type MeshRunParams,
MeshRunParamsSchema,
type MeshStatusParams,
MeshStatusParamsSchema,
type MeshWorkflowPlan,
MeshWorkflowPlanSchema,
type ModelsListParams, type ModelsListParams,
ModelsListParamsSchema, ModelsListParamsSchema,
type NodeDescribeParams, type NodeDescribeParams,
@@ -358,6 +368,10 @@ export const validateExecApprovalsNodeSetParams = ajv.compile<ExecApprovalsNodeS
ExecApprovalsNodeSetParamsSchema, ExecApprovalsNodeSetParamsSchema,
); );
export const validateLogsTailParams = ajv.compile<LogsTailParams>(LogsTailParamsSchema); export const validateLogsTailParams = ajv.compile<LogsTailParams>(LogsTailParamsSchema);
export const validateMeshPlanParams = ajv.compile<MeshPlanParams>(MeshPlanParamsSchema);
export const validateMeshRunParams = ajv.compile<MeshRunParams>(MeshRunParamsSchema);
export const validateMeshStatusParams = ajv.compile<MeshStatusParams>(MeshStatusParamsSchema);
export const validateMeshRetryParams = ajv.compile<MeshRetryParams>(MeshRetryParamsSchema);
export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema);
export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); export const validateChatSendParams = ajv.compile(ChatSendParamsSchema);
export const validateChatAbortParams = ajv.compile<ChatAbortParams>(ChatAbortParamsSchema); export const validateChatAbortParams = ajv.compile<ChatAbortParams>(ChatAbortParamsSchema);
@@ -417,6 +431,11 @@ export {
StateVersionSchema, StateVersionSchema,
AgentEventSchema, AgentEventSchema,
ChatEventSchema, ChatEventSchema,
MeshPlanParamsSchema,
MeshWorkflowPlanSchema,
MeshRunParamsSchema,
MeshStatusParamsSchema,
MeshRetryParamsSchema,
SendParamsSchema, SendParamsSchema,
PollParamsSchema, PollParamsSchema,
AgentParamsSchema, AgentParamsSchema,
@@ -516,6 +535,11 @@ export type {
AgentIdentityResult, AgentIdentityResult,
AgentWaitParams, AgentWaitParams,
ChatEvent, ChatEvent,
MeshPlanParams,
MeshWorkflowPlan,
MeshRunParams,
MeshStatusParams,
MeshRetryParams,
TickEvent, TickEvent,
ShutdownEvent, ShutdownEvent,
WakeParams, WakeParams,

View File

@@ -8,6 +8,7 @@ export * from "./schema/exec-approvals.js";
export * from "./schema/devices.js"; export * from "./schema/devices.js";
export * from "./schema/frames.js"; export * from "./schema/frames.js";
export * from "./schema/logs-chat.js"; export * from "./schema/logs-chat.js";
export * from "./schema/mesh.js";
export * from "./schema/nodes.js"; export * from "./schema/nodes.js";
export * from "./schema/protocol-schemas.js"; export * from "./schema/protocol-schemas.js";
export * from "./schema/sessions.js"; export * from "./schema/sessions.js";

View File

@@ -0,0 +1,83 @@
import { Type, type Static } from "@sinclair/typebox";
import { NonEmptyString } from "./primitives.js";
export const MeshPlanStepSchema = Type.Object(
{
id: NonEmptyString,
name: Type.Optional(NonEmptyString),
prompt: NonEmptyString,
dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
agentId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
thinking: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
},
{ additionalProperties: false },
);
export const MeshWorkflowPlanSchema = Type.Object(
{
planId: NonEmptyString,
goal: NonEmptyString,
createdAt: Type.Integer({ minimum: 0 }),
steps: Type.Array(MeshPlanStepSchema, { minItems: 1, maxItems: 128 }),
},
{ additionalProperties: false },
);
export const MeshPlanParamsSchema = Type.Object(
{
goal: NonEmptyString,
steps: Type.Optional(
Type.Array(
Type.Object(
{
id: Type.Optional(NonEmptyString),
name: Type.Optional(NonEmptyString),
prompt: NonEmptyString,
dependsOn: Type.Optional(Type.Array(NonEmptyString, { maxItems: 64 })),
agentId: Type.Optional(NonEmptyString),
sessionKey: Type.Optional(NonEmptyString),
thinking: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
},
{ additionalProperties: false },
),
{ minItems: 1, maxItems: 128 },
),
),
},
{ additionalProperties: false },
);
export const MeshRunParamsSchema = Type.Object(
{
plan: MeshWorkflowPlanSchema,
continueOnError: Type.Optional(Type.Boolean()),
maxParallel: Type.Optional(Type.Integer({ minimum: 1, maximum: 16 })),
defaultStepTimeoutMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 3_600_000 })),
lane: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const MeshStatusParamsSchema = Type.Object(
{
runId: NonEmptyString,
},
{ additionalProperties: false },
);
export const MeshRetryParamsSchema = Type.Object(
{
runId: NonEmptyString,
stepIds: Type.Optional(Type.Array(NonEmptyString, { minItems: 1, maxItems: 128 })),
},
{ additionalProperties: false },
);
export type MeshPlanParams = Static<typeof MeshPlanParamsSchema>;
export type MeshWorkflowPlan = Static<typeof MeshWorkflowPlanSchema>;
export type MeshRunParams = Static<typeof MeshRunParamsSchema>;
export type MeshStatusParams = Static<typeof MeshStatusParamsSchema>;
export type MeshRetryParams = Static<typeof MeshRetryParamsSchema>;

View File

@@ -103,6 +103,13 @@ import {
LogsTailParamsSchema, LogsTailParamsSchema,
LogsTailResultSchema, LogsTailResultSchema,
} from "./logs-chat.js"; } from "./logs-chat.js";
import {
MeshPlanParamsSchema,
MeshRetryParamsSchema,
MeshRunParamsSchema,
MeshStatusParamsSchema,
MeshWorkflowPlanSchema,
} from "./mesh.js";
import { import {
NodeDescribeParamsSchema, NodeDescribeParamsSchema,
NodeEventParamsSchema, NodeEventParamsSchema,
@@ -254,6 +261,11 @@ export const ProtocolSchemas: Record<string, TSchema> = {
ChatAbortParams: ChatAbortParamsSchema, ChatAbortParams: ChatAbortParamsSchema,
ChatInjectParams: ChatInjectParamsSchema, ChatInjectParams: ChatInjectParamsSchema,
ChatEvent: ChatEventSchema, ChatEvent: ChatEventSchema,
MeshPlanParams: MeshPlanParamsSchema,
MeshWorkflowPlan: MeshWorkflowPlanSchema,
MeshRunParams: MeshRunParamsSchema,
MeshStatusParams: MeshStatusParamsSchema,
MeshRetryParams: MeshRetryParamsSchema,
UpdateRunParams: UpdateRunParamsSchema, UpdateRunParams: UpdateRunParamsSchema,
TickEvent: TickEventSchema, TickEvent: TickEventSchema,
ShutdownEvent: ShutdownEventSchema, ShutdownEvent: ShutdownEventSchema,

View File

@@ -85,6 +85,10 @@ const BASE_METHODS = [
"agent", "agent",
"agent.identity.get", "agent.identity.get",
"agent.wait", "agent.wait",
"mesh.plan",
"mesh.run",
"mesh.status",
"mesh.retry",
"browser.request", "browser.request",
// WebChat WebSocket-native chat methods // WebChat WebSocket-native chat methods
"chat.history", "chat.history",

View File

@@ -12,6 +12,7 @@ import { deviceHandlers } from "./server-methods/devices.js";
import { execApprovalsHandlers } from "./server-methods/exec-approvals.js"; import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
import { healthHandlers } from "./server-methods/health.js"; import { healthHandlers } from "./server-methods/health.js";
import { logsHandlers } from "./server-methods/logs.js"; import { logsHandlers } from "./server-methods/logs.js";
import { meshHandlers } from "./server-methods/mesh.js";
import { modelsHandlers } from "./server-methods/models.js"; import { modelsHandlers } from "./server-methods/models.js";
import { nodeHandlers } from "./server-methods/nodes.js"; import { nodeHandlers } from "./server-methods/nodes.js";
import { sendHandlers } from "./server-methods/send.js"; import { sendHandlers } from "./server-methods/send.js";
@@ -78,6 +79,8 @@ const READ_METHODS = new Set([
"chat.history", "chat.history",
"config.get", "config.get",
"talk.config", "talk.config",
"mesh.plan",
"mesh.status",
]); ]);
const WRITE_METHODS = new Set([ const WRITE_METHODS = new Set([
"send", "send",
@@ -94,6 +97,8 @@ const WRITE_METHODS = new Set([
"chat.send", "chat.send",
"chat.abort", "chat.abort",
"browser.request", "browser.request",
"mesh.run",
"mesh.retry",
]); ]);
function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) { function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["client"]) {
@@ -171,6 +176,7 @@ function authorizeGatewayMethod(method: string, client: GatewayRequestOptions["c
export const coreGatewayHandlers: GatewayRequestHandlers = { export const coreGatewayHandlers: GatewayRequestHandlers = {
...connectHandlers, ...connectHandlers,
...logsHandlers, ...logsHandlers,
...meshHandlers,
...voicewakeHandlers, ...voicewakeHandlers,
...healthHandlers, ...healthHandlers,
...channelsHandlers, ...channelsHandlers,

View File

@@ -0,0 +1,138 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { GatewayRequestContext } from "./types.js";
import { __resetMeshRunsForTest, meshHandlers } from "./mesh.js";
const mocks = vi.hoisted(() => ({
agent: vi.fn(),
agentWait: vi.fn(),
}));
vi.mock("./agent.js", () => ({
agentHandlers: {
agent: (...args: unknown[]) => mocks.agent(...args),
"agent.wait": (...args: unknown[]) => mocks.agentWait(...args),
},
}));
const makeContext = (): GatewayRequestContext =>
({
dedupe: new Map(),
addChatRun: vi.fn(),
logGateway: { info: vi.fn(), error: vi.fn() },
}) as unknown as GatewayRequestContext;
async function callMesh(method: keyof typeof meshHandlers, params: Record<string, unknown>) {
return await new Promise<{ ok: boolean; payload?: unknown; error?: unknown }>((resolve) => {
void meshHandlers[method]({
req: { type: "req", id: `test-${method}`, method },
params,
respond: (ok, payload, error) => resolve({ ok, payload, error }),
context: makeContext(),
client: null,
isWebchatConnect: () => false,
});
});
}
afterEach(() => {
__resetMeshRunsForTest();
mocks.agent.mockReset();
mocks.agentWait.mockReset();
});
describe("mesh handlers", () => {
it("builds a default single-step plan", async () => {
const res = await callMesh("mesh.plan", { goal: "Write release notes" });
expect(res.ok).toBe(true);
const payload = res.payload as { plan: { goal: string; steps: Array<{ id: string }> } };
expect(payload.plan.goal).toBe("Write release notes");
expect(payload.plan.steps).toHaveLength(1);
expect(payload.plan.steps[0]?.id).toBe("step-1");
});
it("rejects cyclic plans", async () => {
const cyclePlan = {
planId: "mesh-plan-1",
goal: "cycle",
createdAt: Date.now(),
steps: [
{ id: "a", prompt: "a", dependsOn: ["b"] },
{ id: "b", prompt: "b", dependsOn: ["a"] },
],
};
const res = await callMesh("mesh.run", { plan: cyclePlan });
expect(res.ok).toBe(false);
});
it("runs steps in DAG order and supports retrying failed steps", async () => {
const runState = new Map<string, "ok" | "error">();
mocks.agent.mockImplementation(
(opts: { params: { idempotencyKey: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
const agentRunId = `agent-${opts.params.idempotencyKey}`;
runState.set(agentRunId, "ok");
if (opts.params.idempotencyKey.includes(":review:1")) {
runState.set(agentRunId, "error");
}
opts.respond(true, { runId: agentRunId, status: "accepted" });
},
);
mocks.agentWait.mockImplementation(
(opts: { params: { runId: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
const status = runState.get(opts.params.runId) ?? "error";
if (status === "ok") {
opts.respond(true, { runId: opts.params.runId, status: "ok" });
return;
}
opts.respond(true, {
runId: opts.params.runId,
status: "error",
error: "simulated failure",
});
},
);
const plan = {
planId: "mesh-plan-2",
goal: "Ship patch",
createdAt: Date.now(),
steps: [
{ id: "research", prompt: "Research requirements" },
{ id: "build", prompt: "Build feature", dependsOn: ["research"] },
{ id: "review", prompt: "Review result", dependsOn: ["build"] },
],
};
const runRes = await callMesh("mesh.run", { plan });
expect(runRes.ok).toBe(true);
const runPayload = runRes.payload as {
runId: string;
status: string;
stats: { failed: number };
};
expect(runPayload.status).toBe("failed");
expect(runPayload.stats.failed).toBe(1);
// Make subsequent retries succeed
mocks.agent.mockImplementation(
(opts: { params: { idempotencyKey: string }; respond: (ok: boolean, payload?: unknown) => void }) => {
const agentRunId = `agent-${opts.params.idempotencyKey}`;
runState.set(agentRunId, "ok");
opts.respond(true, { runId: agentRunId, status: "accepted" });
},
);
const retryRes = await callMesh("mesh.retry", {
runId: runPayload.runId,
stepIds: ["review"],
});
expect(retryRes.ok).toBe(true);
const retryPayload = retryRes.payload as { status: string; stats: { failed: number } };
expect(retryPayload.status).toBe("completed");
expect(retryPayload.stats.failed).toBe(0);
const statusRes = await callMesh("mesh.status", { runId: runPayload.runId });
expect(statusRes.ok).toBe(true);
const statusPayload = statusRes.payload as { status: string };
expect(statusPayload.status).toBe("completed");
});
});

View File

@@ -0,0 +1,706 @@
import { randomUUID } from "node:crypto";
import type { GatewayRequestHandlerOptions, GatewayRequestHandlers, RespondFn } from "./types.js";
import {
ErrorCodes,
errorShape,
formatValidationErrors,
validateMeshPlanParams,
validateMeshRetryParams,
validateMeshRunParams,
validateMeshStatusParams,
type MeshRunParams,
type MeshWorkflowPlan,
} from "../protocol/index.js";
import { agentHandlers } from "./agent.js";
type MeshStepStatus = "pending" | "running" | "succeeded" | "failed" | "skipped";
type MeshRunStatus = "pending" | "running" | "completed" | "failed";
type MeshStepRuntime = {
id: string;
name?: string;
prompt: string;
dependsOn: string[];
agentId?: string;
sessionKey?: string;
thinking?: string;
timeoutMs?: number;
status: MeshStepStatus;
attempts: number;
startedAt?: number;
endedAt?: number;
agentRunId?: string;
error?: string;
};
type MeshRunRecord = {
runId: string;
plan: MeshWorkflowPlan;
status: MeshRunStatus;
startedAt: number;
endedAt?: number;
continueOnError: boolean;
maxParallel: number;
defaultStepTimeoutMs: number;
lane?: string;
stepOrder: string[];
steps: Record<string, MeshStepRuntime>;
history: Array<{ ts: number; type: string; stepId?: string; data?: Record<string, unknown> }>;
};
const meshRuns = new Map<string, MeshRunRecord>();
const MAX_KEEP_RUNS = 200;
function trimMap() {
if (meshRuns.size <= MAX_KEEP_RUNS) {
return;
}
const sorted = [...meshRuns.values()].sort((a, b) => a.startedAt - b.startedAt);
const overflow = meshRuns.size - MAX_KEEP_RUNS;
for (const stale of sorted.slice(0, overflow)) {
meshRuns.delete(stale.runId);
}
}
function normalizeDependsOn(dependsOn: string[] | undefined): string[] {
if (!Array.isArray(dependsOn)) {
return [];
}
const seen = new Set<string>();
const normalized: string[] = [];
for (const raw of dependsOn) {
const trimmed = String(raw ?? "").trim();
if (!trimmed || seen.has(trimmed)) {
continue;
}
seen.add(trimmed);
normalized.push(trimmed);
}
return normalized;
}
function normalizePlan(plan: MeshWorkflowPlan): MeshWorkflowPlan {
return {
planId: plan.planId.trim(),
goal: plan.goal.trim(),
createdAt: plan.createdAt,
steps: plan.steps.map((step) => ({
id: step.id.trim(),
name: typeof step.name === "string" ? step.name.trim() || undefined : undefined,
prompt: step.prompt.trim(),
dependsOn: normalizeDependsOn(step.dependsOn),
agentId: typeof step.agentId === "string" ? step.agentId.trim() || undefined : undefined,
sessionKey:
typeof step.sessionKey === "string" ? step.sessionKey.trim() || undefined : undefined,
thinking: typeof step.thinking === "string" ? step.thinking : undefined,
timeoutMs:
typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
? Math.max(1_000, Math.floor(step.timeoutMs))
: undefined,
})),
};
}
function createPlanFromParams(params: {
goal: string;
steps?: Array<{
id?: string;
name?: string;
prompt: string;
dependsOn?: string[];
agentId?: string;
sessionKey?: string;
thinking?: string;
timeoutMs?: number;
}>;
}): MeshWorkflowPlan {
const now = Date.now();
const goal = params.goal.trim();
const sourceSteps = params.steps?.length
? params.steps
: [
{
id: "step-1",
name: "Primary Task",
prompt: goal,
},
];
const steps = sourceSteps.map((step, index) => {
const stepId = step.id?.trim() || `step-${index + 1}`;
return {
id: stepId,
name: step.name?.trim() || undefined,
prompt: step.prompt.trim(),
dependsOn: normalizeDependsOn(step.dependsOn),
agentId: step.agentId?.trim() || undefined,
sessionKey: step.sessionKey?.trim() || undefined,
thinking: typeof step.thinking === "string" ? step.thinking : undefined,
timeoutMs:
typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)
? Math.max(1_000, Math.floor(step.timeoutMs))
: undefined,
};
});
return {
planId: `mesh-plan-${randomUUID()}`,
goal,
createdAt: now,
steps,
};
}
function validatePlanGraph(plan: MeshWorkflowPlan): { ok: true; order: string[] } | { ok: false; error: string } {
const ids = new Set<string>();
for (const step of plan.steps) {
if (ids.has(step.id)) {
return { ok: false, error: `duplicate step id: ${step.id}` };
}
ids.add(step.id);
}
for (const step of plan.steps) {
for (const depId of step.dependsOn ?? []) {
if (!ids.has(depId)) {
return { ok: false, error: `unknown dependency "${depId}" on step "${step.id}"` };
}
if (depId === step.id) {
return { ok: false, error: `step "${step.id}" cannot depend on itself` };
}
}
}
const inDegree = new Map<string, number>();
const outgoing = new Map<string, string[]>();
for (const step of plan.steps) {
inDegree.set(step.id, 0);
outgoing.set(step.id, []);
}
for (const step of plan.steps) {
for (const dep of step.dependsOn ?? []) {
inDegree.set(step.id, (inDegree.get(step.id) ?? 0) + 1);
const list = outgoing.get(dep);
if (list) {
list.push(step.id);
}
}
}
const queue = plan.steps.filter((step) => (inDegree.get(step.id) ?? 0) === 0).map((s) => s.id);
const order: string[] = [];
while (queue.length > 0) {
const current = queue.shift();
if (!current) {
continue;
}
order.push(current);
const targets = outgoing.get(current) ?? [];
for (const next of targets) {
const degree = (inDegree.get(next) ?? 0) - 1;
inDegree.set(next, degree);
if (degree === 0) {
queue.push(next);
}
}
}
if (order.length !== plan.steps.length) {
return { ok: false, error: "workflow contains a dependency cycle" };
}
return { ok: true, order };
}
async function callGatewayHandler(
handler: (opts: GatewayRequestHandlerOptions) => Promise<void> | void,
opts: GatewayRequestHandlerOptions,
): Promise<{ ok: boolean; payload?: unknown; error?: unknown; meta?: Record<string, unknown> }> {
return await new Promise((resolve) => {
let settled = false;
const settle = (result: { ok: boolean; payload?: unknown; error?: unknown; meta?: Record<string, unknown> }) => {
if (settled) {
return;
}
settled = true;
resolve(result);
};
const respond: RespondFn = (ok, payload, error, meta) => {
settle({ ok, payload, error, meta });
};
void Promise.resolve(
handler({
...opts,
respond,
}),
).catch((err) => {
settle({ ok: false, error: err });
});
});
}
function buildStepPrompt(step: MeshStepRuntime, run: MeshRunRecord): string {
if (step.dependsOn.length === 0) {
return step.prompt;
}
const lines = step.dependsOn.map((depId) => {
const dep = run.steps[depId];
const details = dep.agentRunId ? ` runId=${dep.agentRunId}` : "";
return `- ${depId}: ${dep.status}${details}`;
});
return `${step.prompt}\n\nDependency context:\n${lines.join("\n")}`;
}
function resolveStepTimeoutMs(step: MeshStepRuntime, run: MeshRunRecord): number {
if (typeof step.timeoutMs === "number" && Number.isFinite(step.timeoutMs)) {
return Math.max(1_000, Math.floor(step.timeoutMs));
}
return run.defaultStepTimeoutMs;
}
async function executeStep(params: {
run: MeshRunRecord;
step: MeshStepRuntime;
opts: GatewayRequestHandlerOptions;
}) {
const { run, step, opts } = params;
step.status = "running";
step.startedAt = Date.now();
step.endedAt = undefined;
step.error = undefined;
step.attempts += 1;
run.history.push({ ts: Date.now(), type: "step.start", stepId: step.id });
const agentRequestId = `${run.runId}:${step.id}:${step.attempts}`;
const prompt = buildStepPrompt(step, run);
const timeoutMs = resolveStepTimeoutMs(step, run);
const timeoutSeconds = Math.ceil(timeoutMs / 1000);
const accepted = await callGatewayHandler(agentHandlers.agent, {
...opts,
req: {
type: "req",
id: `${agentRequestId}:agent`,
method: "agent",
params: {},
},
params: {
message: prompt,
idempotencyKey: agentRequestId,
...(step.agentId ? { agentId: step.agentId } : {}),
...(step.sessionKey ? { sessionKey: step.sessionKey } : {}),
...(step.thinking ? { thinking: step.thinking } : {}),
...(run.lane ? { lane: run.lane } : {}),
timeout: timeoutSeconds,
deliver: false,
},
});
if (!accepted.ok) {
step.status = "failed";
step.endedAt = Date.now();
step.error = String(accepted.error ?? "agent request failed");
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { error: step.error },
});
return;
}
const runId = (() => {
const candidate = accepted.payload as { runId?: unknown } | undefined;
return typeof candidate?.runId === "string" ? candidate.runId : undefined;
})();
step.agentRunId = runId;
if (!runId) {
step.status = "failed";
step.endedAt = Date.now();
step.error = "agent did not return runId";
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { error: step.error },
});
return;
}
const waited = await callGatewayHandler(agentHandlers["agent.wait"], {
...opts,
req: {
type: "req",
id: `${agentRequestId}:wait`,
method: "agent.wait",
params: {},
},
params: {
runId,
timeoutMs,
},
});
const waitPayload = waited.payload as { status?: unknown; error?: unknown } | undefined;
const waitStatus = typeof waitPayload?.status === "string" ? waitPayload.status : "error";
if (waited.ok && waitStatus === "ok") {
step.status = "succeeded";
step.endedAt = Date.now();
run.history.push({ ts: Date.now(), type: "step.ok", stepId: step.id, data: { runId } });
return;
}
step.status = "failed";
step.endedAt = Date.now();
step.error =
typeof waitPayload?.error === "string"
? waitPayload.error
: String(waited.error ?? `agent.wait returned status ${waitStatus}`);
run.history.push({
ts: Date.now(),
type: "step.error",
stepId: step.id,
data: { runId, status: waitStatus, error: step.error },
});
}
function createRunRecord(params: {
runId: string;
plan: MeshWorkflowPlan;
order: string[];
continueOnError: boolean;
maxParallel: number;
defaultStepTimeoutMs: number;
lane?: string;
}): MeshRunRecord {
const steps: Record<string, MeshStepRuntime> = {};
for (const step of params.plan.steps) {
steps[step.id] = {
id: step.id,
name: step.name,
prompt: step.prompt,
dependsOn: step.dependsOn ?? [],
agentId: step.agentId,
sessionKey: step.sessionKey,
thinking: step.thinking,
timeoutMs: step.timeoutMs,
status: "pending",
attempts: 0,
};
}
return {
runId: params.runId,
plan: params.plan,
status: "pending",
startedAt: Date.now(),
continueOnError: params.continueOnError,
maxParallel: params.maxParallel,
defaultStepTimeoutMs: params.defaultStepTimeoutMs,
lane: params.lane,
stepOrder: params.order,
steps,
history: [],
};
}
function findReadySteps(run: MeshRunRecord): MeshStepRuntime[] {
const ready: MeshStepRuntime[] = [];
for (const stepId of run.stepOrder) {
const step = run.steps[stepId];
if (!step || step.status !== "pending") {
continue;
}
const deps = step.dependsOn.map((depId) => run.steps[depId]).filter(Boolean);
if (deps.some((dep) => dep.status === "failed" || dep.status === "skipped")) {
step.status = "skipped";
step.endedAt = Date.now();
step.error = "dependency failed";
continue;
}
if (deps.every((dep) => dep.status === "succeeded")) {
ready.push(step);
}
}
return ready;
}
async function runWorkflow(run: MeshRunRecord, opts: GatewayRequestHandlerOptions) {
run.status = "running";
run.history.push({ ts: Date.now(), type: "run.start" });
const inFlight = new Set<Promise<void>>();
let stopScheduling = false;
while (true) {
const failed = Object.values(run.steps).some((step) => step.status === "failed");
if (failed && !run.continueOnError) {
stopScheduling = true;
}
if (!stopScheduling) {
const ready = findReadySteps(run);
for (const step of ready) {
if (inFlight.size >= run.maxParallel) {
break;
}
const task = executeStep({ run, step, opts }).finally(() => {
inFlight.delete(task);
});
inFlight.add(task);
}
}
if (inFlight.size > 0) {
await Promise.race(inFlight);
continue;
}
const pending = Object.values(run.steps).filter((step) => step.status === "pending");
if (pending.length === 0) {
break;
}
for (const step of pending) {
step.status = "skipped";
step.endedAt = Date.now();
step.error = stopScheduling ? "cancelled after failure" : "unresolvable dependencies";
}
break;
}
const hasFailure = Object.values(run.steps).some((step) => step.status === "failed");
run.status = hasFailure ? "failed" : "completed";
run.endedAt = Date.now();
run.history.push({
ts: Date.now(),
type: "run.end",
data: { status: run.status },
});
}
function resolveStepIdsForRetry(run: MeshRunRecord, requested?: string[]): string[] {
if (Array.isArray(requested) && requested.length > 0) {
return requested.map((stepId) => stepId.trim()).filter(Boolean);
}
return Object.values(run.steps)
.filter((step) => step.status === "failed" || step.status === "skipped")
.map((step) => step.id);
}
function descendantsOf(run: MeshRunRecord, roots: Set<string>): Set<string> {
const descendants = new Set<string>();
const queue = [...roots];
while (queue.length > 0) {
const current = queue.shift();
if (!current) {
continue;
}
for (const step of Object.values(run.steps)) {
if (!step.dependsOn.includes(current) || descendants.has(step.id)) {
continue;
}
descendants.add(step.id);
queue.push(step.id);
}
}
return descendants;
}
function resetStepsForRetry(run: MeshRunRecord, stepIds: string[]) {
const rootSet = new Set(stepIds);
const descendants = descendantsOf(run, rootSet);
const resetIds = new Set([...rootSet, ...descendants]);
for (const stepId of resetIds) {
const step = run.steps[stepId];
if (!step) {
continue;
}
if (step.status === "succeeded" && !rootSet.has(stepId)) {
continue;
}
step.status = "pending";
step.startedAt = undefined;
step.endedAt = undefined;
step.error = undefined;
if (rootSet.has(stepId)) {
step.agentRunId = undefined;
}
}
}
function summarizeRun(run: MeshRunRecord) {
return {
runId: run.runId,
plan: run.plan,
status: run.status,
startedAt: run.startedAt,
endedAt: run.endedAt,
stats: {
total: Object.keys(run.steps).length,
succeeded: Object.values(run.steps).filter((step) => step.status === "succeeded").length,
failed: Object.values(run.steps).filter((step) => step.status === "failed").length,
skipped: Object.values(run.steps).filter((step) => step.status === "skipped").length,
running: Object.values(run.steps).filter((step) => step.status === "running").length,
pending: Object.values(run.steps).filter((step) => step.status === "pending").length,
},
steps: run.stepOrder.map((stepId) => run.steps[stepId]),
history: run.history,
};
}
export const meshHandlers: GatewayRequestHandlers = {
"mesh.plan": ({ params, respond }) => {
if (!validateMeshPlanParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.plan params: ${formatValidationErrors(validateMeshPlanParams.errors)}`,
),
);
return;
}
const p = params;
const plan = normalizePlan(
createPlanFromParams({
goal: p.goal,
steps: p.steps,
}),
);
const graph = validatePlanGraph(plan);
if (!graph.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
return;
}
respond(
true,
{
plan,
order: graph.order,
},
undefined,
);
},
"mesh.run": async (opts) => {
const { params, respond } = opts;
if (!validateMeshRunParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.run params: ${formatValidationErrors(validateMeshRunParams.errors)}`,
),
);
return;
}
const p = params as MeshRunParams;
const plan = normalizePlan(p.plan);
const graph = validatePlanGraph(plan);
if (!graph.ok) {
respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, graph.error));
return;
}
const maxParallel =
typeof p.maxParallel === "number" && Number.isFinite(p.maxParallel)
? Math.min(16, Math.max(1, Math.floor(p.maxParallel)))
: 2;
const defaultStepTimeoutMs =
typeof p.defaultStepTimeoutMs === "number" && Number.isFinite(p.defaultStepTimeoutMs)
? Math.max(1_000, Math.floor(p.defaultStepTimeoutMs))
: 120_000;
const runId = `mesh-run-${randomUUID()}`;
const record = createRunRecord({
runId,
plan,
order: graph.order,
continueOnError: p.continueOnError === true,
maxParallel,
defaultStepTimeoutMs,
lane: typeof p.lane === "string" ? p.lane : undefined,
});
meshRuns.set(runId, record);
trimMap();
await runWorkflow(record, opts);
respond(true, summarizeRun(record), undefined);
},
"mesh.status": ({ params, respond }) => {
if (!validateMeshStatusParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.status params: ${formatValidationErrors(validateMeshStatusParams.errors)}`,
),
);
return;
}
const run = meshRuns.get(params.runId.trim());
if (!run) {
respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
return;
}
respond(true, summarizeRun(run), undefined);
},
"mesh.retry": async (opts) => {
const { params, respond } = opts;
if (!validateMeshRetryParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid mesh.retry params: ${formatValidationErrors(validateMeshRetryParams.errors)}`,
),
);
return;
}
const runId = params.runId.trim();
const run = meshRuns.get(runId);
if (!run) {
respond(false, undefined, errorShape(ErrorCodes.NOT_FOUND, "mesh run not found"));
return;
}
if (run.status === "running") {
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "mesh run is currently running"));
return;
}
const stepIds = resolveStepIdsForRetry(run, params.stepIds);
if (stepIds.length === 0) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "no failed or skipped steps available to retry"),
);
return;
}
for (const stepId of stepIds) {
if (!run.steps[stepId]) {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, `unknown retry step id: ${stepId}`),
);
return;
}
}
resetStepsForRetry(run, stepIds);
run.status = "pending";
run.endedAt = undefined;
run.history.push({
ts: Date.now(),
type: "run.retry",
data: { stepIds },
});
await runWorkflow(run, opts);
respond(true, summarizeRun(run), undefined);
},
};
export function __resetMeshRunsForTest() {
meshRuns.clear();
}