perf(test): merge queue integration coverage and shrink media fixture

This commit is contained in:
Peter Steinberger
2026-02-14 00:50:09 +00:00
parent d134c854a5
commit aa6d8b27ac
3 changed files with 116 additions and 171 deletions

View File

@@ -2,11 +2,13 @@ import fs from "node:fs/promises";
import os from "node:os"; import os from "node:os";
import path from "node:path"; import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import { loadModelCatalog } from "../agents/model-catalog.js"; import { loadModelCatalog } from "../agents/model-catalog.js";
import { getReplyFromConfig } from "./reply.js"; import { getReplyFromConfig } from "./reply.js";
type RunEmbeddedPiAgent = typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent; type RunEmbeddedPiAgent = typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent;
type RunEmbeddedPiAgentParams = Parameters<RunEmbeddedPiAgent>[0]; type RunEmbeddedPiAgentParams = Parameters<RunEmbeddedPiAgent>[0];
type ReplyResult = Awaited<ReturnType<RunEmbeddedPiAgent>>;
const piEmbeddedMock = vi.hoisted(() => ({ const piEmbeddedMock = vi.hoisted(() => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false), abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
@@ -77,6 +79,16 @@ async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
} }
} }
function makeAgentResult(text: string): ReplyResult {
return {
payloads: [{ text }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
}
describe("block streaming", () => { describe("block streaming", () => {
beforeAll(async () => { beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-stream-")); fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-stream-"));
@@ -253,4 +265,100 @@ describe("block streaming", () => {
expect(onBlockReplyStreamMode).not.toHaveBeenCalled(); expect(onBlockReplyStreamMode).not.toHaveBeenCalled();
}); });
}); });
it("queues followups for collect + summarize modes", async () => {
vi.useFakeTimers();
await withTempHomeBase(
async (home) => {
const prompts: string[] = [];
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async (params) => {
prompts.push(params.prompt);
return makeAgentResult("ok");
});
const collectCfg = {
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "openclaw"),
},
},
channels: { whatsapp: { allowFrom: ["*"] } },
session: { store: path.join(home, "sessions.json") },
messages: {
queue: {
mode: "collect",
debounceMs: 200,
cap: 10,
drop: "summarize",
},
},
};
piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(true);
piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(true);
const first = await getReplyFromConfig(
{ Body: "first", From: "+1001", To: "+2000", MessageSid: "m-1" },
{},
collectCfg,
);
expect(first).toBeUndefined();
expect(piEmbeddedMock.runEmbeddedPiAgent).not.toHaveBeenCalled();
piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(false);
piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const second = await getReplyFromConfig(
{ Body: "second", From: "+1001", To: "+2000" },
{},
collectCfg,
);
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
expect(secondText).toBe("ok");
await vi.advanceTimersByTimeAsync(500);
await Promise.resolve();
const queuedPrompt =
prompts.find((p) => p.includes("[Queued messages while agent was busy]")) ?? "";
expect(queuedPrompt).toContain("Queued #1");
expect(queuedPrompt).toContain("first");
expect(queuedPrompt).not.toContain("[message_id:");
prompts.length = 0;
piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(true);
piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const followupCfg = {
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "openclaw"),
},
},
channels: { whatsapp: { allowFrom: ["*"] } },
session: { store: path.join(home, "sessions-2.json") },
messages: {
queue: {
mode: "followup",
debounceMs: 0,
cap: 1,
drop: "summarize",
},
},
};
await getReplyFromConfig({ Body: "one", From: "+1002", To: "+2000" }, {}, followupCfg);
await getReplyFromConfig({ Body: "two", From: "+1002", To: "+2000" }, {}, followupCfg);
piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(false);
await getReplyFromConfig({ Body: "three", From: "+1002", To: "+2000" }, {}, followupCfg);
await vi.advanceTimersByTimeAsync(50);
await Promise.resolve();
expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true);
},
{ prefix: "openclaw-queue-" },
);
});
}); });

View File

