diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index a525071399f..a2fa0160f66 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -2,11 +2,13 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; import { loadModelCatalog } from "../agents/model-catalog.js"; import { getReplyFromConfig } from "./reply.js"; type RunEmbeddedPiAgent = typeof import("../agents/pi-embedded.js").runEmbeddedPiAgent; type RunEmbeddedPiAgentParams = Parameters[0]; +type ReplyResult = Awaited>; const piEmbeddedMock = vi.hoisted(() => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), @@ -77,6 +79,16 @@ async function withTempHome(fn: (home: string) => Promise): Promise { } } +function makeAgentResult(text: string): ReplyResult { + return { + payloads: [{ text }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; +} + describe("block streaming", () => { beforeAll(async () => { fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-stream-")); @@ -253,4 +265,100 @@ describe("block streaming", () => { expect(onBlockReplyStreamMode).not.toHaveBeenCalled(); }); }); + + it("queues followups for collect + summarize modes", async () => { + vi.useFakeTimers(); + await withTempHomeBase( + async (home) => { + const prompts: string[] = []; + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(async (params) => { + prompts.push(params.prompt); + return makeAgentResult("ok"); + }); + + const collectCfg = { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "openclaw"), + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: path.join(home, "sessions.json") }, + messages: { + queue: { + mode: "collect", + debounceMs: 200, + cap: 10, + drop: "summarize", + }, + }, + }; + + piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(true); + piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(true); + + const first = await getReplyFromConfig( + { Body: "first", From: "+1001", To: "+2000", MessageSid: "m-1" }, + {}, + collectCfg, + ); + expect(first).toBeUndefined(); + expect(piEmbeddedMock.runEmbeddedPiAgent).not.toHaveBeenCalled(); + + piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(false); + piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + + const second = await getReplyFromConfig( + { Body: "second", From: "+1001", To: "+2000" }, + {}, + collectCfg, + ); + const secondText = Array.isArray(second) ? second[0]?.text : second?.text; + expect(secondText).toBe("ok"); + + await vi.advanceTimersByTimeAsync(500); + await Promise.resolve(); + const queuedPrompt = + prompts.find((p) => p.includes("[Queued messages while agent was busy]")) ?? ""; + expect(queuedPrompt).toContain("Queued #1"); + expect(queuedPrompt).toContain("first"); + expect(queuedPrompt).not.toContain("[message_id:"); + + prompts.length = 0; + piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(true); + piEmbeddedMock.isEmbeddedPiRunStreaming.mockReturnValue(false); + + const followupCfg = { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "openclaw"), + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: path.join(home, "sessions-2.json") }, + messages: { + queue: { + mode: "followup", + debounceMs: 0, + cap: 1, + drop: "summarize", + }, + }, + }; + + await getReplyFromConfig({ Body: "one", From: "+1002", To: "+2000" }, {}, followupCfg); + await getReplyFromConfig({ Body: "two", From: "+1002", To: "+2000" }, {}, followupCfg); + + piEmbeddedMock.isEmbeddedPiRunActive.mockReturnValue(false); + await getReplyFromConfig({ Body: "three", From: "+1002", To: "+2000" }, {}, followupCfg); + + await vi.advanceTimersByTimeAsync(50); + await Promise.resolve(); + expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true); + }, + { prefix: "openclaw-queue-" }, + ); + }); }); diff --git a/src/auto-reply/reply.queue.test.ts b/src/auto-reply/reply.queue.test.ts deleted file mode 100644 index 2af49458bf0..00000000000 --- a/src/auto-reply/reply.queue.test.ts +++ /dev/null @@ -1,149 +0,0 @@ -import path from "node:path"; -import { afterEach, describe, expect, it, vi } from "vitest"; -import { pollUntil } from "../../test/helpers/poll.js"; -import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js"; -import { - isEmbeddedPiRunActive, - isEmbeddedPiRunStreaming, - runEmbeddedPiAgent, -} from "../agents/pi-embedded.js"; -import { getReplyFromConfig } from "./reply.js"; - -vi.mock("../agents/pi-embedded.js", () => ({ - abortEmbeddedPiRun: vi.fn().mockReturnValue(false), - runEmbeddedPiAgent: vi.fn(), - queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), - resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, - isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), - isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), -})); - -function makeResult(text: string) { - return { - payloads: [{ text }], - meta: { - durationMs: 5, - agentMeta: { sessionId: "s", provider: "p", model: "m" }, - }, - }; -} - -async function withTempHome(fn: (home: string) => Promise): Promise { - return withTempHomeBase( - async (home) => { - vi.mocked(runEmbeddedPiAgent).mockReset(); - return await fn(home); - }, - { prefix: "openclaw-queue-" }, - ); -} - -function makeCfg(home: string, queue?: Record) { - return { - agents: { - defaults: { - model: "anthropic/claude-opus-4-5", - workspace: path.join(home, "openclaw"), - }, - }, - channels: { whatsapp: { allowFrom: ["*"] } }, - session: { store: path.join(home, "sessions.json") }, - messages: queue ? { queue } : undefined, - }; -} - -describe("queue followups", () => { - afterEach(() => { - vi.useRealTimers(); - }); - - it("collects queued messages and drains after run completes", async () => { - vi.useFakeTimers(); - await withTempHome(async (home) => { - const prompts: string[] = []; - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { - prompts.push(params.prompt); - if (params.prompt.includes("[Queued messages while agent was busy]")) { - return makeResult("followup"); - } - return makeResult("main"); - }); - - vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true); - vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(true); - - const cfg = makeCfg(home, { - mode: "collect", - debounceMs: 200, - cap: 10, - drop: "summarize", - }); - - const first = await getReplyFromConfig( - { Body: "first", From: "+1001", To: "+2000", MessageSid: "m-1" }, - {}, - cfg, - ); - expect(first).toBeUndefined(); - expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); - - vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false); - vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false); - - const second = await getReplyFromConfig( - { Body: "second", From: "+1001", To: "+2000" }, - {}, - cfg, - ); - - const secondText = Array.isArray(second) ? second[0]?.text : second?.text; - expect(secondText).toBe("main"); - - await vi.advanceTimersByTimeAsync(500); - await Promise.resolve(); - - expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); - const queuedPrompt = prompts.find((p) => - p.includes("[Queued messages while agent was busy]"), - ); - expect(queuedPrompt).toBeTruthy(); - // Message id hints are no longer exposed to the model prompt. - expect(queuedPrompt).toContain("Queued #1"); - expect(queuedPrompt).toContain("first"); - expect(queuedPrompt).not.toContain("[message_id:"); - }); - }); - - it("summarizes dropped followups when cap is exceeded", async () => { - await withTempHome(async (home) => { - const prompts: string[] = []; - vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { - prompts.push(params.prompt); - return makeResult("ok"); - }); - - vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true); - vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false); - - const cfg = makeCfg(home, { - mode: "followup", - debounceMs: 0, - cap: 1, - drop: "summarize", - }); - - await getReplyFromConfig({ Body: "one", From: "+1002", To: "+2000" }, {}, cfg); - await getReplyFromConfig({ Body: "two", From: "+1002", To: "+2000" }, {}, cfg); - - vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false); - await getReplyFromConfig({ Body: "three", From: "+1002", To: "+2000" }, {}, cfg); - - await pollUntil( - async () => (prompts.some((p) => p.includes("[Queue overflow]")) ? true : null), - { timeoutMs: 2000 }, - ); - - expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true); - }); - }); -}); diff --git a/src/web/media.test.ts b/src/web/media.test.ts index 45a7585180a..2bb082f8adc 100644 --- a/src/web/media.test.ts +++ b/src/web/media.test.ts @@ -282,28 +282,14 @@ describe("web media loading", () => { }); it("falls back to JPEG when PNG alpha cannot fit under cap", async () => { - const sizes = [224, 256, 320]; - let pngBuffer: Buffer | null = null; - let smallestPng: Awaited> | null = null; - let jpegOptimized: Awaited> | null = null; - let cap = 0; - - for (const size of sizes) { - const raw = buildDeterministicBytes(size * size * 4); - pngBuffer = await sharp(raw, { raw: { width: size, height: size, channels: 4 } }) - .png() - .toBuffer(); - smallestPng = await optimizeImageToPng(pngBuffer, 1); - cap = Math.max(1, smallestPng.optimizedSize - 1); - jpegOptimized = await optimizeImageToJpeg(pngBuffer, cap); - if (jpegOptimized.buffer.length < smallestPng.optimizedSize) { - break; - } - } - - if (!pngBuffer || !smallestPng || !jpegOptimized) { - throw new Error("PNG fallback setup failed"); - } + const size = 96; + const raw = buildDeterministicBytes(size * size * 4); + const pngBuffer = await sharp(raw, { raw: { width: size, height: size, channels: 4 } }) + .png() + .toBuffer(); + const smallestPng = await optimizeImageToPng(pngBuffer, 1); + const cap = Math.max(1, smallestPng.optimizedSize - 1); + const jpegOptimized = await optimizeImageToJpeg(pngBuffer, cap); if (jpegOptimized.buffer.length >= smallestPng.optimizedSize) { throw new Error(