refactor(channels): dedupe message routing and telegram helpers

This commit is contained in:
Peter Steinberger
2026-02-22 07:37:54 +00:00
parent b109fa53ea
commit 75c1bfbae8
21 changed files with 566 additions and 410 deletions

79
src/channels/dock.test.ts Normal file
View File

@@ -0,0 +1,79 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { getChannelDock } from "./dock.js";
function emptyConfig(): OpenClawConfig {
return {} as OpenClawConfig;
}
describe("channels dock", () => {
it("telegram and googlechat threading contexts map thread ids consistently", () => {
const hasRepliedRef = { value: false };
const telegramDock = getChannelDock("telegram");
const googleChatDock = getChannelDock("googlechat");
const telegramContext = telegramDock?.threading?.buildToolContext?.({
cfg: emptyConfig(),
context: { To: " room-1 ", MessageThreadId: 42, ReplyToId: "fallback" },
hasRepliedRef,
});
const googleChatContext = googleChatDock?.threading?.buildToolContext?.({
cfg: emptyConfig(),
context: { To: " space-1 ", ReplyToId: "thread-abc" },
hasRepliedRef,
});
expect(telegramContext).toEqual({
currentChannelId: "room-1",
currentThreadTs: "42",
hasRepliedRef,
});
expect(googleChatContext).toEqual({
currentChannelId: "space-1",
currentThreadTs: "thread-abc",
hasRepliedRef,
});
});
it("irc resolveDefaultTo matches account id case-insensitively", () => {
const ircDock = getChannelDock("irc");
const cfg = {
channels: {
irc: {
defaultTo: "#root",
accounts: {
Work: { defaultTo: "#work" },
},
},
},
} as OpenClawConfig;
const accountDefault = ircDock?.config?.resolveDefaultTo?.({ cfg, accountId: "work" });
const rootDefault = ircDock?.config?.resolveDefaultTo?.({ cfg, accountId: "missing" });
expect(accountDefault).toBe("#work");
expect(rootDefault).toBe("#root");
});
it("signal allowFrom formatter normalizes values and preserves wildcard", () => {
const signalDock = getChannelDock("signal");
const formatted = signalDock?.config?.formatAllowFrom?.({
cfg: emptyConfig(),
allowFrom: [" signal:+14155550100 ", " * "],
});
expect(formatted).toEqual(["+14155550100", "*"]);
});
it("telegram allowFrom formatter trims, strips prefix, and lowercases", () => {
const telegramDock = getChannelDock("telegram");
const formatted = telegramDock?.config?.formatAllowFrom?.({
cfg: emptyConfig(),
allowFrom: [" TG:User ", "telegram:Foo", " Plain "],
});
expect(formatted).toEqual(["user", "foo", "plain"]);
});
});

View File

