refactor(channels): dedupe transport and gateway test scaffolds

This commit is contained in:
Peter Steinberger
2026-02-16 14:52:15 +00:00
parent f717a13039
commit 93ca0ed54f
95 changed files with 4068 additions and 5221 deletions

View File

@@ -34,6 +34,23 @@ export function classifySignalCliLogLine(line: string): "log" | "error" | null {
return "log";
}
function bindSignalCliOutput(params: {
stream: NodeJS.ReadableStream | null | undefined;
log: (message: string) => void;
error: (message: string) => void;
}): void {
params.stream?.on("data", (data) => {
for (const line of data.toString().split(/\r?\n/)) {
const kind = classifySignalCliLogLine(line);
if (kind === "log") {
params.log(`signal-cli: ${line.trim()}`);
} else if (kind === "error") {
params.error(`signal-cli: ${line.trim()}`);
}
}
});
}
function buildDaemonArgs(opts: SignalDaemonOpts): string[] {
const args: string[] = [];
if (opts.account) {
@@ -67,26 +84,8 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle {
const log = opts.runtime?.log ?? (() => {});
const error = opts.runtime?.error ?? (() => {});
child.stdout?.on("data", (data) => {
for (const line of data.toString().split(/\r?\n/)) {
const kind = classifySignalCliLogLine(line);
if (kind === "log") {
log(`signal-cli: ${line.trim()}`);
} else if (kind === "error") {
error(`signal-cli: ${line.trim()}`);
}
}
});
child.stderr?.on("data", (data) => {
for (const line of data.toString().split(/\r?\n/)) {
const kind = classifySignalCliLogLine(line);
if (kind === "log") {
log(`signal-cli: ${line.trim()}`);
} else if (kind === "error") {
error(`signal-cli: ${line.trim()}`);
}
}
});
bindSignalCliOutput({ stream: child.stdout, log, error });
bindSignalCliOutput({ stream: child.stderr, log, error });
child.on("error", (err) => {
error(`signal-cli spawn error: ${String(err)}`);
});

View File

@@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { createBaseSignalEventHandlerDeps } from "./monitor/event-handler.test-harness.js";
const sendTypingMock = vi.fn();
const sendReadReceiptMock = vi.fn();
@@ -37,39 +38,19 @@ describe("signal event handler typing + read receipts", () => {
it("sends typing + read receipt for allowed DMs", async () => {
const { createSignalEventHandler } = await import("./monitor/event-handler.js");
const handler = createSignalEventHandler({
// oxlint-disable-next-line typescript/no-explicit-any
runtime: { log: () => {}, error: () => {} } as any,
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
// oxlint-disable-next-line typescript/no-explicit-any
} as any,
baseUrl: "http://localhost",
account: "+15550009999",
accountId: "default",
blockStreaming: false,
historyLimit: 0,
groupHistories: new Map(),
textLimit: 4000,
dmPolicy: "open",
allowFrom: ["*"],
groupAllowFrom: ["*"],
groupPolicy: "open",
reactionMode: "off",
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: true,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],
// oxlint-disable-next-line typescript/no-explicit-any
isSignalReactionMessage: () => false as any,
shouldEmitSignalReactionNotification: () => false,
buildSignalReactionSystemEventText: () => "reaction",
});
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 } },
channels: { signal: { dmPolicy: "open", allowFrom: ["*"] } },
},
account: "+15550009999",
blockStreaming: false,
historyLimit: 0,
groupHistories: new Map(),
sendReadReceipts: true,
}),
);
await handler({
event: "receive",

View File

@@ -27,6 +27,40 @@ const {
const SIGNAL_BASE_URL = "http://127.0.0.1:8080";
function createMonitorRuntime() {
return {
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as (code: number) => never,
};
}
function setSignalAutoStartConfig(overrides: Record<string, unknown> = {}) {
setSignalToolResultTestConfig({
...config,
channels: {
...config.channels,
signal: {
autoStart: true,
dmPolicy: "open",
allowFrom: ["*"],
...overrides,
},
},
});
}
function createAutoAbortController() {
const abortController = new AbortController();
streamMock.mockImplementation(async () => {
abortController.abort();
return;
});
return abortController;
}
async function runMonitorWithMocks(
opts: Parameters<(typeof import("./monitor.js"))["monitorSignalProvider"]>[0],
) {
@@ -59,27 +93,21 @@ async function receiveSignalPayloads(params: {
await flush();
}
function getDirectSignalEventsFor(sender: string) {
const route = resolveAgentRoute({
cfg: config as OpenClawConfig,
channel: "signal",
accountId: "default",
peer: { kind: "direct", id: normalizeE164(sender) },
});
return peekSystemEvents(route.sessionKey);
}
describe("monitorSignalProvider tool results", () => {
it("uses bounded readiness checks when auto-starting the daemon", async () => {
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as (code: number) => never,
};
setSignalToolResultTestConfig({
...config,
channels: {
...config.channels,
signal: { autoStart: true, dmPolicy: "open", allowFrom: ["*"] },
},
});
const abortController = new AbortController();
streamMock.mockImplementation(async () => {
abortController.abort();
return;
});
const runtime = createMonitorRuntime();
setSignalAutoStartConfig();
const abortController = createAutoAbortController();
await runMonitorWithMocks({
autoStart: true,
baseUrl: SIGNAL_BASE_URL,
@@ -102,30 +130,9 @@ describe("monitorSignalProvider tool results", () => {
});
it("uses startupTimeoutMs override when provided", async () => {
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as (code: number) => never,
};
setSignalToolResultTestConfig({
...config,
channels: {
...config.channels,
signal: {
autoStart: true,
dmPolicy: "open",
allowFrom: ["*"],
startupTimeoutMs: 60_000,
},
},
});
const abortController = new AbortController();
streamMock.mockImplementation(async () => {
abortController.abort();
return;
});
const runtime = createMonitorRuntime();
setSignalAutoStartConfig({ startupTimeoutMs: 60_000 });
const abortController = createAutoAbortController();
await runMonitorWithMocks({
autoStart: true,
@@ -144,30 +151,9 @@ describe("monitorSignalProvider tool results", () => {
});
it("caps startupTimeoutMs at 2 minutes", async () => {
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: ((code: number): never => {
throw new Error(`exit ${code}`);
}) as (code: number) => never,
};
setSignalToolResultTestConfig({
...config,
channels: {
...config.channels,
signal: {
autoStart: true,
dmPolicy: "open",
allowFrom: ["*"],
startupTimeoutMs: 180_000,
},
},
});
const abortController = new AbortController();
streamMock.mockImplementation(async () => {
abortController.abort();
return;
});
const runtime = createMonitorRuntime();
setSignalAutoStartConfig({ startupTimeoutMs: 180_000 });
const abortController = createAutoAbortController();
await runMonitorWithMocks({
autoStart: true,
@@ -321,13 +307,7 @@ describe("monitorSignalProvider tool results", () => {
],
});
const route = resolveAgentRoute({
cfg: config as OpenClawConfig,
channel: "signal",
accountId: "default",
peer: { kind: "direct", id: normalizeE164("+15550001111") },
});
const events = peekSystemEvents(route.sessionKey);
const events = getDirectSignalEventsFor("+15550001111");
expect(events.some((text) => text.includes("Signal reaction added"))).toBe(true);
});
@@ -364,13 +344,7 @@ describe("monitorSignalProvider tool results", () => {
],
});
const route = resolveAgentRoute({
cfg: config as OpenClawConfig,
channel: "signal",
accountId: "default",
peer: { kind: "direct", id: normalizeE164("+15550001111") },
});
const events = peekSystemEvents(route.sessionKey);
const events = getDirectSignalEventsFor("+15550001111");
expect(events.some((text) => text.includes("Signal reaction added"))).toBe(true);
});

View File

@@ -1,57 +1,31 @@
import { describe, expect, it, vi } from "vitest";
import type { MsgContext } from "../../auto-reply/templating.js";
import { buildDispatchInboundCaptureMock } from "../../../test/helpers/dispatch-inbound-capture.js";
import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js";
let capturedCtx: MsgContext | undefined;
vi.mock("../../auto-reply/dispatch.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../auto-reply/dispatch.js")>();
const dispatchInboundMessage = vi.fn(async (params: { ctx: MsgContext }) => {
capturedCtx = params.ctx;
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
return buildDispatchInboundCaptureMock(actual, (ctx) => {
capturedCtx = ctx as MsgContext;
});
return {
...actual,
dispatchInboundMessage,
dispatchInboundMessageWithDispatcher: dispatchInboundMessage,
dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessage,
};
});
import { createSignalEventHandler } from "./event-handler.js";
import { createBaseSignalEventHandlerDeps } from "./event-handler.test-harness.js";
describe("signal createSignalEventHandler inbound contract", () => {
it("passes a finalized MsgContext to dispatchInboundMessage", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler({
// oxlint-disable-next-line typescript/no-explicit-any
runtime: { log: () => {}, error: () => {} } as any,
// oxlint-disable-next-line typescript/no-explicit-any
cfg: { messages: { inbound: { debounceMs: 0 } } } as any,
baseUrl: "http://localhost",
accountId: "default",
historyLimit: 0,
groupHistories: new Map(),
textLimit: 4000,
dmPolicy: "open",
allowFrom: ["*"],
groupAllowFrom: ["*"],
groupPolicy: "open",
reactionMode: "off",
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: false,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],
// oxlint-disable-next-line typescript/no-explicit-any
isSignalReactionMessage: () => false as any,
shouldEmitSignalReactionNotification: () => false,
buildSignalReactionSystemEventText: () => "reaction",
});
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
// oxlint-disable-next-line typescript/no-explicit-any
cfg: { messages: { inbound: { debounceMs: 0 } } } as any,
historyLimit: 0,
}),
);
await handler({
event: "receive",

View File

@@ -1,55 +1,20 @@
import { describe, expect, it, vi } from "vitest";
import type { MsgContext } from "../../auto-reply/templating.js";
import { buildDispatchInboundCaptureMock } from "../../../test/helpers/dispatch-inbound-capture.js";
import { createBaseSignalEventHandlerDeps } from "./event-handler.test-harness.js";
let capturedCtx: MsgContext | undefined;
vi.mock("../../auto-reply/dispatch.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../auto-reply/dispatch.js")>();
const dispatchInboundMessage = vi.fn(async (params: { ctx: MsgContext }) => {
capturedCtx = params.ctx;
return { queuedFinal: false, counts: { tool: 0, block: 0, final: 0 } };
return buildDispatchInboundCaptureMock(actual, (ctx) => {
capturedCtx = ctx as MsgContext;
});
return {
...actual,
dispatchInboundMessage,
dispatchInboundMessageWithDispatcher: dispatchInboundMessage,
dispatchInboundMessageWithBufferedDispatcher: dispatchInboundMessage,
};
});
import { createSignalEventHandler } from "./event-handler.js";
import { renderSignalMentions } from "./mentions.js";
function createBaseDeps(overrides: Record<string, unknown> = {}) {
return {
// oxlint-disable-next-line typescript/no-explicit-any
runtime: { log: () => {}, error: () => {} } as any,
baseUrl: "http://localhost",
accountId: "default",
historyLimit: 5,
groupHistories: new Map(),
textLimit: 4000,
dmPolicy: "open" as const,
allowFrom: ["*"],
groupAllowFrom: ["*"],
groupPolicy: "open" as const,
reactionMode: "off" as const,
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: false,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],
// oxlint-disable-next-line typescript/no-explicit-any
isSignalReactionMessage: () => false as any,
shouldEmitSignalReactionNotification: () => false,
buildSignalReactionSystemEventText: () => "reaction",
...overrides,
};
}
type GroupEventOpts = {
message?: string;
attachments?: unknown[];
@@ -82,11 +47,37 @@ function makeGroupEvent(opts: GroupEventOpts) {
};
}
function createMentionGatedHistoryHandler() {
const groupHistories = new Map();
const handler = createSignalEventHandler(
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
},
historyLimit: 5,
groupHistories,
}),
);
return { handler, groupHistories };
}
async function expectSkippedGroupHistory(opts: GroupEventOpts, expectedBody: string) {
capturedCtx = undefined;
const { handler, groupHistories } = createMentionGatedHistoryHandler();
await handler(makeGroupEvent(opts));
expect(capturedCtx).toBeUndefined();
const entries = groupHistories.get("g1");
expect(entries).toBeTruthy();
expect(entries).toHaveLength(1);
expect(entries[0].body).toBe(expectedBody);
}
describe("signal mention gating", () => {
it("drops group messages without mention when requireMention is configured", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
@@ -101,7 +92,7 @@ describe("signal mention gating", () => {
it("allows group messages with mention when requireMention is configured", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
@@ -117,7 +108,7 @@ describe("signal mention gating", () => {
it("sets WasMentioned=false for group messages without mention when requireMention is off", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: false } } } },
@@ -132,75 +123,30 @@ describe("signal mention gating", () => {
it("records pending history for skipped group messages", async () => {
capturedCtx = undefined;
const groupHistories = new Map();
const handler = createSignalEventHandler(
createBaseDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
},
historyLimit: 5,
groupHistories,
}),
);
const { handler, groupHistories } = createMentionGatedHistoryHandler();
await handler(makeGroupEvent({ message: "hello from alice" }));
expect(capturedCtx).toBeUndefined();
const entries = groupHistories.get("g1");
expect(entries).toBeTruthy();
expect(entries).toHaveLength(1);
expect(entries[0].sender).toBe("Alice");
expect(entries[0].body).toBe("hello from alice");
});
it("records attachment placeholder in pending history for skipped attachment-only group messages", async () => {
capturedCtx = undefined;
const groupHistories = new Map();
const handler = createSignalEventHandler(
createBaseDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
},
historyLimit: 5,
groupHistories,
}),
await expectSkippedGroupHistory(
{ message: "", attachments: [{ id: "a1" }] },
"<media:attachment>",
);
await handler(makeGroupEvent({ message: "", attachments: [{ id: "a1" }] }));
expect(capturedCtx).toBeUndefined();
const entries = groupHistories.get("g1");
expect(entries).toBeTruthy();
expect(entries).toHaveLength(1);
expect(entries[0].body).toBe("<media:attachment>");
});
it("records quote text in pending history for skipped quote-only group messages", async () => {
capturedCtx = undefined;
const groupHistories = new Map();
const handler = createSignalEventHandler(
createBaseDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
},
historyLimit: 5,
groupHistories,
}),
);
await handler(makeGroupEvent({ message: "", quoteText: "quoted context" }));
expect(capturedCtx).toBeUndefined();
const entries = groupHistories.get("g1");
expect(entries).toBeTruthy();
expect(entries).toHaveLength(1);
expect(entries[0].body).toBe("quoted context");
await expectSkippedGroupHistory({ message: "", quoteText: "quoted context" }, "quoted context");
});
it("bypasses mention gating for authorized control commands", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },
@@ -215,7 +161,7 @@ describe("signal mention gating", () => {
it("hydrates mention placeholders before trimming so offsets stay aligned", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@bot"] } },
channels: { signal: { groups: { "*": { requireMention: false } } } },
@@ -247,7 +193,7 @@ describe("signal mention gating", () => {
it("counts mention metadata replacements toward requireMention gating", async () => {
capturedCtx = undefined;
const handler = createSignalEventHandler(
createBaseDeps({
createBaseSignalEventHandlerDeps({
cfg: {
messages: { inbound: { debounceMs: 0 }, groupChat: { mentionPatterns: ["@123e4567"] } },
channels: { signal: { groups: { "*": { requireMention: true } } } },

View File

@@ -0,0 +1,35 @@
import type { SignalEventHandlerDeps, SignalReactionMessage } from "./event-handler.types.js";
export function createBaseSignalEventHandlerDeps(
overrides: Partial<SignalEventHandlerDeps> = {},
): SignalEventHandlerDeps {
return {
// oxlint-disable-next-line typescript/no-explicit-any
runtime: { log: () => {}, error: () => {} } as any,
cfg: {},
baseUrl: "http://localhost",
accountId: "default",
historyLimit: 5,
groupHistories: new Map(),
textLimit: 4000,
dmPolicy: "open",
allowFrom: ["*"],
groupAllowFrom: ["*"],
groupPolicy: "open",
reactionMode: "off",
reactionAllowlist: [],
mediaMaxBytes: 1024,
ignoreAttachments: true,
sendReadReceipts: false,
readReceiptsViaDaemon: false,
fetchAttachment: async () => null,
deliverReplies: async () => {},
resolveSignalReactionTargets: () => [],
isSignalReactionMessage: (
_reaction: SignalReactionMessage | null | undefined,
): _reaction is SignalReactionMessage => false,
shouldEmitSignalReactionNotification: () => false,
buildSignalReactionSystemEventText: () => "reaction",
...overrides,
};
}