@@ -1,149 +0,0 @@
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { pollUntil } from "../../test/helpers/poll.js";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import {
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,
runEmbeddedPiAgent,
} from "../agents/pi-embedded.js";
import { getReplyFromConfig } from "./reply.js";
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: vi.fn(),
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
}));
function makeResult(text: string) {
return {
payloads: [{ text }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
};
}
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
return withTempHomeBase(
async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
return await fn(home);
},
{ prefix: "openclaw-queue-" },
);
}
function makeCfg(home: string, queue?: Record<string, unknown>) {
return {
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "openclaw"),
},
},
channels: { whatsapp: { allowFrom: ["*"] } },
session: { store: path.join(home, "sessions.json") },
messages: queue ? { queue } : undefined,
};
}
describe("queue followups", () => {
afterEach(() => {
vi.useRealTimers();
});
it("collects queued messages and drains after run completes", async () => {
vi.useFakeTimers();
await withTempHome(async (home) => {
const prompts: string[] = [];
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
prompts.push(params.prompt);
if (params.prompt.includes("[Queued messages while agent was busy]")) {
return makeResult("followup");
}
return makeResult("main");
});
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(true);
const cfg = makeCfg(home, {
mode: "collect",
debounceMs: 200,
cap: 10,
drop: "summarize",
});
const first = await getReplyFromConfig(
{ Body: "first", From: "+1001", To: "+2000", MessageSid: "m-1" },
{},
cfg,
);
expect(first).toBeUndefined();
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
const second = await getReplyFromConfig(
{ Body: "second", From: "+1001", To: "+2000" },
{},
cfg,
);
const secondText = Array.isArray(second) ? second[0]?.text : second?.text;
expect(secondText).toBe("main");
await vi.advanceTimersByTimeAsync(500);
await Promise.resolve();
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
const queuedPrompt = prompts.find((p) =>
p.includes("[Queued messages while agent was busy]"),
);
expect(queuedPrompt).toBeTruthy();
// Message id hints are no longer exposed to the model prompt.
expect(queuedPrompt).toContain("Queued #1");
expect(queuedPrompt).toContain("first");
expect(queuedPrompt).not.toContain("[message_id:");
});
});
it("summarizes dropped followups when cap is exceeded", async () => {
await withTempHome(async (home) => {
const prompts: string[] = [];
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
prompts.push(params.prompt);
return makeResult("ok");
});
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
const cfg = makeCfg(home, {
mode: "followup",
debounceMs: 0,
cap: 1,
drop: "summarize",
});
await getReplyFromConfig({ Body: "one", From: "+1002", To: "+2000" }, {}, cfg);
await getReplyFromConfig({ Body: "two", From: "+1002", To: "+2000" }, {}, cfg);
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
await getReplyFromConfig({ Body: "three", From: "+1002", To: "+2000" }, {}, cfg);
await pollUntil(
async () => (prompts.some((p) => p.includes("[Queue overflow]")) ? true : null),
{ timeoutMs: 2000 },
);
expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true);
});
});
});

View File

@@ -282,28 +282,14 @@ describe("web media loading", () => {
}); });
it("falls back to JPEG when PNG alpha cannot fit under cap", async () => { it("falls back to JPEG when PNG alpha cannot fit under cap", async () => {
const sizes = [224, 256, 320]; const size = 96;
let pngBuffer: Buffer | null = null; const raw = buildDeterministicBytes(size * size * 4);
let smallestPng: Awaited<ReturnType<typeof optimizeImageToPng>> | null = null; const pngBuffer = await sharp(raw, { raw: { width: size, height: size, channels: 4 } })
let jpegOptimized: Awaited<ReturnType<typeof optimizeImageToJpeg>> | null = null; .png()
let cap = 0; .toBuffer();
const smallestPng = await optimizeImageToPng(pngBuffer, 1);
for (const size of sizes) { const cap = Math.max(1, smallestPng.optimizedSize - 1);
const raw = buildDeterministicBytes(size * size * 4); const jpegOptimized = await optimizeImageToJpeg(pngBuffer, cap);
pngBuffer = await sharp(raw, { raw: { width: size, height: size, channels: 4 } })
.png()
.toBuffer();
smallestPng = await optimizeImageToPng(pngBuffer, 1);
cap = Math.max(1, smallestPng.optimizedSize - 1);
jpegOptimized = await optimizeImageToJpeg(pngBuffer, cap);
if (jpegOptimized.buffer.length < smallestPng.optimizedSize) {
break;
}
}
if (!pngBuffer || !smallestPng || !jpegOptimized) {
throw new Error("PNG fallback setup failed");
}
if (jpegOptimized.buffer.length >= smallestPng.optimizedSize) { if (jpegOptimized.buffer.length >= smallestPng.optimizedSize) {
throw new Error( throw new Error(