mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 12:57:40 +00:00
refactor(test): share gateway server start helper
This commit is contained in:
@@ -13,13 +13,12 @@ import { createRegistry } from "./server.e2e-registry-helpers.js";
|
|||||||
import {
|
import {
|
||||||
agentCommand,
|
agentCommand,
|
||||||
connectOk,
|
connectOk,
|
||||||
getFreePort,
|
|
||||||
installGatewayTestHooks,
|
installGatewayTestHooks,
|
||||||
onceMessage,
|
onceMessage,
|
||||||
rpcReq,
|
rpcReq,
|
||||||
startGatewayServer,
|
|
||||||
startServerWithClient,
|
startServerWithClient,
|
||||||
testState,
|
testState,
|
||||||
|
withGatewayServer,
|
||||||
writeSessionStore,
|
writeSessionStore,
|
||||||
} from "./test-helpers.js";
|
} from "./test-helpers.js";
|
||||||
|
|
||||||
@@ -326,52 +325,50 @@ describe("gateway server agent", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test("agent dedupe survives reconnect", { timeout: 60_000 }, async () => {
|
test("agent dedupe survives reconnect", { timeout: 60_000 }, async () => {
|
||||||
const port = await getFreePort();
|
await withGatewayServer(async ({ port }) => {
|
||||||
const server = await startGatewayServer(port);
|
const dial = async () => {
|
||||||
|
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||||
|
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||||
|
await connectOk(ws);
|
||||||
|
return ws;
|
||||||
|
};
|
||||||
|
|
||||||
const dial = async () => {
|
const idem = "reconnect-agent";
|
||||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
const ws1 = await dial();
|
||||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
const final1P = onceMessage(
|
||||||
await connectOk(ws);
|
ws1,
|
||||||
return ws;
|
(o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted",
|
||||||
};
|
6000,
|
||||||
|
);
|
||||||
|
ws1.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "req",
|
||||||
|
id: "ag1",
|
||||||
|
method: "agent",
|
||||||
|
params: { message: "hi", idempotencyKey: idem },
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const final1 = await final1P;
|
||||||
|
ws1.close();
|
||||||
|
|
||||||
const idem = "reconnect-agent";
|
const ws2 = await dial();
|
||||||
const ws1 = await dial();
|
const final2P = onceMessage(
|
||||||
const final1P = onceMessage(
|
ws2,
|
||||||
ws1,
|
(o) => o.type === "res" && o.id === "ag2" && o.payload?.status !== "accepted",
|
||||||
(o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted",
|
6000,
|
||||||
6000,
|
);
|
||||||
);
|
ws2.send(
|
||||||
ws1.send(
|
JSON.stringify({
|
||||||
JSON.stringify({
|
type: "req",
|
||||||
type: "req",
|
id: "ag2",
|
||||||
id: "ag1",
|
method: "agent",
|
||||||
method: "agent",
|
params: { message: "hi again", idempotencyKey: idem },
|
||||||
params: { message: "hi", idempotencyKey: idem },
|
}),
|
||||||
}),
|
);
|
||||||
);
|
const res = await final2P;
|
||||||
const final1 = await final1P;
|
expect(res.payload).toEqual(final1.payload);
|
||||||
ws1.close();
|
ws2.close();
|
||||||
|
});
|
||||||
const ws2 = await dial();
|
|
||||||
const final2P = onceMessage(
|
|
||||||
ws2,
|
|
||||||
(o) => o.type === "res" && o.id === "ag2" && o.payload?.status !== "accepted",
|
|
||||||
6000,
|
|
||||||
);
|
|
||||||
ws2.send(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "req",
|
|
||||||
id: "ag2",
|
|
||||||
method: "agent",
|
|
||||||
params: { message: "hi again", idempotencyKey: idem },
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
const res = await final2P;
|
|
||||||
expect(res.payload).toEqual(final1.payload);
|
|
||||||
ws2.close();
|
|
||||||
await server.close();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
test("agent events stream to webchat clients when run context is registered", async () => {
|
test("agent events stream to webchat clients when run context is registered", async () => {
|
||||||
|
|||||||
@@ -3,10 +3,9 @@ import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
|
|||||||
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
||||||
import {
|
import {
|
||||||
cronIsolatedRun,
|
cronIsolatedRun,
|
||||||
getFreePort,
|
|
||||||
installGatewayTestHooks,
|
installGatewayTestHooks,
|
||||||
startGatewayServer,
|
|
||||||
testState,
|
testState,
|
||||||
|
withGatewayServer,
|
||||||
waitForSystemEvent,
|
waitForSystemEvent,
|
||||||
} from "./test-helpers.js";
|
} from "./test-helpers.js";
|
||||||
|
|
||||||
@@ -14,16 +13,6 @@ installGatewayTestHooks({ scope: "suite" });
|
|||||||
|
|
||||||
const resolveMainKey = () => resolveMainSessionKeyFromConfig();
|
const resolveMainKey = () => resolveMainSessionKeyFromConfig();
|
||||||
|
|
||||||
async function withGatewayServer<T>(fn: (ctx: { port: number }) => Promise<T>): Promise<T> {
|
|
||||||
const port = await getFreePort();
|
|
||||||
const server = await startGatewayServer(port);
|
|
||||||
try {
|
|
||||||
return await fn({ port });
|
|
||||||
} finally {
|
|
||||||
await server.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("gateway server hooks", () => {
|
describe("gateway server hooks", () => {
|
||||||
test("handles auth, wake, and agent flows", async () => {
|
test("handles auth, wake, and agent flows", async () => {
|
||||||
testState.hooksConfig = { enabled: true, token: "hook-secret" };
|
testState.hooksConfig = { enabled: true, token: "hook-secret" };
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
import {
|
import {
|
||||||
connectOk,
|
connectOk,
|
||||||
getFreePort,
|
|
||||||
installGatewayTestHooks,
|
installGatewayTestHooks,
|
||||||
rpcReq,
|
rpcReq,
|
||||||
startGatewayServer,
|
|
||||||
startServerWithClient,
|
startServerWithClient,
|
||||||
|
withGatewayServer,
|
||||||
} from "./test-helpers.js";
|
} from "./test-helpers.js";
|
||||||
|
|
||||||
const hoisted = vi.hoisted(() => {
|
const hoisted = vi.hoisted(() => {
|
||||||
@@ -200,110 +199,107 @@ describe("gateway hot reload", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("applies hot reload actions and emits restart signal", async () => {
|
it("applies hot reload actions and emits restart signal", async () => {
|
||||||
const port = await getFreePort();
|
await withGatewayServer(async () => {
|
||||||
const server = await startGatewayServer(port);
|
const onHotReload = hoisted.getOnHotReload();
|
||||||
|
expect(onHotReload).toBeTypeOf("function");
|
||||||
|
|
||||||
const onHotReload = hoisted.getOnHotReload();
|
const nextConfig = {
|
||||||
expect(onHotReload).toBeTypeOf("function");
|
hooks: {
|
||||||
|
enabled: true,
|
||||||
|
token: "secret",
|
||||||
|
gmail: { account: "me@example.com" },
|
||||||
|
},
|
||||||
|
cron: { enabled: true, store: "/tmp/cron.json" },
|
||||||
|
agents: { defaults: { heartbeat: { every: "1m" }, maxConcurrent: 2 } },
|
||||||
|
browser: { enabled: true },
|
||||||
|
web: { enabled: true },
|
||||||
|
channels: {
|
||||||
|
telegram: { botToken: "token" },
|
||||||
|
discord: { token: "token" },
|
||||||
|
signal: { account: "+15550000000" },
|
||||||
|
imessage: { enabled: true },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
const nextConfig = {
|
await onHotReload?.(
|
||||||
hooks: {
|
{
|
||||||
enabled: true,
|
changedPaths: [
|
||||||
token: "secret",
|
"hooks.gmail.account",
|
||||||
gmail: { account: "me@example.com" },
|
"cron.enabled",
|
||||||
},
|
"agents.defaults.heartbeat.every",
|
||||||
cron: { enabled: true, store: "/tmp/cron.json" },
|
"browser.enabled",
|
||||||
agents: { defaults: { heartbeat: { every: "1m" }, maxConcurrent: 2 } },
|
"web.enabled",
|
||||||
browser: { enabled: true },
|
"channels.telegram.botToken",
|
||||||
web: { enabled: true },
|
"channels.discord.token",
|
||||||
channels: {
|
"channels.signal.account",
|
||||||
telegram: { botToken: "token" },
|
"channels.imessage.enabled",
|
||||||
discord: { token: "token" },
|
],
|
||||||
signal: { account: "+15550000000" },
|
restartGateway: false,
|
||||||
imessage: { enabled: true },
|
restartReasons: [],
|
||||||
},
|
hotReasons: ["web.enabled"],
|
||||||
};
|
reloadHooks: true,
|
||||||
|
restartGmailWatcher: true,
|
||||||
|
restartBrowserControl: true,
|
||||||
|
restartCron: true,
|
||||||
|
restartHeartbeat: true,
|
||||||
|
restartChannels: new Set(["whatsapp", "telegram", "discord", "signal", "imessage"]),
|
||||||
|
noopPaths: [],
|
||||||
|
},
|
||||||
|
nextConfig,
|
||||||
|
);
|
||||||
|
|
||||||
await onHotReload?.(
|
expect(hoisted.stopGmailWatcher).toHaveBeenCalled();
|
||||||
{
|
expect(hoisted.startGmailWatcher).toHaveBeenCalledWith(nextConfig);
|
||||||
changedPaths: [
|
|
||||||
"hooks.gmail.account",
|
|
||||||
"cron.enabled",
|
|
||||||
"agents.defaults.heartbeat.every",
|
|
||||||
"browser.enabled",
|
|
||||||
"web.enabled",
|
|
||||||
"channels.telegram.botToken",
|
|
||||||
"channels.discord.token",
|
|
||||||
"channels.signal.account",
|
|
||||||
"channels.imessage.enabled",
|
|
||||||
],
|
|
||||||
restartGateway: false,
|
|
||||||
restartReasons: [],
|
|
||||||
hotReasons: ["web.enabled"],
|
|
||||||
reloadHooks: true,
|
|
||||||
restartGmailWatcher: true,
|
|
||||||
restartBrowserControl: true,
|
|
||||||
restartCron: true,
|
|
||||||
restartHeartbeat: true,
|
|
||||||
restartChannels: new Set(["whatsapp", "telegram", "discord", "signal", "imessage"]),
|
|
||||||
noopPaths: [],
|
|
||||||
},
|
|
||||||
nextConfig,
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(hoisted.stopGmailWatcher).toHaveBeenCalled();
|
expect(hoisted.browserStop).toHaveBeenCalledTimes(1);
|
||||||
expect(hoisted.startGmailWatcher).toHaveBeenCalledWith(nextConfig);
|
expect(hoisted.startBrowserControlServerIfEnabled).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
expect(hoisted.browserStop).toHaveBeenCalledTimes(1);
|
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
||||||
expect(hoisted.startBrowserControlServerIfEnabled).toHaveBeenCalledTimes(2);
|
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledTimes(1);
|
||||||
|
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledWith(nextConfig);
|
||||||
|
|
||||||
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
expect(hoisted.cronInstances.length).toBe(2);
|
||||||
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledTimes(1);
|
expect(hoisted.cronInstances[0].stop).toHaveBeenCalledTimes(1);
|
||||||
expect(hoisted.heartbeatUpdateConfig).toHaveBeenCalledWith(nextConfig);
|
expect(hoisted.cronInstances[1].start).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
expect(hoisted.cronInstances.length).toBe(2);
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledTimes(5);
|
||||||
expect(hoisted.cronInstances[0].stop).toHaveBeenCalledTimes(1);
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledTimes(5);
|
||||||
expect(hoisted.cronInstances[1].start).toHaveBeenCalledTimes(1);
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("whatsapp");
|
||||||
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("whatsapp");
|
||||||
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("telegram");
|
||||||
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("telegram");
|
||||||
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord");
|
||||||
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord");
|
||||||
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("signal");
|
||||||
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("signal");
|
||||||
|
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("imessage");
|
||||||
|
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("imessage");
|
||||||
|
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledTimes(5);
|
const onRestart = hoisted.getOnRestart();
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledTimes(5);
|
expect(onRestart).toBeTypeOf("function");
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("whatsapp");
|
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("whatsapp");
|
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("telegram");
|
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("telegram");
|
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord");
|
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord");
|
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("signal");
|
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("signal");
|
|
||||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("imessage");
|
|
||||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("imessage");
|
|
||||||
|
|
||||||
const onRestart = hoisted.getOnRestart();
|
const signalSpy = vi.fn();
|
||||||
expect(onRestart).toBeTypeOf("function");
|
process.once("SIGUSR1", signalSpy);
|
||||||
|
|
||||||
const signalSpy = vi.fn();
|
onRestart?.(
|
||||||
process.once("SIGUSR1", signalSpy);
|
{
|
||||||
|
changedPaths: ["gateway.port"],
|
||||||
|
restartGateway: true,
|
||||||
|
restartReasons: ["gateway.port"],
|
||||||
|
hotReasons: [],
|
||||||
|
reloadHooks: false,
|
||||||
|
restartGmailWatcher: false,
|
||||||
|
restartBrowserControl: false,
|
||||||
|
restartCron: false,
|
||||||
|
restartHeartbeat: false,
|
||||||
|
restartChannels: new Set(),
|
||||||
|
noopPaths: [],
|
||||||
|
},
|
||||||
|
{},
|
||||||
|
);
|
||||||
|
|
||||||
onRestart?.(
|
expect(signalSpy).toHaveBeenCalledTimes(1);
|
||||||
{
|
});
|
||||||
changedPaths: ["gateway.port"],
|
|
||||||
restartGateway: true,
|
|
||||||
restartReasons: ["gateway.port"],
|
|
||||||
hotReasons: [],
|
|
||||||
reloadHooks: false,
|
|
||||||
restartGmailWatcher: false,
|
|
||||||
restartBrowserControl: false,
|
|
||||||
restartCron: false,
|
|
||||||
restartHeartbeat: false,
|
|
||||||
restartChannels: new Set(),
|
|
||||||
noopPaths: [],
|
|
||||||
},
|
|
||||||
{},
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(signalSpy).toHaveBeenCalledTimes(1);
|
|
||||||
|
|
||||||
await server.close();
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -331,6 +331,43 @@ export async function startGatewayServer(port: number, opts?: GatewayServerOptio
|
|||||||
return await mod.startGatewayServer(port, resolvedOpts);
|
return await mod.startGatewayServer(port, resolvedOpts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function startGatewayServerWithRetries(params: {
|
||||||
|
port: number;
|
||||||
|
opts?: GatewayServerOptions;
|
||||||
|
}): Promise<{ port: number; server: Awaited<ReturnType<typeof startGatewayServer>> }> {
|
||||||
|
let port = params.port;
|
||||||
|
for (let attempt = 0; attempt < 10; attempt++) {
|
||||||
|
try {
|
||||||
|
return {
|
||||||
|
port,
|
||||||
|
server: await startGatewayServer(port, params.opts),
|
||||||
|
};
|
||||||
|
} catch (err) {
|
||||||
|
const code = (err as { cause?: { code?: string } }).cause?.code;
|
||||||
|
if (code !== "EADDRINUSE") {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
port = await getFreePort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new Error("failed to start gateway server after retries");
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function withGatewayServer<T>(
|
||||||
|
fn: (ctx: { port: number; server: Awaited<ReturnType<typeof startGatewayServer>> }) => Promise<T>,
|
||||||
|
opts?: { port?: number; serverOptions?: GatewayServerOptions },
|
||||||
|
): Promise<T> {
|
||||||
|
const started = await startGatewayServerWithRetries({
|
||||||
|
port: opts?.port ?? (await getFreePort()),
|
||||||
|
opts: opts?.serverOptions,
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
return await fn({ port: started.port, server: started.server });
|
||||||
|
} finally {
|
||||||
|
await started.server.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function startServerWithClient(
|
export async function startServerWithClient(
|
||||||
token?: string,
|
token?: string,
|
||||||
opts?: GatewayServerOptions & { wsHeaders?: Record<string, string> },
|
opts?: GatewayServerOptions & { wsHeaders?: Record<string, string> },
|
||||||
@@ -352,22 +389,9 @@ export async function startServerWithClient(
|
|||||||
process.env.OPENCLAW_GATEWAY_TOKEN = fallbackToken;
|
process.env.OPENCLAW_GATEWAY_TOKEN = fallbackToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
|
const started = await startGatewayServerWithRetries({ port, opts: gatewayOpts });
|
||||||
for (let attempt = 0; attempt < 10; attempt++) {
|
port = started.port;
|
||||||
try {
|
const server = started.server;
|
||||||
server = await startGatewayServer(port, gatewayOpts);
|
|
||||||
break;
|
|
||||||
} catch (err) {
|
|
||||||
const code = (err as { cause?: { code?: string } }).cause?.code;
|
|
||||||
if (code !== "EADDRINUSE") {
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
port = await getFreePort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!server) {
|
|
||||||
throw new Error("failed to start gateway server after retries");
|
|
||||||
}
|
|
||||||
|
|
||||||
const ws = new WebSocket(
|
const ws = new WebSocket(
|
||||||
`ws://127.0.0.1:${port}`,
|
`ws://127.0.0.1:${port}`,
|
||||||
|
|||||||
Reference in New Issue
Block a user