@@ -1,4 +1,3 @@
import type { OpenClawConfig } from "../config/config.js";
import {
resolveChannelGroupRequireMention,
resolveChannelGroupToolsPolicy,
@@ -32,6 +31,7 @@ import { normalizeSignalMessagingTarget } from "./plugins/normalize/signal.js";
import type {
ChannelCapabilities,
ChannelCommandAdapter,
ChannelConfigAdapter,
ChannelElevatedAdapter,
ChannelGroupAdapter,
ChannelId,
@@ -53,21 +53,10 @@ export type ChannelDock = {
};
streaming?: ChannelDockStreaming;
elevated?: ChannelElevatedAdapter;
config?: {
resolveAllowFrom?: (params: {
cfg: OpenClawConfig;
accountId?: string | null;
}) => Array<string | number> | undefined;
formatAllowFrom?: (params: {
cfg: OpenClawConfig;
accountId?: string | null;
allowFrom: Array<string | number>;
}) => string[];
resolveDefaultTo?: (params: {
cfg: OpenClawConfig;
accountId?: string | null;
}) => string | undefined;
};
config?: Pick<
ChannelConfigAdapter<unknown>,
"resolveAllowFrom" | "formatAllowFrom" | "resolveDefaultTo"
>;
groups?: ChannelGroupAdapter;
mentions?: ChannelMentionAdapter;
threading?: ChannelThreadingAdapter;
@@ -87,6 +76,12 @@ const formatLower = (allowFrom: Array<string | number>) =>
.filter(Boolean)
.map((entry) => entry.toLowerCase());
const stringifyAllowFrom = (allowFrom: Array<string | number>) =>
allowFrom.map((entry) => String(entry));
const trimAllowFromEntries = (allowFrom: Array<string | number>) =>
allowFrom.map((entry) => String(entry).trim()).filter(Boolean);
const formatDiscordAllowFrom = (allowFrom: Array<string | number>) =>
allowFrom
.map((entry) =>
@@ -133,6 +128,18 @@ function buildIMessageThreadToolContext(params: {
};
}
function buildThreadToolContextFromMessageThreadOrReply(params: {
context: ChannelThreadingContext;
hasRepliedRef: ChannelThreadingToolContext["hasRepliedRef"];
}): ChannelThreadingToolContext {
const threadId = params.context.MessageThreadId ?? params.context.ReplyToId;
return {
currentChannelId: params.context.To?.trim() || undefined,
currentThreadTs: threadId != null ? String(threadId) : undefined,
hasRepliedRef: params.hasRepliedRef,
};
}
function resolveCaseInsensitiveAccount<T>(
accounts: Record<string, T> | undefined,
accountId?: string | null,
@@ -182,13 +189,9 @@ const DOCKS: Record<ChatChannelId, ChannelDock> = {
outbound: { textChunkLimit: 4000 },
config: {
resolveAllowFrom: ({ cfg, accountId }) =>
(resolveTelegramAccount({ cfg, accountId }).config.allowFrom ?? []).map((entry) =>
String(entry),
),
stringifyAllowFrom(resolveTelegramAccount({ cfg, accountId }).config.allowFrom ?? []),
formatAllowFrom: ({ allowFrom }) =>
allowFrom
.map((entry) => String(entry).trim())
.filter(Boolean)
trimAllowFromEntries(allowFrom)
.map((entry) => entry.replace(/^(telegram|tg):/i, ""))
.map((entry) => entry.toLowerCase()),
resolveDefaultTo: ({ cfg, accountId }) => {
@@ -202,14 +205,8 @@ const DOCKS: Record<ChatChannelId, ChannelDock> = {
},
threading: {
resolveReplyToMode: ({ cfg }) => cfg.channels?.telegram?.replyToMode ?? "off",
buildToolContext: ({ context, hasRepliedRef }) => {
const threadId = context.MessageThreadId ?? context.ReplyToId;
return {
currentChannelId: context.To?.trim() || undefined,
currentThreadTs: threadId != null ? String(threadId) : undefined,
hasRepliedRef,
};
},
buildToolContext: ({ context, hasRepliedRef }) =>
buildThreadToolContextFromMessageThreadOrReply({ context, hasRepliedRef }),
},
},
whatsapp: {
@@ -426,14 +423,8 @@ const DOCKS: Record<ChatChannelId, ChannelDock> = {
},
threading: {
resolveReplyToMode: ({ cfg }) => cfg.channels?.googlechat?.replyToMode ?? "off",
buildToolContext: ({ context, hasRepliedRef }) => {
const threadId = context.MessageThreadId ?? context.ReplyToId;
return {
currentChannelId: context.To?.trim() || undefined,
currentThreadTs: threadId != null ? String(threadId) : undefined,
hasRepliedRef,
};
},
buildToolContext: ({ context, hasRepliedRef }) =>
buildThreadToolContextFromMessageThreadOrReply({ context, hasRepliedRef }),
},
},
slack: {
@@ -487,13 +478,9 @@ const DOCKS: Record<ChatChannelId, ChannelDock> = {
},
config: {
resolveAllowFrom: ({ cfg, accountId }) =>
(resolveSignalAccount({ cfg, accountId }).config.allowFrom ?? []).map((entry) =>
String(entry),
),
stringifyAllowFrom(resolveSignalAccount({ cfg, accountId }).config.allowFrom ?? []),
formatAllowFrom: ({ allowFrom }) =>
allowFrom
.map((entry) => String(entry).trim())
.filter(Boolean)
trimAllowFromEntries(allowFrom)
.map((entry) => (entry === "*" ? "*" : normalizeE164(entry.replace(/^signal:/i, ""))))
.filter(Boolean),
resolveDefaultTo: ({ cfg, accountId }) =>

View File

@@ -0,0 +1,122 @@
import { describe, expect, it, vi } from "vitest";
import {
clearFinalizableDraftMessage,
createFinalizableDraftLifecycle,
createFinalizableDraftStreamControlsForState,
takeMessageIdAfterStop,
} from "./draft-stream-controls.js";
describe("draft-stream-controls", () => {
it("takeMessageIdAfterStop stops, reads, and clears message id", async () => {
const events: string[] = [];
let messageId: string | undefined = "m-1";
const result = await takeMessageIdAfterStop({
stopForClear: async () => {
events.push("stop");
},
readMessageId: () => {
events.push("read");
return messageId;
},
clearMessageId: () => {
events.push("clear");
messageId = undefined;
},
});
expect(result).toBe("m-1");
expect(messageId).toBeUndefined();
expect(events).toEqual(["stop", "read", "clear"]);
});
it("clearFinalizableDraftMessage deletes valid message ids", async () => {
const deleteMessage = vi.fn(async () => {});
const onDeleteSuccess = vi.fn();
await clearFinalizableDraftMessage({
stopForClear: async () => {},
readMessageId: () => "m-2",
clearMessageId: () => {},
isValidMessageId: (value): value is string => typeof value === "string",
deleteMessage,
onDeleteSuccess,
warnPrefix: "cleanup failed",
});
expect(deleteMessage).toHaveBeenCalledWith("m-2");
expect(onDeleteSuccess).toHaveBeenCalledWith("m-2");
});
it("clearFinalizableDraftMessage skips invalid message ids", async () => {
const deleteMessage = vi.fn(async () => {});
await clearFinalizableDraftMessage({
stopForClear: async () => {},
readMessageId: () => 123,
clearMessageId: () => {},
isValidMessageId: (value): value is string => typeof value === "string",
deleteMessage,
warnPrefix: "cleanup failed",
});
expect(deleteMessage).not.toHaveBeenCalled();
});
it("clearFinalizableDraftMessage warns when delete fails", async () => {
const warn = vi.fn();
await clearFinalizableDraftMessage({
stopForClear: async () => {},
readMessageId: () => "m-3",
clearMessageId: () => {},
isValidMessageId: (value): value is string => typeof value === "string",
deleteMessage: async () => {
throw new Error("boom");
},
warn,
warnPrefix: "cleanup failed",
});
expect(warn).toHaveBeenCalledWith("cleanup failed: boom");
});
it("controls ignore updates after final", async () => {
const sendOrEditStreamMessage = vi.fn(async () => true);
const controls = createFinalizableDraftStreamControlsForState({
throttleMs: 250,
state: { stopped: false, final: true },
sendOrEditStreamMessage,
});
controls.update("ignored");
await controls.loop.flush();
expect(sendOrEditStreamMessage).not.toHaveBeenCalled();
});
it("lifecycle clear marks stopped, clears id, and deletes preview message", async () => {
const state = { stopped: false, final: false };
let messageId: string | undefined = "m-4";
const deleteMessage = vi.fn(async () => {});
const lifecycle = createFinalizableDraftLifecycle({
throttleMs: 250,
state,
sendOrEditStreamMessage: async () => true,
readMessageId: () => messageId,
clearMessageId: () => {
messageId = undefined;
},
isValidMessageId: (value): value is string => typeof value === "string",
deleteMessage,
warnPrefix: "cleanup failed",
});
await lifecycle.clear();
expect(state.stopped).toBe(true);
expect(messageId).toBeUndefined();
expect(deleteMessage).toHaveBeenCalledWith("m-4");
});
});

View File

@@ -5,6 +5,26 @@ export type FinalizableDraftStreamState = {
final: boolean;
};
type StopAndClearMessageIdParams<T> = {
stopForClear: () => Promise<void>;
readMessageId: () => T | undefined;
clearMessageId: () => void;
};
type ClearFinalizableDraftMessageParams<T> = StopAndClearMessageIdParams<T> & {
isValidMessageId: (value: unknown) => value is T;
deleteMessage: (messageId: T) => Promise<void>;
onDeleteSuccess?: (messageId: T) => void;
warn?: (message: string) => void;
warnPrefix: string;
};
type FinalizableDraftLifecycleParams<T> = ClearFinalizableDraftMessageParams<T> & {
throttleMs: number;
state: FinalizableDraftStreamState;
sendOrEditStreamMessage: (text: string) => Promise<boolean>;
};
export function createFinalizableDraftStreamControls(params: {
throttleMs: number;
isStopped: () => boolean;
@@ -64,27 +84,18 @@ export function createFinalizableDraftStreamControlsForState(params: {
});
}
export async function takeMessageIdAfterStop<T>(params: {
stopForClear: () => Promise<void>;
readMessageId: () => T | undefined;
clearMessageId: () => void;
}): Promise<T | undefined> {
export async function takeMessageIdAfterStop<T>(
params: StopAndClearMessageIdParams<T>,
): Promise<T | undefined> {
await params.stopForClear();
const messageId = params.readMessageId();
params.clearMessageId();
return messageId;
}
export async function clearFinalizableDraftMessage<T>(params: {
stopForClear: () => Promise<void>;
readMessageId: () => T | undefined;
clearMessageId: () => void;
isValidMessageId: (value: unknown) => value is T;
deleteMessage: (messageId: T) => Promise<void>;
onDeleteSuccess?: (messageId: T) => void;
warn?: (message: string) => void;
warnPrefix: string;
}): Promise<void> {
export async function clearFinalizableDraftMessage<T>(
params: ClearFinalizableDraftMessageParams<T>,
): Promise<void> {
const messageId = await takeMessageIdAfterStop({
stopForClear: params.stopForClear,
readMessageId: params.readMessageId,
@@ -101,18 +112,7 @@ export async function clearFinalizableDraftMessage<T>(params: {
}
}
export function createFinalizableDraftLifecycle<T>(params: {
throttleMs: number;
state: FinalizableDraftStreamState;
sendOrEditStreamMessage: (text: string) => Promise<boolean>;
readMessageId: () => T | undefined;
clearMessageId: () => void;
isValidMessageId: (value: unknown) => value is T;
deleteMessage: (messageId: T) => Promise<void>;
onDeleteSuccess?: (messageId: T) => void;
warn?: (message: string) => void;
warnPrefix: string;
}) {
export function createFinalizableDraftLifecycle<T>(params: FinalizableDraftLifecycleParams<T>) {
const controls = createFinalizableDraftStreamControlsForState({
throttleMs: params.throttleMs,
state: params.state,

View File

@@ -36,6 +36,24 @@ vi.mock("../../../discord/monitor/thread-bindings.js", async (importOriginal) =>
const { discordOutbound } = await import("./discord.js");
function mockBoundThreadManager() {
hoisted.getThreadBindingManagerMock.mockReturnValue({
getByThreadId: () => ({
accountId: "default",
channelId: "parent-1",
threadId: "thread-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
label: "codex-thread",
webhookId: "wh-1",
webhookToken: "tok-1",
boundBy: "system",
boundAt: Date.now(),
}),
});
}
describe("normalizeDiscordOutboundTarget", () => {
it("normalizes bare numeric IDs to channel: prefix", () => {
expect(normalizeDiscordOutboundTarget("1470130713209602050")).toEqual({
@@ -110,21 +128,7 @@ describe("discordOutbound", () => {
});
it("uses webhook persona delivery for bound thread text replies", async () => {
hoisted.getThreadBindingManagerMock.mockReturnValue({
getByThreadId: () => ({
accountId: "default",
channelId: "parent-1",
threadId: "thread-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
label: "codex-thread",
webhookId: "wh-1",
webhookToken: "tok-1",
boundBy: "system",
boundAt: Date.now(),
}),
});
mockBoundThreadManager();
const result = await discordOutbound.sendText?.({
cfg: {},
@@ -160,20 +164,7 @@ describe("discordOutbound", () => {
});
it("falls back to bot send for silent delivery on bound threads", async () => {
hoisted.getThreadBindingManagerMock.mockReturnValue({
getByThreadId: () => ({
accountId: "default",
channelId: "parent-1",
threadId: "thread-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
boundBy: "system",
boundAt: Date.now(),
}),
});
mockBoundThreadManager();
const result = await discordOutbound.sendText?.({
cfg: {},
@@ -201,20 +192,7 @@ describe("discordOutbound", () => {
});
it("falls back to bot send when webhook send fails", async () => {
hoisted.getThreadBindingManagerMock.mockReturnValue({
getByThreadId: () => ({
accountId: "default",
channelId: "parent-1",
threadId: "thread-1",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
agentId: "main",
webhookId: "wh-1",
webhookToken: "tok-1",
boundBy: "system",
boundAt: Date.now(),
}),
});
mockBoundThreadManager();
hoisted.sendWebhookMessageDiscordMock.mockRejectedValueOnce(new Error("rate limited"));
const result = await discordOutbound.sendText?.({

View File

@@ -57,7 +57,7 @@ export type ChannelConfigAdapter<ResolvedAccount> = {
resolveAllowFrom?: (params: {
cfg: OpenClawConfig;
accountId?: string | null;
}) => string[] | undefined;
}) => Array<string | number> | undefined;
formatAllowFrom?: (params: {
cfg: OpenClawConfig;
accountId?: string | null;

View File

@@ -41,6 +41,21 @@ const createEnabledController = (
return { adapter, calls, controller };
};
const createSetOnlyController = () => {
const calls: { method: string; emoji: string }[] = [];
const adapter: StatusReactionAdapter = {
setReaction: vi.fn(async (emoji: string) => {
calls.push({ method: "set", emoji });
}),
};
const controller = createStatusReactionController({
enabled: true,
adapter,
initialEmoji: "👀",
});
return { calls, controller };
};
// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────
@@ -245,19 +260,7 @@ describe("createStatusReactionController", () => {
});
it("should only call setReaction when adapter lacks removeReaction", async () => {
const calls: { method: string; emoji: string }[] = [];
const adapter: StatusReactionAdapter = {
setReaction: vi.fn(async (emoji: string) => {
calls.push({ method: "set", emoji });
}),
// No removeReaction
};
const controller = createStatusReactionController({
enabled: true,
adapter,
initialEmoji: "👀",
});
const { calls, controller } = createSetOnlyController();
void controller.setQueued();
await vi.runAllTimersAsync();
@@ -285,18 +288,7 @@ describe("createStatusReactionController", () => {
});
it("should handle clear gracefully when adapter lacks removeReaction", async () => {
const calls: { method: string; emoji: string }[] = [];
const adapter: StatusReactionAdapter = {
setReaction: vi.fn(async (emoji: string) => {
calls.push({ method: "set", emoji });
}),
};
const controller = createStatusReactionController({
enabled: true,
adapter,
initialEmoji: "👀",
});
const { calls, controller } = createSetOnlyController();
await controller.clear();