mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 14:41:36 +00:00
fix(gateway): probe port liveness for stale lock recovery
Co-authored-by: Operative-001 <261882263+Operative-001@users.noreply.github.com>
This commit is contained in:
@@ -100,6 +100,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- CLI/Sessions: pass the configured sessions directory when resolving transcript paths in `agentCommand`, so custom `session.store` locations resume sessions reliably. Thanks @davidrudduck.
|
- CLI/Sessions: pass the configured sessions directory when resolving transcript paths in `agentCommand`, so custom `session.store` locations resume sessions reliably. Thanks @davidrudduck.
|
||||||
- Gateway/Chat UI: strip inline reply/audio directive tags from non-streaming final webchat broadcasts (including `chat.inject`) while preserving empty-string message content when tags are the entire reply. (#23298) Thanks @SidQin-cyber.
|
- Gateway/Chat UI: strip inline reply/audio directive tags from non-streaming final webchat broadcasts (including `chat.inject`) while preserving empty-string message content when tags are the entire reply. (#23298) Thanks @SidQin-cyber.
|
||||||
- Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli.
|
- Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli.
|
||||||
|
- Gateway/Lock: use optional gateway-port reachability as a primary stale-lock liveness signal (and wire gateway run-loop lock acquisition to the resolved port), reducing false "already running" lockouts after unclean exits. (#23760) Thanks @Operative-001.
|
||||||
- Signal/Monitor: treat user-initiated abort shutdowns as clean exits when auto-started `signal-cli` is terminated, while still surfacing unexpected daemon exits as startup/runtime failures. (#23379) Thanks @frankekn.
|
- Signal/Monitor: treat user-initiated abort shutdowns as clean exits when auto-started `signal-cli` is terminated, while still surfacing unexpected daemon exits as startup/runtime failures. (#23379) Thanks @frankekn.
|
||||||
- Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths. (#23377) Thanks @SidQin-cyber.
|
- Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths. (#23377) Thanks @SidQin-cyber.
|
||||||
- Channels/Delivery: remove hardcoded WhatsApp delivery fallbacks; require explicit/session channel context or auto-pick the sole configured channel when unambiguous. (#23357) Thanks @lbo728.
|
- Channels/Delivery: remove hardcoded WhatsApp delivery fallbacks; require explicit/session channel context or auto-pick the sole configured channel when unambiguous. (#23357) Thanks @lbo728.
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import { describe, expect, it, vi } from "vitest";
|
|||||||
import type { GatewayBonjourBeacon } from "../../infra/bonjour-discovery.js";
|
import type { GatewayBonjourBeacon } from "../../infra/bonjour-discovery.js";
|
||||||
import { pickBeaconHost, pickGatewayPort } from "./discover.js";
|
import { pickBeaconHost, pickGatewayPort } from "./discover.js";
|
||||||
|
|
||||||
const acquireGatewayLock = vi.fn(async () => ({
|
const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({
|
||||||
release: vi.fn(async () => {}),
|
release: vi.fn(async () => {}),
|
||||||
}));
|
}));
|
||||||
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
|
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
|
||||||
@@ -22,7 +22,7 @@ const gatewayLog = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
vi.mock("../../infra/gateway-lock.js", () => ({
|
vi.mock("../../infra/gateway-lock.js", () => ({
|
||||||
acquireGatewayLock: () => acquireGatewayLock(),
|
acquireGatewayLock: (opts?: { port?: number }) => acquireGatewayLock(opts),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("../../infra/restart.js", () => ({
|
vi.mock("../../infra/restart.js", () => ({
|
||||||
@@ -109,12 +109,17 @@ function createSignaledStart(close: GatewayCloseFn) {
|
|||||||
return { start, started };
|
return { start, started };
|
||||||
}
|
}
|
||||||
|
|
||||||
async function runLoopWithStart(params: { start: ReturnType<typeof vi.fn>; runtime: LoopRuntime }) {
|
async function runLoopWithStart(params: {
|
||||||
|
start: ReturnType<typeof vi.fn>;
|
||||||
|
runtime: LoopRuntime;
|
||||||
|
lockPort?: number;
|
||||||
|
}) {
|
||||||
vi.resetModules();
|
vi.resetModules();
|
||||||
const { runGatewayLoop } = await import("./run-loop.js");
|
const { runGatewayLoop } = await import("./run-loop.js");
|
||||||
const loopPromise = runGatewayLoop({
|
const loopPromise = runGatewayLoop({
|
||||||
start: params.start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
|
start: params.start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
|
||||||
runtime: params.runtime,
|
runtime: params.runtime,
|
||||||
|
lockPort: params.lockPort,
|
||||||
});
|
});
|
||||||
return { loopPromise };
|
return { loopPromise };
|
||||||
}
|
}
|
||||||
@@ -276,6 +281,39 @@ describe("runGatewayLoop", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("forwards lockPort to initial and restart lock acquisitions", async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
|
||||||
|
await withIsolatedSignals(async () => {
|
||||||
|
const closeFirst = vi.fn(async () => {});
|
||||||
|
const closeSecond = vi.fn(async () => {});
|
||||||
|
restartGatewayProcessWithFreshPid.mockReturnValueOnce({ mode: "disabled" });
|
||||||
|
|
||||||
|
const start = vi
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValueOnce({ close: closeFirst })
|
||||||
|
.mockResolvedValueOnce({ close: closeSecond })
|
||||||
|
.mockRejectedValueOnce(new Error("stop-loop"));
|
||||||
|
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
|
||||||
|
const { runGatewayLoop } = await import("./run-loop.js");
|
||||||
|
const loopPromise = runGatewayLoop({
|
||||||
|
start: start as unknown as Parameters<typeof runGatewayLoop>[0]["start"],
|
||||||
|
runtime: runtime as unknown as Parameters<typeof runGatewayLoop>[0]["runtime"],
|
||||||
|
lockPort: 18789,
|
||||||
|
});
|
||||||
|
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
process.emit("SIGUSR1");
|
||||||
|
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||||
|
process.emit("SIGUSR1");
|
||||||
|
|
||||||
|
await expect(loopPromise).rejects.toThrow("stop-loop");
|
||||||
|
expect(acquireGatewayLock).toHaveBeenNthCalledWith(1, { port: 18789 });
|
||||||
|
expect(acquireGatewayLock).toHaveBeenNthCalledWith(2, { port: 18789 });
|
||||||
|
expect(acquireGatewayLock).toHaveBeenNthCalledWith(3, { port: 18789 });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it("exits when lock reacquire fails during in-process restart fallback", async () => {
|
it("exits when lock reacquire fails during in-process restart fallback", async () => {
|
||||||
vi.clearAllMocks();
|
vi.clearAllMocks();
|
||||||
|
|
||||||
|
|||||||
@@ -22,8 +22,9 @@ type GatewayRunSignalAction = "stop" | "restart";
|
|||||||
export async function runGatewayLoop(params: {
|
export async function runGatewayLoop(params: {
|
||||||
start: () => Promise<Awaited<ReturnType<typeof startGatewayServer>>>;
|
start: () => Promise<Awaited<ReturnType<typeof startGatewayServer>>>;
|
||||||
runtime: typeof defaultRuntime;
|
runtime: typeof defaultRuntime;
|
||||||
|
lockPort?: number;
|
||||||
}) {
|
}) {
|
||||||
let lock = await acquireGatewayLock();
|
let lock = await acquireGatewayLock({ port: params.lockPort });
|
||||||
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
|
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
|
||||||
let shuttingDown = false;
|
let shuttingDown = false;
|
||||||
let restartResolver: (() => void) | null = null;
|
let restartResolver: (() => void) | null = null;
|
||||||
@@ -47,7 +48,7 @@ export async function runGatewayLoop(params: {
|
|||||||
};
|
};
|
||||||
const reacquireLockForInProcessRestart = async (): Promise<boolean> => {
|
const reacquireLockForInProcessRestart = async (): Promise<boolean> => {
|
||||||
try {
|
try {
|
||||||
lock = await acquireGatewayLock();
|
lock = await acquireGatewayLock({ port: params.lockPort });
|
||||||
return true;
|
return true;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`);
|
gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`);
|
||||||
|
|||||||
@@ -317,6 +317,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
|||||||
try {
|
try {
|
||||||
await runGatewayLoop({
|
await runGatewayLoop({
|
||||||
runtime: defaultRuntime,
|
runtime: defaultRuntime,
|
||||||
|
lockPort: port,
|
||||||
start: async () =>
|
start: async () =>
|
||||||
await startGatewayServer(port, {
|
await startGatewayServer(port, {
|
||||||
bind,
|
bind,
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { createHash } from "node:crypto";
|
import { createHash } from "node:crypto";
|
||||||
import fsSync from "node:fs";
|
import fsSync from "node:fs";
|
||||||
import fs from "node:fs/promises";
|
import fs from "node:fs/promises";
|
||||||
|
import net from "node:net";
|
||||||
import os from "node:os";
|
import os from "node:os";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
@@ -129,6 +130,35 @@ async function acquireStaleLinuxLock(env: NodeJS.ProcessEnv) {
|
|||||||
staleProcSpy.mockRestore();
|
staleProcSpy.mockRestore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function listenOnLoopbackPort() {
|
||||||
|
const server = net.createServer();
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
server.once("error", reject);
|
||||||
|
server.listen(0, "127.0.0.1", () => {
|
||||||
|
server.off("error", reject);
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
const address = server.address();
|
||||||
|
if (!address || typeof address === "string") {
|
||||||
|
throw new Error("failed to resolve loopback test port");
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
port: address.port,
|
||||||
|
close: async () => {
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
server.close((err) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
describe("gateway lock", () => {
|
describe("gateway lock", () => {
|
||||||
beforeAll(async () => {
|
beforeAll(async () => {
|
||||||
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-lock-"));
|
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-lock-"));
|
||||||
@@ -227,6 +257,50 @@ describe("gateway lock", () => {
|
|||||||
statSpy.mockRestore();
|
statSpy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("treats lock as stale when owner pid is alive but configured port is free", async () => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
const env = await makeEnv();
|
||||||
|
await writeLockFile(env, {
|
||||||
|
startTime: 111,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
const listener = await listenOnLoopbackPort();
|
||||||
|
const port = listener.port;
|
||||||
|
await listener.close();
|
||||||
|
|
||||||
|
const lock = await acquireForTest(env, {
|
||||||
|
timeoutMs: 80,
|
||||||
|
pollIntervalMs: 5,
|
||||||
|
staleMs: 10_000,
|
||||||
|
platform: "darwin",
|
||||||
|
port,
|
||||||
|
});
|
||||||
|
expect(lock).not.toBeNull();
|
||||||
|
await lock?.release();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("keeps lock when configured port is busy and owner pid is alive", async () => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
const env = await makeEnv();
|
||||||
|
await writeLockFile(env, {
|
||||||
|
startTime: 111,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
const listener = await listenOnLoopbackPort();
|
||||||
|
try {
|
||||||
|
const pending = acquireForTest(env, {
|
||||||
|
timeoutMs: 20,
|
||||||
|
pollIntervalMs: 2,
|
||||||
|
staleMs: 10_000,
|
||||||
|
platform: "darwin",
|
||||||
|
port: listener.port,
|
||||||
|
});
|
||||||
|
await expect(pending).rejects.toBeInstanceOf(GatewayLockError);
|
||||||
|
} finally {
|
||||||
|
await listener.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
it("returns null when multi-gateway override is enabled", async () => {
|
it("returns null when multi-gateway override is enabled", async () => {
|
||||||
const env = await makeEnv();
|
const env = await makeEnv();
|
||||||
const lock = await acquireGatewayLock({
|
const lock = await acquireGatewayLock({
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { createHash } from "node:crypto";
|
import { createHash } from "node:crypto";
|
||||||
import fsSync from "node:fs";
|
import fsSync from "node:fs";
|
||||||
import fs from "node:fs/promises";
|
import fs from "node:fs/promises";
|
||||||
|
import net from "node:net";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import { resolveConfigPath, resolveGatewayLockDir, resolveStateDir } from "../config/paths.js";
|
import { resolveConfigPath, resolveGatewayLockDir, resolveStateDir } from "../config/paths.js";
|
||||||
import { isPidAlive } from "../shared/pid-alive.js";
|
import { isPidAlive } from "../shared/pid-alive.js";
|
||||||
@@ -8,6 +9,7 @@ import { isPidAlive } from "../shared/pid-alive.js";
|
|||||||
const DEFAULT_TIMEOUT_MS = 5000;
|
const DEFAULT_TIMEOUT_MS = 5000;
|
||||||
const DEFAULT_POLL_INTERVAL_MS = 100;
|
const DEFAULT_POLL_INTERVAL_MS = 100;
|
||||||
const DEFAULT_STALE_MS = 30_000;
|
const DEFAULT_STALE_MS = 30_000;
|
||||||
|
const DEFAULT_PORT_PROBE_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
type LockPayload = {
|
type LockPayload = {
|
||||||
pid: number;
|
pid: number;
|
||||||
@@ -29,6 +31,7 @@ export type GatewayLockOptions = {
|
|||||||
staleMs?: number;
|
staleMs?: number;
|
||||||
allowInTests?: boolean;
|
allowInTests?: boolean;
|
||||||
platform?: NodeJS.Platform;
|
platform?: NodeJS.Platform;
|
||||||
|
port?: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export class GatewayLockError extends Error {
|
export class GatewayLockError extends Error {
|
||||||
@@ -100,11 +103,47 @@ function readLinuxStartTime(pid: number): number | null {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function resolveGatewayOwnerStatus(
|
async function checkPortFree(port: number, host = "127.0.0.1"): Promise<boolean> {
|
||||||
|
return await new Promise<boolean>((resolve) => {
|
||||||
|
const socket = net.createConnection({ port, host });
|
||||||
|
let settled = false;
|
||||||
|
const finish = (result: boolean) => {
|
||||||
|
if (settled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
settled = true;
|
||||||
|
clearTimeout(timer);
|
||||||
|
socket.removeAllListeners();
|
||||||
|
socket.destroy();
|
||||||
|
resolve(result);
|
||||||
|
};
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
// Conservative for liveness checks: timeout usually means no responsive
|
||||||
|
// local listener, so treat the lock owner as stale.
|
||||||
|
finish(true);
|
||||||
|
}, DEFAULT_PORT_PROBE_TIMEOUT_MS);
|
||||||
|
socket.once("connect", () => {
|
||||||
|
finish(false);
|
||||||
|
});
|
||||||
|
socket.once("error", () => {
|
||||||
|
finish(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resolveGatewayOwnerStatus(
|
||||||
pid: number,
|
pid: number,
|
||||||
payload: LockPayload | null,
|
payload: LockPayload | null,
|
||||||
platform: NodeJS.Platform,
|
platform: NodeJS.Platform,
|
||||||
): LockOwnerStatus {
|
port: number | undefined,
|
||||||
|
): Promise<LockOwnerStatus> {
|
||||||
|
if (port != null) {
|
||||||
|
const portFree = await checkPortFree(port);
|
||||||
|
if (portFree) {
|
||||||
|
return "dead";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!isPidAlive(pid)) {
|
if (!isPidAlive(pid)) {
|
||||||
return "dead";
|
return "dead";
|
||||||
}
|
}
|
||||||
@@ -178,6 +217,7 @@ export async function acquireGatewayLock(
|
|||||||
const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
|
const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
|
||||||
const staleMs = opts.staleMs ?? DEFAULT_STALE_MS;
|
const staleMs = opts.staleMs ?? DEFAULT_STALE_MS;
|
||||||
const platform = opts.platform ?? process.platform;
|
const platform = opts.platform ?? process.platform;
|
||||||
|
const port = opts.port;
|
||||||
const { lockPath, configPath } = resolveGatewayLockPath(env);
|
const { lockPath, configPath } = resolveGatewayLockPath(env);
|
||||||
await fs.mkdir(path.dirname(lockPath), { recursive: true });
|
await fs.mkdir(path.dirname(lockPath), { recursive: true });
|
||||||
|
|
||||||
@@ -214,7 +254,7 @@ export async function acquireGatewayLock(
|
|||||||
lastPayload = await readLockPayload(lockPath);
|
lastPayload = await readLockPayload(lockPath);
|
||||||
const ownerPid = lastPayload?.pid;
|
const ownerPid = lastPayload?.pid;
|
||||||
const ownerStatus = ownerPid
|
const ownerStatus = ownerPid
|
||||||
? resolveGatewayOwnerStatus(ownerPid, lastPayload, platform)
|
? await resolveGatewayOwnerStatus(ownerPid, lastPayload, platform, port)
|
||||||
: "unknown";
|
: "unknown";
|
||||||
if (ownerStatus === "dead" && ownerPid) {
|
if (ownerStatus === "dead" && ownerPid) {
|
||||||
await fs.rm(lockPath, { force: true });
|
await fs.rm(lockPath, { force: true });
|
||||||
|
|||||||
Reference in New Issue
Block a user