From 68a0db6fb4c6d116b674e89135fe8b148b9e18f5 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Sat, 28 Feb 2026 14:47:32 -0800 Subject: [PATCH] outbound: add plugin session resolver hook and migrate tlon --- extensions/tlon/src/channel.ts | 2 + extensions/tlon/src/outbound-session.test.ts | 80 ++++++++ extensions/tlon/src/outbound-session.ts | 64 +++++++ src/channels/plugins/types.adapters.ts | 32 ++++ src/channels/plugins/types.ts | 3 + src/infra/outbound/outbound-session.ts | 185 +++++++++++++++++-- src/infra/outbound/outbound.test.ts | 161 ++++++++++++++++ src/plugin-sdk/index.ts | 3 + 8 files changed, 513 insertions(+), 17 deletions(-) create mode 100644 extensions/tlon/src/outbound-session.test.ts create mode 100644 extensions/tlon/src/outbound-session.ts diff --git a/extensions/tlon/src/channel.ts b/extensions/tlon/src/channel.ts index cc7f14ea3e5..dd8c735a537 100644 --- a/extensions/tlon/src/channel.ts +++ b/extensions/tlon/src/channel.ts @@ -14,6 +14,7 @@ import { buildTlonAccountFields } from "./account-fields.js"; import { tlonChannelConfigSchema } from "./config-schema.js"; import { monitorTlonProvider } from "./monitor/index.js"; import { tlonOnboardingAdapter } from "./onboarding.js"; +import { resolveTlonOutboundSession } from "./outbound-session.js"; import { formatTargetHint, normalizeShip, parseTlonTarget } from "./targets.js"; import { resolveTlonAccount, listTlonAccountIds } from "./types.js"; import { authenticate } from "./urbit/auth.js"; @@ -89,6 +90,7 @@ function applyTlonSetupConfig(params: { const tlonOutbound: ChannelOutboundAdapter = { deliveryMode: "direct", textChunkLimit: 10000, + resolveSession: resolveTlonOutboundSession, resolveTarget: ({ to }) => { const parsed = parseTlonTarget(to ?? ""); if (!parsed) { diff --git a/extensions/tlon/src/outbound-session.test.ts b/extensions/tlon/src/outbound-session.test.ts new file mode 100644 index 00000000000..cc384d888d2 --- /dev/null +++ b/extensions/tlon/src/outbound-session.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, it } from "vitest"; +import { resolveTlonOutboundSession } from "./outbound-session.js"; + +describe("resolveTlonOutboundSession", () => { + it("resolves direct and group targets with legacy parity", () => { + const cases = [ + { + target: "~sampel-palnet", + expected: { + peer: { kind: "direct", id: "~sampel-palnet" }, + from: "tlon:~sampel-palnet", + to: "tlon:~sampel-palnet", + }, + }, + { + target: "dm:sampel-palnet", + expected: { + peer: { kind: "direct", id: "~sampel-palnet" }, + from: "tlon:~sampel-palnet", + to: "tlon:~sampel-palnet", + }, + }, + { + target: "group:~host-ship/general", + expected: { + peer: { kind: "group", id: "chat/~host-ship/general" }, + from: "tlon:group:chat/~host-ship/general", + to: "tlon:chat/~host-ship/general", + }, + }, + { + target: "chat/~host-ship/general", + expected: { + peer: { kind: "group", id: "chat/~host-ship/general" }, + from: "tlon:group:chat/~host-ship/general", + to: "tlon:chat/~host-ship/general", + }, + }, + { + target: "~host-ship/general", + expected: { + peer: { kind: "group", id: "chat/~host-ship/general" }, + from: "tlon:group:chat/~host-ship/general", + to: "tlon:chat/~host-ship/general", + }, + }, + { + target: "group:opaque-channel-id", + expected: { + peer: { kind: "group", id: "opaque-channel-id" }, + from: "tlon:group:opaque-channel-id", + to: "tlon:opaque-channel-id", + }, + }, + { + target: "tlon:dm:~marzod", + expected: { + peer: { kind: "direct", id: "~marzod" }, + from: "tlon:~marzod", + to: "tlon:~marzod", + }, + }, + ] as const; + + for (const testCase of cases) { + const resolved = resolveTlonOutboundSession({ + cfg: {}, + target: testCase.target, + }); + expect(resolved).not.toBeNull(); + expect(resolved?.peer).toEqual(testCase.expected.peer); + expect(resolved?.from).toBe(testCase.expected.from); + expect(resolved?.to).toBe(testCase.expected.to); + } + }); + + it("returns null for blank target", () => { + expect(resolveTlonOutboundSession({ cfg: {}, target: " " })).toBeNull(); + }); +}); diff --git a/extensions/tlon/src/outbound-session.ts b/extensions/tlon/src/outbound-session.ts new file mode 100644 index 00000000000..5cebc87e3a8 --- /dev/null +++ b/extensions/tlon/src/outbound-session.ts @@ -0,0 +1,64 @@ +import type { + ChannelOutboundSessionResolveParams, + ChannelOutboundSessionResolveResult, +} from "openclaw/plugin-sdk"; + +function normalizeTlonShip(raw: string): string { + const trimmed = raw.trim(); + if (!trimmed) { + return trimmed; + } + return trimmed.startsWith("~") ? trimmed : `~${trimmed}`; +} + +/** + * Resolves outbound Tlon targets to canonical session-routing fields. + * Kept parity-compatible with legacy core resolver during migration. + */ +export function resolveTlonOutboundSession( + params: ChannelOutboundSessionResolveParams, +): ChannelOutboundSessionResolveResult | null { + let trimmed = params.target.trim(); + if (!trimmed) { + return null; + } + trimmed = trimmed.replace(/^tlon:/i, "").trim(); + if (!trimmed) { + return null; + } + const lower = trimmed.toLowerCase(); + let isGroup = + lower.startsWith("group:") || lower.startsWith("room:") || lower.startsWith("chat/"); + let peerId = trimmed; + if (lower.startsWith("group:") || lower.startsWith("room:")) { + peerId = trimmed.replace(/^(group|room):/i, "").trim(); + if (!peerId.startsWith("chat/")) { + const parts = peerId.split("/").filter(Boolean); + if (parts.length === 2) { + peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`; + } + } + isGroup = true; + } else if (lower.startsWith("dm:")) { + peerId = normalizeTlonShip(trimmed.slice("dm:".length)); + isGroup = false; + } else if (lower.startsWith("chat/")) { + peerId = trimmed; + isGroup = true; + } else if (trimmed.includes("/")) { + const parts = trimmed.split("/").filter(Boolean); + if (parts.length === 2) { + peerId = `chat/${normalizeTlonShip(parts[0])}/${parts[1]}`; + isGroup = true; + } + } else { + peerId = normalizeTlonShip(trimmed); + } + + return { + peer: { kind: isGroup ? "group" : "direct", id: peerId }, + chatType: isGroup ? "group" : "direct", + from: isGroup ? `tlon:group:${peerId}` : `tlon:${peerId}`, + to: `tlon:${peerId}`, + }; +} diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index ead7f68b2fa..7de25c36506 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -4,6 +4,7 @@ import type { GroupToolPolicyConfig } from "../../config/types.tools.js"; import type { OutboundDeliveryResult, OutboundSendDeps } from "../../infra/outbound/deliver.js"; import type { OutboundIdentity } from "../../infra/outbound/identity.js"; import type { RuntimeEnv } from "../../runtime.js"; +import type { ChatType } from "../chat-type.js"; import type { ChannelAccountSnapshot, ChannelAccountState, @@ -103,12 +104,43 @@ export type ChannelOutboundPayloadContext = ChannelOutboundContext & { payload: ReplyPayload; }; +export type ChannelOutboundSessionResolveTarget = { + kind: "user" | "group" | "channel"; +}; + +export type ChannelOutboundSessionResolveParams = { + cfg: OpenClawConfig; + accountId?: string | null; + target: string; + resolvedTarget?: ChannelOutboundSessionResolveTarget; + replyToId?: string | null; + threadId?: string | number | null; +}; + +export type ChannelOutboundSessionResolveResult = { + peer: { + kind: ChatType; + id: string; + }; + chatType?: "direct" | "group" | "channel"; + from?: string; + to?: string; + threadId?: string | number | null; + useThreadSuffix?: boolean; +}; + export type ChannelOutboundAdapter = { deliveryMode: "direct" | "gateway" | "hybrid"; chunker?: ((text: string, limit: number) => string[]) | null; chunkerMode?: "text" | "markdown"; textChunkLimit?: number; pollMaxOptions?: number; + resolveSession?: ( + params: ChannelOutboundSessionResolveParams, + ) => + | Promise + | ChannelOutboundSessionResolveResult + | null; resolveTarget?: (params: { cfg?: OpenClawConfig; to?: string; diff --git a/src/channels/plugins/types.ts b/src/channels/plugins/types.ts index d3028e9970d..71eab4cf686 100644 --- a/src/channels/plugins/types.ts +++ b/src/channels/plugins/types.ts @@ -23,6 +23,9 @@ export type { ChannelLogoutResult, ChannelOutboundAdapter, ChannelOutboundContext, + ChannelOutboundSessionResolveParams, + ChannelOutboundSessionResolveResult, + ChannelOutboundSessionResolveTarget, ChannelPairingAdapter, ChannelSecurityAdapter, ChannelSetupAdapter, diff --git a/src/infra/outbound/outbound-session.ts b/src/infra/outbound/outbound-session.ts index fa2727f9c4f..b2553f5e68f 100644 --- a/src/infra/outbound/outbound-session.ts +++ b/src/infra/outbound/outbound-session.ts @@ -2,6 +2,7 @@ import type { MsgContext } from "../../auto-reply/templating.js"; import type { ChatType } from "../../channels/chat-type.js"; import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelId } from "../../channels/plugins/types.js"; +import type { ChannelOutboundSessionResolveResult } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import { recordSessionMetaFromInbound, resolveStorePath } from "../../config/sessions.js"; import { parseDiscordTarget } from "../../discord/targets.js"; @@ -117,6 +118,148 @@ function buildBaseSessionKey(params: { }); } +const OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON = "OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON"; + +type NormalizedPluginSessionResolveResult = { + peer: RoutePeer; + chatType: "direct" | "group" | "channel"; + from?: string; + to?: string; + threadId?: string | number; + useThreadSuffix?: boolean; +}; + +function resolveDefaultRouteLabels(params: { channel: ChannelId; peer: RoutePeer }): { + from: string; + to: string; + chatType: "direct" | "group" | "channel"; +} { + if (params.peer.kind === "direct") { + return { + from: `${params.channel}:${params.peer.id}`, + to: `user:${params.peer.id}`, + chatType: "direct", + }; + } + if (params.peer.kind === "group") { + return { + from: `${params.channel}:group:${params.peer.id}`, + to: `channel:${params.peer.id}`, + chatType: "group", + }; + } + return { + from: `${params.channel}:channel:${params.peer.id}`, + to: `channel:${params.peer.id}`, + chatType: "channel", + }; +} + +function normalizePluginSessionResolveResult( + value: ChannelOutboundSessionResolveResult | null | undefined, +): NormalizedPluginSessionResolveResult | null { + if (!value || typeof value !== "object") { + return null; + } + const peerValue = value.peer; + if (!peerValue || typeof peerValue !== "object") { + return null; + } + const peerKind = peerValue.kind; + if (peerKind !== "direct" && peerKind !== "group" && peerKind !== "channel") { + return null; + } + const peerId = typeof peerValue.id === "string" ? peerValue.id.trim() : ""; + if (!peerId) { + return null; + } + const chatType = value.chatType; + if (chatType && chatType !== "direct" && chatType !== "group" && chatType !== "channel") { + return null; + } + const threadIdValue = value.threadId; + if ( + threadIdValue != null && + typeof threadIdValue !== "string" && + typeof threadIdValue !== "number" + ) { + return null; + } + const fromValue = value.from; + if (fromValue != null && typeof fromValue !== "string") { + return null; + } + const toValue = value.to; + if (toValue != null && typeof toValue !== "string") { + return null; + } + if (value.useThreadSuffix != null && typeof value.useThreadSuffix !== "boolean") { + return null; + } + return { + peer: { kind: peerKind, id: peerId }, + chatType: chatType ?? peerKind, + from: typeof fromValue === "string" && fromValue.trim() ? fromValue.trim() : undefined, + to: typeof toValue === "string" && toValue.trim() ? toValue.trim() : undefined, + threadId: threadIdValue ?? undefined, + useThreadSuffix: value.useThreadSuffix, + }; +} + +async function resolvePluginSession( + params: ResolveOutboundSessionRouteParams, +): Promise { + const plugin = getChannelPlugin(params.channel); + const resolver = plugin?.outbound?.resolveSession; + if (!resolver) { + return null; + } + const resolved = normalizePluginSessionResolveResult( + await resolver({ + cfg: params.cfg, + accountId: params.accountId, + target: params.target, + resolvedTarget: params.resolvedTarget ? { kind: params.resolvedTarget.kind } : undefined, + replyToId: params.replyToId ?? undefined, + threadId: params.threadId ?? undefined, + }), + ); + if (!resolved) { + return null; + } + const baseSessionKey = buildBaseSessionKey({ + cfg: params.cfg, + agentId: params.agentId, + channel: params.channel, + accountId: params.accountId, + peer: resolved.peer, + }); + const threadId = normalizeThreadId(resolved.threadId); + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey, + threadId, + useSuffix: resolved.useThreadSuffix ?? true, + }); + const labels = resolveDefaultRouteLabels({ channel: params.channel, peer: resolved.peer }); + return { + sessionKey: threadKeys.sessionKey, + baseSessionKey, + peer: resolved.peer, + chatType: resolved.chatType, + from: resolved.from ?? labels.from, + to: resolved.to ?? labels.to, + threadId, + }; +} + +function shouldUseLegacyTlonSessionResolver(): boolean { + const raw = process.env[OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON]; + if (!raw) { + return false; + } + return raw === "1" || /^true$/i.test(raw.trim()); +} + // Best-effort mpim detection: allowlist/config, then Slack API (if token available). async function resolveSlackChannelType(params: { cfg: OpenClawConfig; @@ -721,6 +864,7 @@ function resolveNostrSession( }; } +// Legacy compatibility resolver retained behind OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON. function normalizeTlonShip(raw: string): string { const trimmed = raw.trim(); if (!trimmed) { @@ -891,41 +1035,48 @@ export async function resolveOutboundSessionRoute( if (!target) { return null; } + const withTarget = { ...params, target }; + const pluginRoute = await resolvePluginSession(withTarget); + if (pluginRoute) { + return pluginRoute; + } switch (params.channel) { case "slack": - return await resolveSlackSession({ ...params, target }); + return await resolveSlackSession(withTarget); case "discord": - return resolveDiscordSession({ ...params, target }); + return resolveDiscordSession(withTarget); case "telegram": - return resolveTelegramSession({ ...params, target }); + return resolveTelegramSession(withTarget); case "whatsapp": - return resolveWhatsAppSession({ ...params, target }); + return resolveWhatsAppSession(withTarget); case "signal": - return resolveSignalSession({ ...params, target }); + return resolveSignalSession(withTarget); case "imessage": - return resolveIMessageSession({ ...params, target }); + return resolveIMessageSession(withTarget); case "matrix": - return resolveMatrixSession({ ...params, target }); + return resolveMatrixSession(withTarget); case "msteams": - return resolveMSTeamsSession({ ...params, target }); + return resolveMSTeamsSession(withTarget); case "mattermost": - return resolveMattermostSession({ ...params, target }); + return resolveMattermostSession(withTarget); case "bluebubbles": - return resolveBlueBubblesSession({ ...params, target }); + return resolveBlueBubblesSession(withTarget); case "nextcloud-talk": - return resolveNextcloudTalkSession({ ...params, target }); + return resolveNextcloudTalkSession(withTarget); case "zalo": - return resolveZaloSession({ ...params, target }); + return resolveZaloSession(withTarget); case "zalouser": - return resolveZalouserSession({ ...params, target }); + return resolveZalouserSession(withTarget); case "nostr": - return resolveNostrSession({ ...params, target }); + return resolveNostrSession(withTarget); case "tlon": - return resolveTlonSession({ ...params, target }); + return shouldUseLegacyTlonSessionResolver() + ? resolveTlonSession(withTarget) + : resolveFallbackSession(withTarget); case "feishu": - return resolveFeishuSession({ ...params, target }); + return resolveFeishuSession(withTarget); default: - return resolveFallbackSession({ ...params, target }); + return resolveFallbackSession(withTarget); } } diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index 01cdaf3e7c9..e6c055cc5ec 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -2,8 +2,11 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { resolveTlonOutboundSession } from "../../../extensions/tlon/src/outbound-session.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { createTestRegistry } from "../../test-utils/channel-plugins.js"; import { typedCases } from "../../test-utils/typed-cases.js"; import { ackDelivery, @@ -1034,6 +1037,164 @@ describe("resolveOutboundSessionRoute", () => { } } }); + + it("builds canonical session keys from plugin outbound session resolvers", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "discord", + source: "test", + plugin: { + id: "discord", + meta: { + id: "discord", + label: "Discord", + selectionLabel: "Discord", + docsPath: "/channels/discord", + blurb: "test stub.", + }, + capabilities: { chatTypes: ["direct", "group", "channel"] }, + config: { listAccountIds: () => [], resolveAccount: () => ({}) }, + outbound: { + deliveryMode: "direct" as const, + resolveSession: () => ({ + peer: { kind: "channel" as const, id: "C-123" }, + chatType: "channel" as const, + from: "discord:custom:C-123", + to: "channel:C-123", + threadId: "77", + useThreadSuffix: false, + }), + }, + }, + }, + ]), + ); + + const route = await resolveOutboundSessionRoute({ + cfg: {}, + channel: "discord", + agentId: "main", + target: "ignored-by-plugin", + }); + expect(route).toEqual({ + sessionKey: "agent:main:discord:channel:c-123", + baseSessionKey: "agent:main:discord:channel:c-123", + peer: { kind: "channel", id: "C-123" }, + chatType: "channel", + from: "discord:custom:C-123", + to: "channel:C-123", + threadId: "77", + }); + }); + + it("falls back to built-in routing when plugin resolver payload is invalid", async () => { + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "discord", + source: "test", + plugin: { + id: "discord", + meta: { + id: "discord", + label: "Discord", + selectionLabel: "Discord", + docsPath: "/channels/discord", + blurb: "test stub.", + }, + capabilities: { chatTypes: ["direct", "group", "channel"] }, + config: { listAccountIds: () => [], resolveAccount: () => ({}) }, + outbound: { + deliveryMode: "direct" as const, + resolveSession: () => + ({ peer: { kind: "invalid", id: "123" } }) as unknown as { + peer: { kind: "direct"; id: string }; + }, + }, + }, + }, + ]), + ); + + const route = await resolveOutboundSessionRoute({ + cfg: {}, + channel: "discord", + agentId: "main", + target: "user:123", + }); + expect(route).toMatchObject({ + sessionKey: "agent:main:main", + peer: { kind: "direct", id: "123" }, + chatType: "direct", + from: "discord:123", + to: "user:123", + }); + }); + + it("keeps tlon outbound routing parity between plugin and legacy compat path", async () => { + const previousCompat = process.env.OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON; + const targets = ["dm:sampel-palnet", "group:~host-ship/general", "~host-ship/general"] as const; + + try { + process.env.OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON = "1"; + setActivePluginRegistry(createTestRegistry([])); + const legacyRoutes = await Promise.all( + targets.map((target) => + resolveOutboundSessionRoute({ + cfg: {}, + channel: "tlon", + agentId: "main", + target, + }), + ), + ); + + process.env.OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON = "0"; + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "tlon", + source: "test", + plugin: { + id: "tlon", + meta: { + id: "tlon", + label: "Tlon", + selectionLabel: "Tlon", + docsPath: "/channels/tlon", + blurb: "test stub.", + }, + capabilities: { chatTypes: ["direct", "group"] }, + config: { listAccountIds: () => [], resolveAccount: () => ({}) }, + outbound: { + deliveryMode: "direct", + resolveSession: resolveTlonOutboundSession, + }, + }, + }, + ]), + ); + const pluginRoutes = await Promise.all( + targets.map((target) => + resolveOutboundSessionRoute({ + cfg: {}, + channel: "tlon", + agentId: "main", + target, + }), + ), + ); + + expect(pluginRoutes).toEqual(legacyRoutes); + } finally { + if (previousCompat === undefined) { + delete process.env.OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON; + } else { + process.env.OPENCLAW_OUTBOUND_SESSION_LEGACY_TLON = previousCompat; + } + } + }); }); describe("normalizeOutboundPayloadsForJson", () => { diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 6a0829c0b9f..94cb0ef2d00 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -38,6 +38,9 @@ export type { ChannelMeta, ChannelOutboundAdapter, ChannelOutboundContext, + ChannelOutboundSessionResolveParams, + ChannelOutboundSessionResolveResult, + ChannelOutboundSessionResolveTarget, ChannelOutboundTargetMode, ChannelPairingAdapter, ChannelPollContext,