Files
openclaw/src/process/command-queue.ts
Tarun Sukhani 0a55711110 fix: guard against undefined path in bootstrap file entries
The session-context hook pushed bootstrap entries without the required
`path` property, causing a TypeError in buildInjectedWorkspaceFiles when
it called .replace() on undefined. Add fallback to file.name when path
is missing, and skip entries with no path in the report builder.

Also add stack trace logging to lane task errors and embedded agent
failures to make future debugging faster.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 17:56:39 +08:00

288 lines
8.4 KiB
TypeScript

import { diagnosticLogger as diag, logLaneDequeue, logLaneEnqueue } from "../logging/diagnostic.js";
import { CommandLane } from "./lanes.js";
/**
 * Rejection reason used for queued commands that are discarded when their
 * lane is cleared before they get a chance to run.
 *
 * Fire-and-forget callers can match on this type (or its `name`) to tell a
 * deliberate clear apart from a genuine task failure.
 */
export class CommandLaneClearedError extends Error {
  /**
   * @param lane Optional lane name to include in the message; an omitted or
   *   empty name yields the generic message.
   */
  constructor(lane?: string) {
    let message = "Command lane cleared";
    if (lane) {
      message = `Command lane "${lane}" cleared`;
    }
    super(message);
    this.name = "CommandLaneClearedError";
  }
}
// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for
// the main auto-reply workflow.
// One pending unit of work plus the settle callbacks of the promise that was
// returned to the caller when the work was enqueued.
type QueueEntry = {
// The work to run once the lane has a free slot.
task: () => Promise<unknown>;
// Settles the caller's promise with the task's result.
resolve: (value: unknown) => void;
// Rejects the caller's promise (task failure, or the lane being cleared).
reject: (reason?: unknown) => void;
// `Date.now()` timestamp captured when the entry was pushed onto the queue.
enqueuedAt: number;
// Wait threshold in milliseconds; reaching it at dequeue time triggers a warning.
warnAfterMs: number;
// Optional callback invoked at dequeue time when the wait threshold was reached.
onWait?: (waitMs: number, queuedAhead: number) => void;
};
// Mutable runtime bookkeeping for a single named lane.
type LaneState = {
// Lane name; identical to this record's key in the `lanes` map.
lane: string;
// Entries waiting to start, in arrival (FIFO) order.
queue: QueueEntry[];
// Ids of tasks that have been started and not yet marked complete.
activeTaskIds: Set<number>;
// Upper bound on simultaneously active tasks; drainLane() stops starting work at this size.
maxConcurrent: number;
// True while drainLane() is in its dispatch pass; prevents a nested pass from starting.
draining: boolean;
// Incremented by resetAllLanes(); completions from tasks started under an older value are ignored.
generation: number;
};
// Registry of lane state keyed by lane name; records are created on demand by getLaneState().
const lanes = new Map<string, LaneState>();
// Counter supplying the id assigned to each task when it starts; incremented on every start.
let nextTaskId = 1;
/**
 * Fetch the state record for `lane`, registering a fresh idle record on
 * first use (empty queue, no active tasks, concurrency 1, generation 0).
 *
 * @param lane Lane name, used verbatim as the registry key.
 * @returns The existing or newly registered state record.
 */
function getLaneState(lane: string): LaneState {
  let state = lanes.get(lane);
  if (!state) {
    state = {
      lane,
      queue: [],
      activeTaskIds: new Set<number>(),
      maxConcurrent: 1,
      draining: false,
      generation: 0,
    };
    lanes.set(lane, state);
  }
  return state;
}
/**
 * Mark a task as finished, unless the lane has been reset since it started.
 *
 * @param state Lane the task ran in.
 * @param taskId Id assigned to the task when it started.
 * @param taskGeneration Lane generation captured when the task started.
 * @returns `true` when the task belonged to the lane's current generation
 *   (its id has been removed from the active set); `false` when the
 *   completion is stale and the lane state was left untouched.
 */
function completeTask(state: LaneState, taskId: number, taskGeneration: number): boolean {
  const belongsToCurrentGeneration = taskGeneration === state.generation;
  if (belongsToCurrentGeneration) {
    state.activeTaskIds.delete(taskId);
  }
  return belongsToCurrentGeneration;
}
/**
 * Start as many queued tasks in `lane` as its concurrency limit allows.
 *
 * Each started task runs in a detached async wrapper. When a task settles,
 * the wrapper marks it complete, re-runs the dispatch loop so the freed slot
 * is reused, and only then settles the promise handed out at enqueue time.
 * Completions from a previous lane generation neither touch the active set
 * nor trigger another dispatch pass.
 *
 * The `draining` flag guards against a nested `drainLane()` call while the
 * dispatch loop is running. It is released in a `finally` block so that an
 * exception inside the loop cannot leave the lane permanently marked as
 * draining (which would make every later call return early).
 *
 * @param lane Name of the lane to drain; its state is created if missing.
 */
