diff --git a/CHANGELOG.md b/CHANGELOG.md index 88afb0b7f0c..b9bbdf3cb91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli. - Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths. +- ACP/Gateway: wait for gateway hello before opening ACP requests, and fail fast on pre-hello connect failures to avoid startup hangs and early `gateway not connected` request races. (#23390) Thanks @janckerchen. - Security/Audit: add `openclaw security audit` detection for open group policies that expose runtime/filesystem tools without sandbox/workspace guards (`security.exposure.open_groups_with_runtime_or_fs`). - Security/Exec env: block request-scoped `HOME` and `ZDOTDIR` overrides in host exec env sanitizers (Node + macOS), preventing shell startup-file execution before allowlist-evaluated command bodies. This ships in the next npm release. Thanks @tdjackey for reporting. - Security/Gateway: emit a startup security warning when insecure/dangerous config flags are enabled (including `gateway.controlUi.dangerouslyDisableDeviceAuth=true`) and point operators to `openclaw security audit`. diff --git a/src/acp/server.startup.test.ts b/src/acp/server.startup.test.ts new file mode 100644 index 00000000000..ae8d99d3a99 --- /dev/null +++ b/src/acp/server.startup.test.ts @@ -0,0 +1,152 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; + +type GatewayClientCallbacks = { + onHelloOk?: () => void; + onConnectError?: (err: Error) => void; + onClose?: (code: number, reason: string) => void; +}; + +const mockState = { + gateways: [] as MockGatewayClient[], + agentSideConnectionCtor: vi.fn(), + agentStart: vi.fn(), +}; + +class MockGatewayClient { + private callbacks: GatewayClientCallbacks; + + constructor(opts: GatewayClientCallbacks) { + this.callbacks = opts; + mockState.gateways.push(this); + } + + start(): void {} + + stop(): void { + this.callbacks.onClose?.(1000, "gateway stopped"); + } + + emitHello(): void { + this.callbacks.onHelloOk?.(); + } + + emitConnectError(message: string): void { + this.callbacks.onConnectError?.(new Error(message)); + } +} + +vi.mock("@agentclientprotocol/sdk", () => ({ + AgentSideConnection: class { + constructor(factory: (conn: unknown) => unknown, stream: unknown) { + mockState.agentSideConnectionCtor(factory, stream); + factory({}); + } + }, + ndJsonStream: vi.fn(() => ({ type: "mock-stream" })), +})); + +vi.mock("../config/config.js", () => ({ + loadConfig: () => ({ + gateway: { + mode: "local", + }, + }), +})); + +vi.mock("../gateway/auth.js", () => ({ + resolveGatewayAuth: () => ({}), +})); + +vi.mock("../gateway/call.js", () => ({ + buildGatewayConnectionDetails: () => ({ + url: "ws://127.0.0.1:18789", + }), +})); + +vi.mock("../gateway/client.js", () => ({ + GatewayClient: MockGatewayClient, +})); + +vi.mock("./translator.js", () => ({ + AcpGatewayAgent: class { + start(): void { + mockState.agentStart(); + } + + handleGatewayReconnect(): void {} + + handleGatewayDisconnect(): void {} + + async handleGatewayEvent(): Promise {} + }, +})); + +describe("serveAcpGateway startup", () => { + let serveAcpGateway: typeof import("./server.js").serveAcpGateway; + + beforeAll(async () => { + ({ serveAcpGateway } = await import("./server.js")); + }); + + beforeEach(() => { + mockState.gateways.length = 0; + mockState.agentSideConnectionCtor.mockReset(); + mockState.agentStart.mockReset(); + }); + + it("waits for gateway hello before creating AgentSideConnection", async () => { + const signalHandlers = new Map void>(); + const onceSpy = vi.spyOn(process, "once").mockImplementation((( + signal: NodeJS.Signals, + handler: () => void, + ) => { + signalHandlers.set(signal, handler); + return process; + }) as typeof process.once); + + try { + const servePromise = serveAcpGateway({}); + await Promise.resolve(); + + expect(mockState.agentSideConnectionCtor).not.toHaveBeenCalled(); + const gateway = mockState.gateways[0]; + if (!gateway) { + throw new Error("Expected mocked gateway instance"); + } + + gateway.emitHello(); + await vi.waitFor(() => { + expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1); + }); + + signalHandlers.get("SIGINT")?.(); + await servePromise; + } finally { + onceSpy.mockRestore(); + } + }); + + it("rejects startup when gateway connect fails before hello", async () => { + const onceSpy = vi + .spyOn(process, "once") + .mockImplementation( + ((_signal: NodeJS.Signals, _handler: () => void) => process) as typeof process.once, + ); + + try { + const servePromise = serveAcpGateway({}); + await Promise.resolve(); + + const gateway = mockState.gateways[0]; + if (!gateway) { + throw new Error("Expected mocked gateway instance"); + } + + gateway.emitConnectError("connect failed"); + await expect(servePromise).rejects.toThrow("connect failed"); + expect(mockState.agentSideConnectionCtor).not.toHaveBeenCalled(); + } finally { + onceSpy.mockRestore(); + } + }); +}); diff --git a/src/acp/server.ts b/src/acp/server.ts index e8085bd6fb3..0c17ca429d1 100644 --- a/src/acp/server.ts +++ b/src/acp/server.ts @@ -40,6 +40,27 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise void; + let onGatewayReadyReject!: (err: Error) => void; + let gatewayReadySettled = false; + const gatewayReady = new Promise((resolve, reject) => { + onGatewayReadyResolve = resolve; + onGatewayReadyReject = reject; + }); + const resolveGatewayReady = () => { + if (gatewayReadySettled) { + return; + } + gatewayReadySettled = true; + onGatewayReadyResolve(); + }; + const rejectGatewayReady = (err: unknown) => { + if (gatewayReadySettled) { + return; + } + gatewayReadySettled = true; + onGatewayReadyReject(err instanceof Error ? err : new Error(String(err))); + }; const gateway = new GatewayClient({ url: connection.url, @@ -53,9 +74,16 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise { + resolveGatewayReady(); agent?.handleGatewayReconnect(); }, + onConnectError: (err) => { + rejectGatewayReady(err); + }, onClose: (code, reason) => { + if (!stopped) { + rejectGatewayReady(new Error(`gateway closed before ready (${code}): ${reason}`)); + } agent?.handleGatewayDisconnect(`${code}: ${reason}`); // Resolve only on intentional shutdown (gateway.stop() sets closed // which skips scheduleReconnect, then fires onClose). Transient @@ -71,6 +99,7 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise((resolve) => { - const originalOnHelloOk = gateway.opts.onHelloOk; - gateway.opts.onHelloOk = (hello) => { - originalOnHelloOk?.(hello); - resolve(); - }; + await gatewayReady.catch((err) => { + shutdown(); + throw err; }); - - // Wait for gateway connection before creating AgentSideConnection - await helloReceived; + if (stopped) { + return closed; + } const input = Writable.toWeb(process.stdout); const output = Readable.toWeb(process.stdin) as unknown as ReadableStream;