mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 12:07:27 +00:00
refactor(restart): extract stale pid cleanup and supervisor markers
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { captureFullEnv } from "../test-utils/env.js";
|
||||
import { SUPERVISOR_HINT_ENV_VARS } from "./supervisor-markers.js";
|
||||
|
||||
const spawnMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
@@ -21,14 +22,9 @@ afterEach(() => {
|
||||
});
|
||||
|
||||
/**
 * Delete every variable named in `SUPERVISOR_HINT_ENV_VARS` from `process.env`.
 *
 * The shared list already contains the launchd, systemd and OpenClaw marker
 * names, so no per-variable `delete` statements are needed here.
 */
function clearSupervisorHints() {
  for (const key of SUPERVISOR_HINT_ENV_VARS) {
    delete process.env[key];
  }
}
|
||||
|
||||
describe("restartGatewayProcessWithFreshPid", () => {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { hasSupervisorHint } from "./supervisor-markers.js";
|
||||
|
||||
type RespawnMode = "spawned" | "supervised" | "disabled" | "failed";
|
||||
|
||||
@@ -8,24 +9,6 @@ export type GatewayRespawnResult = {
|
||||
detail?: string;
|
||||
};
|
||||
|
||||
const SUPERVISOR_HINT_ENV_VARS = [
|
||||
// macOS launchd — native env vars (may be set by launchd itself)
|
||||
"LAUNCH_JOB_LABEL",
|
||||
"LAUNCH_JOB_NAME",
|
||||
// macOS launchd — OpenClaw's own plist generator sets these via
|
||||
// buildServiceEnvironment() in service-env.ts. launchd does NOT
|
||||
// automatically inject LAUNCH_JOB_LABEL into the child environment,
|
||||
// so OPENCLAW_LAUNCHD_LABEL is the reliable supervised-mode signal.
|
||||
"OPENCLAW_LAUNCHD_LABEL",
|
||||
// Linux systemd
|
||||
"INVOCATION_ID",
|
||||
"SYSTEMD_EXEC_PID",
|
||||
"JOURNAL_STREAM",
|
||||
"OPENCLAW_SYSTEMD_UNIT",
|
||||
// Generic service marker (set by both launchd and systemd plist/unit generators)
|
||||
"OPENCLAW_SERVICE_MARKER",
|
||||
];
|
||||
|
||||
function isTruthy(value: string | undefined): boolean {
|
||||
if (!value) {
|
||||
return false;
|
||||
@@ -35,10 +18,7 @@ function isTruthy(value: string | undefined): boolean {
|
||||
}
|
||||
|
||||
/**
 * Report whether `env` carries at least one supervisor hint variable.
 *
 * The check itself lives in `hasSupervisorHint`; this wrapper only supplies the
 * `process.env` default.
 *
 * @param env - Environment to inspect; defaults to `process.env`.
 * @returns The result of `hasSupervisorHint(env)`.
 */
function isLikelySupervisedProcess(env: NodeJS.ProcessEnv = process.env): boolean {
  return hasSupervisorHint(env);
}
|
||||
|
||||
/**
|
||||
|
||||
127
src/infra/restart-stale-pids.ts
Normal file
127
src/infra/restart-stale-pids.ts
Normal file
@@ -0,0 +1,127 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { resolveGatewayPort } from "../config/paths.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveLsofCommandSync } from "./ports-lsof.js";
|
||||
|
||||
const SPAWN_TIMEOUT_MS = 2000;
|
||||
const STALE_SIGTERM_WAIT_MS = 300;
|
||||
const STALE_SIGKILL_WAIT_MS = 200;
|
||||
|
||||
const restartLog = createSubsystemLogger("restart");
|
||||
/** Test hook: when non-null, `sleepSync` calls this instead of blocking. */
let sleepSyncOverride: ((ms: number) => void) | null = null;

/**
 * Block the calling thread for about `ms` milliseconds.
 *
 * The duration is floored to a whole number of milliseconds. Durations that are
 * zero, negative, `NaN` or infinite return immediately without sleeping.
 *
 * @param ms - Requested delay in milliseconds.
 */
function sleepSync(ms: number): void {
  const timeoutMs = Math.floor(ms);
  // Atomics.wait treats a NaN or +Infinity timeout as "wait forever", so a
  // non-finite duration must never reach it.
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    return;
  }
  if (sleepSyncOverride) {
    sleepSyncOverride(timeoutMs);
    return;
  }
  try {
    // Nothing ever notifies this private buffer, so the wait ends on timeout.
    const lock = new Int32Array(new SharedArrayBuffer(4));
    Atomics.wait(lock, 0, 0, timeoutMs);
  } catch {
    // Best-effort fallback when Atomics.wait is unavailable: spin until the
    // deadline passes.
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      // busy-wait
    }
  }
}
|
||||
|
||||
/**
 * Find PIDs of gateway processes listening on the given TCP port using a
 * synchronous `lsof` call.
 *
 * Only processes whose lsof command name contains "openclaw" (case-insensitive)
 * are returned, and the current process is always excluded.
 *
 * @param port - TCP port to inspect.
 * @returns Matching PIDs in lsof output order, without duplicates. Empty on
 *   Windows, for a port outside 1-65535, or when lsof errors or exits non-zero.
 */
export function findGatewayPidsOnPortSync(port: number): number[] {
  if (process.platform === "win32") {
    return [];
  }
  // Nothing can be listening on an invalid port; skip the lsof spawn entirely.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    return [];
  }
  const lsof = resolveLsofCommandSync();
  const res = spawnSync(lsof, ["-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-Fpc"], {
    encoding: "utf8",
    timeout: SPAWN_TIMEOUT_MS,
  });
  if (res.error || res.status !== 0) {
    return [];
  }
  return parseOpenclawPidsFromLsof(res.stdout).filter((pid) => pid !== process.pid);
}

/**
 * Parse `lsof -Fpc` field output into the PIDs whose command name contains
 * "openclaw".
 *
 * A `p<pid>` line starts a new process record and a `c<command>` line names the
 * command of the current record; all other lines are ignored.
 */
function parseOpenclawPidsFromLsof(stdout: string): number[] {
  const pids = new Set<number>();
  let currentPid: number | undefined;
  let currentCmd: string | undefined;

  // Commit the record collected so far if it names an openclaw process.
  const flush = (): void => {
    if (currentPid !== undefined && currentCmd?.toLowerCase().includes("openclaw")) {
      pids.add(currentPid);
    }
  };

  for (const line of stdout.split(/\r?\n/)) {
    if (line.startsWith("p")) {
      flush();
      const parsed = Number.parseInt(line.slice(1), 10);
      currentPid = Number.isFinite(parsed) && parsed > 0 ? parsed : undefined;
      currentCmd = undefined;
    } else if (line.startsWith("c")) {
      currentCmd = line.slice(1);
    }
  }
  // The final record has no following "p" line to trigger its flush.
  flush();
  return [...pids];
}
|
||||
|
||||
/**
 * Synchronously terminate stale gateway processes.
 *
 * Sends SIGTERM to each PID, waits `STALE_SIGTERM_WAIT_MS`, then sends SIGKILL
 * to any that still exist. The `STALE_SIGKILL_WAIT_MS` wait only happens when
 * at least one SIGKILL was actually sent.
 *
 * @param pids - Candidate PIDs. Duplicates, non-positive or non-integer values
 *   and the current process's own PID are ignored.
 * @returns The PIDs that SIGTERM was successfully sent to.
 */
function terminateStaleProcessesSync(pids: number[]): number[] {
  // kill(2) treats PID 0 and negative PIDs as process-group / broadcast
  // targets, so only plain positive PIDs other than our own are signalled.
  const targets = [...new Set(pids)].filter(
    (pid) => Number.isInteger(pid) && pid > 0 && pid !== process.pid,
  );
  const killed: number[] = [];
  for (const pid of targets) {
    try {
      process.kill(pid, "SIGTERM");
      killed.push(pid);
    } catch {
      // ESRCH — already gone
    }
  }
  if (killed.length === 0) {
    return killed;
  }
  sleepSync(STALE_SIGTERM_WAIT_MS);
  let sentSigkill = false;
  for (const pid of killed) {
    try {
      // Signal 0 only probes for existence; it throws when the process is gone.
      process.kill(pid, 0);
      process.kill(pid, "SIGKILL");
      sentSigkill = true;
    } catch {
      // already gone
    }
  }
  if (sentSigkill) {
    sleepSync(STALE_SIGKILL_WAIT_MS);
  }
  return killed;
}
|
||||
|
||||
/**
 * Look up the gateway port, find stale gateway processes listening on it and
 * terminate them.
 *
 * Best-effort: any error thrown along the way is swallowed and reported as
 * "nothing killed".
 *
 * @returns The PIDs returned by `terminateStaleProcessesSync`, or an empty
 *   array when no stale process was found or an error occurred.
 */
export function cleanStaleGatewayProcessesSync(): number[] {
  try {
    const found = findGatewayPidsOnPortSync(resolveGatewayPort(undefined, process.env));
    if (found.length > 0) {
      restartLog.warn(
        `killing ${found.length} stale gateway process(es) before restart: ${found.join(", ")}`,
      );
      return terminateStaleProcessesSync(found);
    }
    return [];
  } catch {
    return [];
  }
}
|
||||
|
||||
/** Hooks exposed for tests only. */
export const __testing = {
  /** Install a replacement for the blocking sleep, or pass `null` to remove it. */
  setSleepSyncOverride: (fn: ((ms: number) => void) | null): void => {
    sleepSyncOverride = fn;
  },
};
|
||||
@@ -1,19 +1,111 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { findGatewayPidsOnPortSync } from "./restart.js";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const spawnSyncMock = vi.hoisted(() => vi.fn());
|
||||
const resolveLsofCommandSyncMock = vi.hoisted(() => vi.fn());
|
||||
const resolveGatewayPortMock = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
spawnSync: (...args: unknown[]) => spawnSyncMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("./ports-lsof.js", () => ({
|
||||
resolveLsofCommandSync: (...args: unknown[]) => resolveLsofCommandSyncMock(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../config/paths.js", () => ({
|
||||
resolveGatewayPort: (...args: unknown[]) => resolveGatewayPortMock(...args),
|
||||
}));
|
||||
|
||||
import {
|
||||
__testing,
|
||||
cleanStaleGatewayProcessesSync,
|
||||
findGatewayPidsOnPortSync,
|
||||
} from "./restart-stale-pids.js";
|
||||
|
||||
beforeEach(() => {
|
||||
spawnSyncMock.mockReset();
|
||||
resolveLsofCommandSyncMock.mockReset();
|
||||
resolveGatewayPortMock.mockReset();
|
||||
|
||||
resolveLsofCommandSyncMock.mockReturnValue("/usr/sbin/lsof");
|
||||
resolveGatewayPortMock.mockReturnValue(18789);
|
||||
__testing.setSleepSyncOverride(() => {});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
__testing.setSleepSyncOverride(null);
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("findGatewayPidsOnPortSync", () => {
|
||||
it("returns an empty array for a port with no listeners", () => {
|
||||
const pids = findGatewayPidsOnPortSync(19999);
|
||||
expect(pids).toEqual([]);
|
||||
});
|
||||
it("parses lsof output and filters non-openclaw/current processes", () => {
|
||||
spawnSyncMock.mockReturnValue({
|
||||
error: undefined,
|
||||
status: 0,
|
||||
stdout: [
|
||||
`p${process.pid}`,
|
||||
"copenclaw",
|
||||
"p4100",
|
||||
"copenclaw-gateway",
|
||||
"p4200",
|
||||
"cnode",
|
||||
"p4300",
|
||||
"cOpenClaw",
|
||||
].join("\n"),
|
||||
});
|
||||
|
||||
it("never includes the current process PID", () => {
|
||||
const pids = findGatewayPidsOnPortSync(18789);
|
||||
expect(pids).not.toContain(process.pid);
|
||||
|
||||
expect(pids).toEqual([4100, 4300]);
|
||||
expect(spawnSyncMock).toHaveBeenCalledWith(
|
||||
"/usr/sbin/lsof",
|
||||
["-nP", "-iTCP:18789", "-sTCP:LISTEN", "-Fpc"],
|
||||
expect.objectContaining({ encoding: "utf8", timeout: 2000 }),
|
||||
);
|
||||
});
|
||||
|
||||
it("returns an array (not undefined or null) on any port", () => {
|
||||
const pids = findGatewayPidsOnPortSync(0);
|
||||
expect(Array.isArray(pids)).toBe(true);
|
||||
it("returns empty when lsof fails", () => {
|
||||
spawnSyncMock.mockReturnValue({
|
||||
error: undefined,
|
||||
status: 1,
|
||||
stdout: "",
|
||||
stderr: "lsof failed",
|
||||
});
|
||||
|
||||
expect(findGatewayPidsOnPortSync(18789)).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("cleanStaleGatewayProcessesSync", () => {
|
||||
it("kills stale gateway pids discovered on the gateway port", () => {
|
||||
spawnSyncMock.mockReturnValue({
|
||||
error: undefined,
|
||||
status: 0,
|
||||
stdout: ["p6001", "copenclaw", "p6002", "copenclaw-gateway"].join("\n"),
|
||||
});
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
|
||||
const killed = cleanStaleGatewayProcessesSync();
|
||||
|
||||
expect(killed).toEqual([6001, 6002]);
|
||||
expect(resolveGatewayPortMock).toHaveBeenCalledWith(undefined, process.env);
|
||||
expect(killSpy).toHaveBeenCalledWith(6001, "SIGTERM");
|
||||
expect(killSpy).toHaveBeenCalledWith(6002, "SIGTERM");
|
||||
expect(killSpy).toHaveBeenCalledWith(6001, "SIGKILL");
|
||||
expect(killSpy).toHaveBeenCalledWith(6002, "SIGKILL");
|
||||
});
|
||||
|
||||
it("returns empty when no stale listeners are found", () => {
|
||||
spawnSyncMock.mockReturnValue({
|
||||
error: undefined,
|
||||
status: 0,
|
||||
stdout: "",
|
||||
});
|
||||
const killSpy = vi.spyOn(process, "kill").mockImplementation(() => true);
|
||||
|
||||
const killed = cleanStaleGatewayProcessesSync();
|
||||
|
||||
expect(killed).toEqual([]);
|
||||
expect(killSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { resolveGatewayPort } from "../config/paths.js";
|
||||
import {
|
||||
resolveGatewayLaunchAgentLabel,
|
||||
resolveGatewaySystemdServiceName,
|
||||
} from "../daemon/constants.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { resolveLsofCommandSync } from "./ports-lsof.js";
|
||||
import { cleanStaleGatewayProcessesSync, findGatewayPidsOnPortSync } from "./restart-stale-pids.js";
|
||||
|
||||
export type RestartAttempt = {
|
||||
ok: boolean;
|
||||
@@ -22,6 +21,8 @@ const RESTART_COOLDOWN_MS = 30_000;
|
||||
|
||||
const restartLog = createSubsystemLogger("restart");
|
||||
|
||||
export { findGatewayPidsOnPortSync };
|
||||
|
||||
let sigusr1AuthorizedCount = 0;
|
||||
let sigusr1AuthorizedUntil = 0;
|
||||
let sigusr1ExternalAllowed = false;
|
||||
@@ -285,99 +286,6 @@ function normalizeSystemdUnit(raw?: string, profile?: string): string {
|
||||
return unit.endsWith(".service") ? unit : `${unit}.service`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find PIDs of gateway processes listening on the given port using synchronous lsof.
|
||||
* Returns only PIDs that belong to openclaw gateway processes (not the current process).
|
||||
*/
|
||||
export function findGatewayPidsOnPortSync(port: number): number[] {
|
||||
if (process.platform === "win32") {
|
||||
return [];
|
||||
}
|
||||
const lsof = resolveLsofCommandSync();
|
||||
const res = spawnSync(lsof, ["-nP", `-iTCP:${port}`, "-sTCP:LISTEN", "-Fpc"], {
|
||||
encoding: "utf8",
|
||||
timeout: SPAWN_TIMEOUT_MS,
|
||||
});
|
||||
if (res.error || res.status !== 0) {
|
||||
return [];
|
||||
}
|
||||
const pids: number[] = [];
|
||||
let currentPid: number | undefined;
|
||||
let currentCmd: string | undefined;
|
||||
for (const line of res.stdout.split(/\r?\n/).filter(Boolean)) {
|
||||
if (line.startsWith("p")) {
|
||||
if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) {
|
||||
pids.push(currentPid);
|
||||
}
|
||||
const parsed = Number.parseInt(line.slice(1), 10);
|
||||
currentPid = Number.isFinite(parsed) && parsed > 0 ? parsed : undefined;
|
||||
currentCmd = undefined;
|
||||
} else if (line.startsWith("c")) {
|
||||
currentCmd = line.slice(1);
|
||||
}
|
||||
}
|
||||
if (currentPid != null && currentCmd && currentCmd.toLowerCase().includes("openclaw")) {
|
||||
pids.push(currentPid);
|
||||
}
|
||||
return pids.filter((pid) => pid !== process.pid);
|
||||
}
|
||||
|
||||
const STALE_SIGTERM_WAIT_MS = 300;
|
||||
const STALE_SIGKILL_WAIT_MS = 200;
|
||||
|
||||
/**
|
||||
* Synchronously terminate stale gateway processes.
|
||||
* Sends SIGTERM, waits briefly, then SIGKILL for survivors.
|
||||
*/
|
||||
function terminateStaleProcessesSync(pids: number[]): number[] {
|
||||
if (pids.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const killed: number[] = [];
|
||||
for (const pid of pids) {
|
||||
try {
|
||||
process.kill(pid, "SIGTERM");
|
||||
killed.push(pid);
|
||||
} catch {
|
||||
// ESRCH — already gone
|
||||
}
|
||||
}
|
||||
if (killed.length === 0) {
|
||||
return killed;
|
||||
}
|
||||
spawnSync("sleep", [String(STALE_SIGTERM_WAIT_MS / 1000)], { timeout: 2000 });
|
||||
for (const pid of killed) {
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
process.kill(pid, "SIGKILL");
|
||||
} catch {
|
||||
// already gone
|
||||
}
|
||||
}
|
||||
spawnSync("sleep", [String(STALE_SIGKILL_WAIT_MS / 1000)], { timeout: 2000 });
|
||||
return killed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inspect the gateway port and kill any stale gateway processes holding it.
|
||||
* Called before service restart commands to prevent port conflicts.
|
||||
*/
|
||||
function cleanStaleGatewayProcessesSync(): number[] {
|
||||
try {
|
||||
const port = resolveGatewayPort(undefined, process.env);
|
||||
const stalePids = findGatewayPidsOnPortSync(port);
|
||||
if (stalePids.length === 0) {
|
||||
return [];
|
||||
}
|
||||
restartLog.warn(
|
||||
`killing ${stalePids.length} stale gateway process(es) before restart: ${stalePids.join(", ")}`,
|
||||
);
|
||||
return terminateStaleProcessesSync(stalePids);
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
export function triggerOpenClawRestart(): RestartAttempt {
|
||||
if (process.env.VITEST || process.env.NODE_ENV === "test") {
|
||||
return { ok: true, method: "supervisor", detail: "test mode" };
|
||||
|
||||
20
src/infra/supervisor-markers.ts
Normal file
20
src/infra/supervisor-markers.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
/**
 * Names of environment variables whose presence is taken as a hint that the
 * process is running under a service supervisor.
 */
export const SUPERVISOR_HINT_ENV_VARS = [
  // macOS launchd
  "LAUNCH_JOB_LABEL",
  "LAUNCH_JOB_NAME",
  // OpenClaw service env markers
  "OPENCLAW_LAUNCHD_LABEL",
  "OPENCLAW_SYSTEMD_UNIT",
  "OPENCLAW_SERVICE_MARKER",
  // Linux systemd
  "INVOCATION_ID",
  "SYSTEMD_EXEC_PID",
  "JOURNAL_STREAM",
] as const;

/**
 * Check whether any supervisor hint variable is set to a non-blank value.
 *
 * @param env - Environment to inspect; defaults to `process.env`.
 * @returns `true` when at least one listed variable is a string containing
 *   something other than whitespace, otherwise `false`.
 */
export function hasSupervisorHint(env: NodeJS.ProcessEnv = process.env): boolean {
  for (const name of SUPERVISOR_HINT_ENV_VARS) {
    const raw = env[name];
    if (typeof raw === "string" && raw.trim() !== "") {
      return true;
    }
  }
  return false;
}
|
||||
Reference in New Issue
Block a user