function drainLane(lane: string) {
  const state = getLaneState(lane);
  if (state.draining) {
    return;
  }
  state.draining = true;
  const pump = () => {
    try {
      while (state.activeTaskIds.size < state.maxConcurrent && state.queue.length > 0) {
        const entry = state.queue.shift();
        if (!entry) {
          break;
        }
        const waitedMs = Date.now() - entry.enqueuedAt;
        if (waitedMs >= entry.warnAfterMs) {
          // The callback is caller-supplied. Isolate it so a throwing hook
          // cannot drop this already-dequeued entry or surface inside the
          // completion path of whichever task triggered this pump.
          try {
            entry.onWait?.(waitedMs, state.queue.length);
          } catch (hookErr) {
            diag.warn(`lane onWait callback failed: lane=${lane} error="${String(hookErr)}"`);
          }
          diag.warn(
            `lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
          );
        }
        logLaneDequeue(lane, waitedMs, state.queue.length);
        const taskId = nextTaskId++;
        // Remember the generation at start so a completion arriving after a
        // lane reset can be recognised as stale.
        const taskGeneration = state.generation;
        state.activeTaskIds.add(taskId);
        void (async () => {
          const startTime = Date.now();
          try {
            const result = await entry.task();
            const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
            if (completedCurrentGeneration) {
              diag.debug(
                `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
              );
              pump();
            }
            entry.resolve(result);
          } catch (err) {
            const completedCurrentGeneration = completeTask(state, taskId, taskGeneration);
            // Failures on probe lanes are not logged.
            const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
            if (!isProbeLane) {
              const stack = err instanceof Error ? err.stack : undefined;
              diag.error(
                `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"${stack ? `\n${stack}` : ""}`,
              );
            }
            if (completedCurrentGeneration) {
              pump();
            }
            entry.reject(err);
          }
        })();
      }
    } finally {
      // Always release the guard, even if the loop body threw.
      state.draining = false;
    }
  };
  pump();
}
/**
 * Set how many tasks may run at once in a lane, then start any queued work
 * the new limit allows.
 *
 * @param lane Lane name; blank or whitespace-only names map to the main lane.
 * @param maxConcurrent Requested limit. Fractions are floored and values
 *   below 1 are raised to 1. `NaN` is treated as 1. `Infinity` is accepted
 *   and leaves the lane without an upper bound.
 */
export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) {
  const cleaned = lane.trim() || CommandLane.Main;
  const state = getLaneState(cleaned);
  const requested = Math.floor(maxConcurrent);
  // Math.max(1, NaN) is NaN, and `activeTaskIds.size < NaN` is always false,
  // so a NaN limit would stop the lane from ever starting another task.
  state.maxConcurrent = Number.isNaN(requested) ? 1 : Math.max(1, requested);
  drainLane(cleaned);
}
/**
 * Queue `task` for execution in the given lane.
 *
 * @param lane Lane name; blank or whitespace-only names map to the main lane.
 * @param task Work to run once the lane has a free slot.
 * @param opts.warnAfterMs Wait threshold in milliseconds (default 2000). When
 *   the task has waited at least this long by the time it is dequeued, a
 *   warning is logged and `opts.onWait` is invoked.
 * @param opts.onWait Receives the time waited and the number of entries still
 *   queued when the threshold was reached.
 * @returns A promise that settles with the task's outcome, or rejects if the
 *   entry is removed from the queue before running.
 */
export function enqueueCommandInLane<T>(
  lane: string,
  task: () => Promise<T>,
  opts?: {
    warnAfterMs?: number;
    onWait?: (waitMs: number, queuedAhead: number) => void;
  },
): Promise<T> {
  const laneName = lane.trim() || CommandLane.Main;
  const warnThresholdMs = opts?.warnAfterMs ?? 2_000;
  const laneState = getLaneState(laneName);
  return new Promise<T>((resolve, reject) => {
    const entry: QueueEntry = {
      task: () => task(),
      resolve: (value) => resolve(value as T),
      reject,
      enqueuedAt: Date.now(),
      warnAfterMs: warnThresholdMs,
      onWait: opts?.onWait,
    };
    laneState.queue.push(entry);
    // Reported depth counts both waiting and currently running work.
    const depth = laneState.queue.length + laneState.activeTaskIds.size;
    logLaneEnqueue(laneName, depth);
    drainLane(laneName);
  });
}
/**
 * Queue `task` on the main lane.
 *
 * Shorthand for `enqueueCommandInLane(CommandLane.Main, task, opts)`; the
 * options and the returned promise behave exactly as they do there.
 */
export function enqueueCommand<T>(
  task: () => Promise<T>,
  opts?: {
    warnAfterMs?: number;
    onWait?: (waitMs: number, queuedAhead: number) => void;
  },
): Promise<T> {
  const mainLane = CommandLane.Main;
  return enqueueCommandInLane<T>(mainLane, task, opts);
}
/**
 * Count the work currently held by one lane: queued entries plus running tasks.
 *
 * @param lane Lane name (defaults to the main lane); blank names also map to
 *   the main lane.
 * @returns The combined count, or 0 when the lane has never been used.
 */
