perf(test): reduce gateway reload waits and trim duplicate invoke coverage

This commit is contained in:
Peter Steinberger
2026-02-13 23:50:04 +00:00
parent ab71fdf821
commit 4bef423d83
7 changed files with 46 additions and 77 deletions

View File

@@ -164,7 +164,7 @@ describe("block streaming", () => {
}); });
}); });
it("falls back to final payloads when block reply send times out", async () => { it("falls back to final payloads and respects telegram streamMode block", async () => {
await withTempHome(async (home) => { await withTempHome(async (home) => {
let sawAbort = false; let sawAbort = false;
const onBlockReply = vi.fn((_, context) => { const onBlockReply = vi.fn((_, context) => {
@@ -220,32 +220,26 @@ describe("block streaming", () => {
const res = await replyPromise; const res = await replyPromise;
expect(res).toMatchObject({ text: "final" }); expect(res).toMatchObject({ text: "final" });
expect(sawAbort).toBe(true); expect(sawAbort).toBe(true);
});
});
it("does not enable block streaming for telegram streamMode block", async () => { const onBlockReplyStreamMode = vi.fn().mockResolvedValue(undefined);
await withTempHome(async (home) => { piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async () => ({
const onBlockReply = vi.fn().mockResolvedValue(undefined);
const impl = async () => ({
payloads: [{ text: "final" }], payloads: [{ text: "final" }],
meta: { meta: {
durationMs: 5, durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" }, agentMeta: { sessionId: "s", provider: "p", model: "m" },
}, },
}); }));
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
const res = await getReplyFromConfig( const resStreamMode = await getReplyFromConfig(
{ {
Body: "ping", Body: "ping",
From: "+1004", From: "+1004",
To: "+2000", To: "+2000",
MessageSid: "msg-126", MessageSid: "msg-127",
Provider: "telegram", Provider: "telegram",
}, },
{ {
onBlockReply, onBlockReply: onBlockReplyStreamMode,
}, },
{ {
agents: { agents: {
@@ -259,8 +253,8 @@ describe("block streaming", () => {
}, },
); );
expect(res?.text).toBe("final"); expect(resStreamMode?.text).toBe("final");
expect(onBlockReply).not.toHaveBeenCalled(); expect(onBlockReplyStreamMode).not.toHaveBeenCalled();
}); });
}); });
}); });

View File

@@ -102,7 +102,7 @@ describe("RawBody directive parsing", () => {
vi.clearAllMocks(); vi.clearAllMocks();
}); });
it("detects command directives from RawBody/CommandBody in wrapped group messages", async () => { it("handles directives, history, and non-default agent session files", async () => {
await withTempHome(async (home) => { await withTempHome(async (home) => {
const assertCommandReply = async (input: { const assertCommandReply = async (input: {
message: ReplyMessage; message: ReplyMessage;
@@ -161,11 +161,7 @@ describe("RawBody directive parsing", () => {
}, },
expectedIncludes: ["Verbose logging enabled."], expectedIncludes: ["Verbose logging enabled."],
}); });
});
});
it("preserves history and reuses non-default agent session files", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "ok" }], payloads: [{ text: "ok" }],
meta: { meta: {

View File

@@ -35,8 +35,8 @@ describe("gateway config reload during reply", () => {
let deliveredReplies: string[] = []; let deliveredReplies: string[] = [];
const dispatcher = createReplyDispatcher({ const dispatcher = createReplyDispatcher({
deliver: async (payload) => { deliver: async (payload) => {
// Simulate async reply delivery // Keep delivery asynchronous without real wall-clock delay.
await new Promise((resolve) => setTimeout(resolve, 20)); await Promise.resolve();
deliveredReplies.push(payload.text ?? ""); deliveredReplies.push(payload.text ?? "");
}, },
onError: (err) => { onError: (err) => {

View File

@@ -30,8 +30,8 @@ describe("gateway restart deferral integration", () => {
const deliveredReplies: Array<{ text: string; timestamp: number }> = []; const deliveredReplies: Array<{ text: string; timestamp: number }> = [];
const dispatcher = createReplyDispatcher({ const dispatcher = createReplyDispatcher({
deliver: async (payload) => { deliver: async (payload) => {
// Simulate network delay // Keep delivery asynchronous without real wall-clock delay.
await new Promise((resolve) => setTimeout(resolve, 20)); await Promise.resolve();
deliveredReplies.push({ deliveredReplies.push({
text: payload.text ?? "", text: payload.text ?? "",
timestamp: Date.now(), timestamp: Date.now(),

View File

@@ -4,6 +4,16 @@
*/ */
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
function createDeferred<T = void>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
describe("real scenario: config change during message processing", () => { describe("real scenario: config change during message processing", () => {
let replyErrors: string[] = []; let replyErrors: string[] = [];
@@ -26,8 +36,10 @@ describe("real scenario: config change during message processing", () => {
let rpcConnected = true; let rpcConnected = true;
const deliveredReplies: string[] = []; const deliveredReplies: string[] = [];
const deliveryStarted = createDeferred();
const allowDelivery = createDeferred();
// Create dispatcher with slow delivery (simulates real network delay) // Hold delivery open so restart checks run while reply is in-flight.
const dispatcher = createReplyDispatcher({ const dispatcher = createReplyDispatcher({
deliver: async (payload) => { deliver: async (payload) => {
if (!rpcConnected) { if (!rpcConnected) {
@@ -35,8 +47,8 @@ describe("real scenario: config change during message processing", () => {
replyErrors.push(error); replyErrors.push(error);
throw new Error(error); throw new Error(error);
} }
// Slow delivery — restart checks will run during this window deliveryStarted.resolve();
await new Promise((resolve) => setTimeout(resolve, 150)); await allowDelivery.promise;
deliveredReplies.push(payload.text ?? ""); deliveredReplies.push(payload.text ?? "");
}, },
onError: () => { onError: () => {
@@ -49,6 +61,7 @@ describe("real scenario: config change during message processing", () => {
// keeping pending > 0 is the in-flight delivery itself. // keeping pending > 0 is the in-flight delivery itself.
dispatcher.sendFinalReply({ text: "Configuration updated!" }); dispatcher.sendFinalReply({ text: "Configuration updated!" });
dispatcher.markComplete(); dispatcher.markComplete();
await deliveryStarted.promise;
// At this point: markComplete flagged, delivery is in flight. // At this point: markComplete flagged, delivery is in flight.
// pending > 0 because the in-flight delivery keeps it alive. // pending > 0 because the in-flight delivery keeps it alive.
@@ -59,7 +72,7 @@ describe("real scenario: config change during message processing", () => {
// If the tracking is broken, pending would be 0 and we'd restart. // If the tracking is broken, pending would be 0 and we'd restart.
let restartTriggered = false; let restartTriggered = false;
for (let i = 0; i < 3; i++) { for (let i = 0; i < 3; i++) {
await new Promise((resolve) => setTimeout(resolve, 25)); await Promise.resolve();
const pending = getTotalPendingReplies(); const pending = getTotalPendingReplies();
if (pending === 0) { if (pending === 0) {
restartTriggered = true; restartTriggered = true;
@@ -68,6 +81,7 @@ describe("real scenario: config change during message processing", () => {
} }
} }
allowDelivery.resolve();
// Wait for delivery to complete // Wait for delivery to complete
await dispatcher.waitForIdle(); await dispatcher.waitForIdle();
@@ -83,10 +97,11 @@ describe("real scenario: config change during message processing", () => {
it("should keep pending > 0 until reply is actually enqueued", async () => { it("should keep pending > 0 until reply is actually enqueued", async () => {
const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js"); const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js");
const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js"); const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js");
const allowDelivery = createDeferred();
const dispatcher = createReplyDispatcher({ const dispatcher = createReplyDispatcher({
deliver: async (_payload) => { deliver: async (_payload) => {
await new Promise((resolve) => setTimeout(resolve, 10)); await allowDelivery.promise;
}, },
}); });
@@ -94,7 +109,7 @@ describe("real scenario: config change during message processing", () => {
expect(getTotalPendingReplies()).toBe(1); expect(getTotalPendingReplies()).toBe(1);
// Simulate command processing delay BEFORE reply is enqueued // Simulate command processing delay BEFORE reply is enqueued
await new Promise((resolve) => setTimeout(resolve, 20)); await Promise.resolve();
// During this delay, pending should STILL be 1 (reservation active) // During this delay, pending should STILL be 1 (reservation active)
expect(getTotalPendingReplies()).toBe(1); expect(getTotalPendingReplies()).toBe(1);
@@ -112,6 +127,7 @@ describe("real scenario: config change during message processing", () => {
const pendingAfterMarkComplete = getTotalPendingReplies(); const pendingAfterMarkComplete = getTotalPendingReplies();
expect(pendingAfterMarkComplete).toBeGreaterThan(0); expect(pendingAfterMarkComplete).toBeGreaterThan(0);
allowDelivery.resolve();
// Wait for reply to send // Wait for reply to send
await dispatcher.waitForIdle(); await dispatcher.waitForIdle();

View File

@@ -15,26 +15,25 @@ vi.mock("../infra/update-runner.js", () => ({
import { import {
connectOk, connectOk,
getFreePort,
installGatewayTestHooks, installGatewayTestHooks,
rpcReq, rpcReq,
startServerWithClient, startGatewayServer,
} from "./test-helpers.js"; } from "./test-helpers.js";
import { testState } from "./test-helpers.mocks.js";
installGatewayTestHooks({ scope: "suite" }); installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"]; let server: Awaited<ReturnType<typeof startGatewayServer>>;
let ws: WebSocket;
let port: number; let port: number;
let nodeWs: WebSocket; let nodeWs: WebSocket;
let nodeId: string; let nodeId: string;
beforeAll(async () => { beforeAll(async () => {
const token = "test-gateway-token-1234567890"; const token = "test-gateway-token-1234567890";
const started = await startServerWithClient(token); testState.gatewayAuth = { mode: "token", token };
server = started.server; port = await getFreePort();
ws = started.ws; server = await startGatewayServer(port, { bind: "loopback" });
port = started.port;
await connectOk(ws, { token });
nodeWs = new WebSocket(`ws://127.0.0.1:${port}`); nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => nodeWs.once("open", resolve)); await new Promise<void>((resolve) => nodeWs.once("open", resolve));
@@ -55,8 +54,7 @@ beforeAll(async () => {
}); });
afterAll(async () => { afterAll(async () => {
nodeWs.close(); nodeWs.terminate();
ws.close();
await server.close(); await server.close();
}); });

View File

@@ -46,7 +46,7 @@ const invokeAgentsList = async (params: {
} }
return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, { return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, {
method: "POST", method: "POST",
headers: { "content-type": "application/json", ...params.headers }, headers: { "content-type": "application/json", connection: "close", ...params.headers },
body: JSON.stringify(body), body: JSON.stringify(body),
}); });
}; };
@@ -71,7 +71,7 @@ const invokeTool = async (params: {
} }
return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, { return await fetch(`http://127.0.0.1:${params.port}/tools/invoke`, {
method: "POST", method: "POST",
headers: { "content-type": "application/json", ...params.headers }, headers: { "content-type": "application/json", connection: "close", ...params.headers },
body: JSON.stringify(body), body: JSON.stringify(body),
}); });
}; };
@@ -144,41 +144,6 @@ describe("POST /tools/invoke", () => {
expect(implicitBody.ok).toBe(true); expect(implicitBody.ok).toBe(true);
}); });
it("handles dedicated auth modes for password accept and token reject", async () => {
allowAgentsListForMain();
const passwordPort = await getFreePort();
const passwordServer = await startGatewayServer(passwordPort, {
bind: "loopback",
auth: { mode: "password", password: "secret" },
});
try {
const passwordRes = await invokeAgentsList({
port: passwordPort,
headers: { authorization: "Bearer secret" },
sessionKey: "main",
});
expect(passwordRes.status).toBe(200);
} finally {
await passwordServer.close();
}
const tokenPort = await getFreePort();
const tokenServer = await startGatewayServer(tokenPort, {
bind: "loopback",
auth: { mode: "token", token: "t" },
});
try {
const tokenRes = await invokeAgentsList({
port: tokenPort,
sessionKey: "main",
});
expect(tokenRes.status).toBe(401);
} finally {
await tokenServer.close();
}
});
it("routes tools invoke before plugin HTTP handlers", async () => { it("routes tools invoke before plugin HTTP handlers", async () => {
const pluginHandler = vi.fn(async (_req: IncomingMessage, res: ServerResponse) => { const pluginHandler = vi.fn(async (_req: IncomingMessage, res: ServerResponse) => {
res.statusCode = 418; res.statusCode = 418;