export function getQueueSize(lane: string = CommandLane.Main) {
  const laneName = lane.trim() || CommandLane.Main;
  const state = lanes.get(laneName);
  return state ? state.queue.length + state.activeTaskIds.size : 0;
}
/**
 * Count the work held across every lane: queued entries plus running tasks.
 *
 * @returns The combined count, or 0 when no lane exists yet.
 */
export function getTotalQueueSize() {
  let combined = 0;
  lanes.forEach((state) => {
    combined += state.queue.length + state.activeTaskIds.size;
  });
  return combined;
}
/**
 * Discard every entry still waiting in a lane, rejecting each one's promise
 * with a `CommandLaneClearedError`. Tasks that are already running are not
 * affected.
 *
 * @param lane Lane name (defaults to the main lane); blank names also map to
 *   the main lane.
 * @returns How many queued entries were discarded (0 for an unknown lane).
 */
export function clearCommandLane(lane: string = CommandLane.Main) {
  const laneName = lane.trim() || CommandLane.Main;
  const state = lanes.get(laneName);
  if (!state) {
    return 0;
  }
  // Detach the whole backlog in one step, then reject each detached entry.
  const discarded = state.queue.splice(0);
  discarded.forEach((entry) => {
    entry.reject(new CommandLaneClearedError(laneName));
  });
  return discarded.length;
}
/**
 * Return every lane to an idle runtime state.
 *
 * Intended for use after a SIGUSR1 in-process restart: interrupted tasks may
 * never reach their cleanup code, which would leave stale active task ids
 * behind and stop the affected lanes from ever draining again.
 *
 * For each lane this bumps the generation (so late completions from old
 * in-flight tasks are ignored), empties the active-task set and releases the
 * draining guard. Queued entries are deliberately kept, since they are
 * pending user work that should still run after the restart.
 *
 * Once all lanes have been reset, the lanes that still hold queued entries
 * are drained right away instead of waiting for a later
 * `enqueueCommandInLane()` call that may never arrive.
 */
export function resetAllLanes(): void {
  const backloggedLanes: string[] = [];
  lanes.forEach((state) => {
    state.generation += 1;
    state.activeTaskIds.clear();
    state.draining = false;
    if (state.queue.length > 0) {
      backloggedLanes.push(state.lane);
    }
  });
  // Second pass on purpose: draining starts only once every lane is clean.
  backloggedLanes.forEach((laneName) => {
    drainLane(laneName);
  });
}
/**
 * Count the tasks that are executing right now, summed over all lanes.
 * Entries that are queued but have not started are not included.
 */
export function getActiveTaskCount(): number {
  let running = 0;
  lanes.forEach((state) => {
    running += state.activeTaskIds.size;
  });
  return running;
}
/**
 * Wait for the tasks that are active at call time, across all lanes, to
 * finish. Polls at a short interval and resolves when none of those tasks is
 * still active or when `timeoutMs` has elapsed, whichever comes first.
 *
 * Tasks started after this call are ignored; only the ids captured up front
 * are tracked.
 *
 * The delay before each poll is capped by the time left until the deadline,
 * so a timeout is reported at the deadline rather than up to one full poll
 * interval later.
 *
 * @param timeoutMs Maximum time to wait in milliseconds. Zero, negative or
 *   `NaN` values time out on the first check that finds a tracked task
 *   still running.
 * @returns `{ drained: true }` when every tracked task finished (or none was
 *   active to begin with), `{ drained: false }` when the timeout elapsed first.
 */
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> {
  // Keep shutdown/drain checks responsive without busy looping.
  const POLL_INTERVAL_MS = 50;
  const deadline = Date.now() + timeoutMs;
  const activeAtStart = new Set<number>();
  for (const state of lanes.values()) {
    for (const taskId of state.activeTaskIds) {
      activeAtStart.add(taskId);
    }
  }
  // True while at least one task captured at call time is still active.
  const anyTrackedTaskActive = (): boolean => {
    for (const state of lanes.values()) {
      for (const taskId of state.activeTaskIds) {
        if (activeAtStart.has(taskId)) {
          return true;
        }
      }
    }
    return false;
  };
  return new Promise((resolve) => {
    const check = () => {
      if (activeAtStart.size === 0 || !anyTrackedTaskActive()) {
        resolve({ drained: true });
        return;
      }
      const remainingMs = deadline - Date.now();
      // `!(x > 0)` is also true for NaN, so an invalid timeout expires
      // immediately instead of polling forever.
      if (!(remainingMs > 0)) {
        resolve({ drained: false });
        return;
      }
      // Never sleep past the deadline.
      setTimeout(check, Math.min(POLL_INTERVAL_MS, remainingMs));
    };
    check();
  });
}