mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 17:08:27 +00:00
refactor!: rename chat providers to channels
This commit is contained in:
326
src/gateway/server-channels.ts
Normal file
326
src/gateway/server-channels.ts
Normal file
@@ -0,0 +1,326 @@
|
||||
import { resolveChannelDefaultAccountId } from "../channels/plugins/helpers.js";
|
||||
import {
|
||||
type ChannelId,
|
||||
getChannelPlugin,
|
||||
listChannelPlugins,
|
||||
} from "../channels/plugins/index.js";
|
||||
import type { ChannelAccountSnapshot } from "../channels/plugins/types.js";
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import type { createSubsystemLogger } from "../logging.js";
|
||||
import { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
|
||||
export type ChannelRuntimeSnapshot = {
|
||||
channels: Partial<Record<ChannelId, ChannelAccountSnapshot>>;
|
||||
channelAccounts: Partial<
|
||||
Record<ChannelId, Record<string, ChannelAccountSnapshot>>
|
||||
>;
|
||||
};
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
|
||||
type ChannelRuntimeStore = {
|
||||
aborts: Map<string, AbortController>;
|
||||
tasks: Map<string, Promise<unknown>>;
|
||||
runtimes: Map<string, ChannelAccountSnapshot>;
|
||||
};
|
||||
|
||||
function createRuntimeStore(): ChannelRuntimeStore {
|
||||
return {
|
||||
aborts: new Map(),
|
||||
tasks: new Map(),
|
||||
runtimes: new Map(),
|
||||
};
|
||||
}
|
||||
|
||||
function isAccountEnabled(account: unknown): boolean {
|
||||
if (!account || typeof account !== "object") return true;
|
||||
const enabled = (account as { enabled?: boolean }).enabled;
|
||||
return enabled !== false;
|
||||
}
|
||||
|
||||
function resolveDefaultRuntime(channelId: ChannelId): ChannelAccountSnapshot {
|
||||
const plugin = getChannelPlugin(channelId);
|
||||
return plugin?.status?.defaultRuntime ?? { accountId: DEFAULT_ACCOUNT_ID };
|
||||
}
|
||||
|
||||
function cloneDefaultRuntime(
|
||||
channelId: ChannelId,
|
||||
accountId: string,
|
||||
): ChannelAccountSnapshot {
|
||||
return { ...resolveDefaultRuntime(channelId), accountId };
|
||||
}
|
||||
|
||||
type ChannelManagerOptions = {
|
||||
loadConfig: () => ClawdbotConfig;
|
||||
channelLogs: Record<ChannelId, SubsystemLogger>;
|
||||
channelRuntimeEnvs: Record<ChannelId, RuntimeEnv>;
|
||||
};
|
||||
|
||||
export type ChannelManager = {
|
||||
getRuntimeSnapshot: () => ChannelRuntimeSnapshot;
|
||||
startChannels: () => Promise<void>;
|
||||
startChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
|
||||
stopChannel: (channel: ChannelId, accountId?: string) => Promise<void>;
|
||||
markChannelLoggedOut: (
|
||||
channelId: ChannelId,
|
||||
cleared: boolean,
|
||||
accountId?: string,
|
||||
) => void;
|
||||
};
|
||||
|
||||
// Channel docking: lifecycle hooks (`plugin.gateway`) flow through this manager.
|
||||
export function createChannelManager(
|
||||
opts: ChannelManagerOptions,
|
||||
): ChannelManager {
|
||||
const { loadConfig, channelLogs, channelRuntimeEnvs } = opts;
|
||||
|
||||
const channelStores = new Map<ChannelId, ChannelRuntimeStore>();
|
||||
|
||||
const getStore = (channelId: ChannelId): ChannelRuntimeStore => {
|
||||
const existing = channelStores.get(channelId);
|
||||
if (existing) return existing;
|
||||
const next = createRuntimeStore();
|
||||
channelStores.set(channelId, next);
|
||||
return next;
|
||||
};
|
||||
|
||||
const getRuntime = (
|
||||
channelId: ChannelId,
|
||||
accountId: string,
|
||||
): ChannelAccountSnapshot => {
|
||||
const store = getStore(channelId);
|
||||
return (
|
||||
store.runtimes.get(accountId) ?? cloneDefaultRuntime(channelId, accountId)
|
||||
);
|
||||
};
|
||||
|
||||
const setRuntime = (
|
||||
channelId: ChannelId,
|
||||
accountId: string,
|
||||
patch: ChannelAccountSnapshot,
|
||||
): ChannelAccountSnapshot => {
|
||||
const store = getStore(channelId);
|
||||
const current = getRuntime(channelId, accountId);
|
||||
const next = { ...current, ...patch, accountId };
|
||||
store.runtimes.set(accountId, next);
|
||||
return next;
|
||||
};
|
||||
|
||||
const startChannel = async (channelId: ChannelId, accountId?: string) => {
|
||||
const plugin = getChannelPlugin(channelId);
|
||||
const startAccount = plugin?.gateway?.startAccount;
|
||||
if (!startAccount) return;
|
||||
const cfg = loadConfig();
|
||||
const store = getStore(channelId);
|
||||
const accountIds = accountId
|
||||
? [accountId]
|
||||
: plugin.config.listAccountIds(cfg);
|
||||
if (accountIds.length === 0) return;
|
||||
|
||||
await Promise.all(
|
||||
accountIds.map(async (id) => {
|
||||
if (store.tasks.has(id)) return;
|
||||
const account = plugin.config.resolveAccount(cfg, id);
|
||||
const enabled = plugin.config.isEnabled
|
||||
? plugin.config.isEnabled(account, cfg)
|
||||
: isAccountEnabled(account);
|
||||
if (!enabled) {
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: false,
|
||||
lastError:
|
||||
plugin.config.disabledReason?.(account, cfg) ?? "disabled",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let configured = true;
|
||||
if (plugin.config.isConfigured) {
|
||||
configured = await plugin.config.isConfigured(account, cfg);
|
||||
}
|
||||
if (!configured) {
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: false,
|
||||
lastError:
|
||||
plugin.config.unconfiguredReason?.(account, cfg) ??
|
||||
"not configured",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const abort = new AbortController();
|
||||
store.aborts.set(id, abort);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: true,
|
||||
lastStartAt: Date.now(),
|
||||
lastError: null,
|
||||
});
|
||||
|
||||
const log = channelLogs[channelId];
|
||||
const task = startAccount({
|
||||
cfg,
|
||||
accountId: id,
|
||||
account,
|
||||
runtime: channelRuntimeEnvs[channelId],
|
||||
abortSignal: abort.signal,
|
||||
log,
|
||||
getStatus: () => getRuntime(channelId, id),
|
||||
setStatus: (next) => setRuntime(channelId, id, next),
|
||||
});
|
||||
const tracked = Promise.resolve(task)
|
||||
.catch((err) => {
|
||||
const message = formatErrorMessage(err);
|
||||
setRuntime(channelId, id, { accountId: id, lastError: message });
|
||||
log.error?.(`[${id}] channel exited: ${message}`);
|
||||
})
|
||||
.finally(() => {
|
||||
store.aborts.delete(id);
|
||||
store.tasks.delete(id);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: false,
|
||||
lastStopAt: Date.now(),
|
||||
});
|
||||
});
|
||||
store.tasks.set(id, tracked);
|
||||
}),
|
||||
);
|
||||
};
|
||||
|
||||
const stopChannel = async (channelId: ChannelId, accountId?: string) => {
|
||||
const plugin = getChannelPlugin(channelId);
|
||||
const cfg = loadConfig();
|
||||
const store = getStore(channelId);
|
||||
const knownIds = new Set<string>([
|
||||
...store.aborts.keys(),
|
||||
...store.tasks.keys(),
|
||||
...(plugin ? plugin.config.listAccountIds(cfg) : []),
|
||||
]);
|
||||
if (accountId) {
|
||||
knownIds.clear();
|
||||
knownIds.add(accountId);
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
Array.from(knownIds.values()).map(async (id) => {
|
||||
const abort = store.aborts.get(id);
|
||||
const task = store.tasks.get(id);
|
||||
if (!abort && !task && !plugin?.gateway?.stopAccount) return;
|
||||
abort?.abort();
|
||||
if (plugin?.gateway?.stopAccount) {
|
||||
const account = plugin.config.resolveAccount(cfg, id);
|
||||
await plugin.gateway.stopAccount({
|
||||
cfg,
|
||||
accountId: id,
|
||||
account,
|
||||
runtime: channelRuntimeEnvs[channelId],
|
||||
abortSignal: abort?.signal ?? new AbortController().signal,
|
||||
log: channelLogs[channelId],
|
||||
getStatus: () => getRuntime(channelId, id),
|
||||
setStatus: (next) => setRuntime(channelId, id, next),
|
||||
});
|
||||
}
|
||||
try {
|
||||
await task;
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
store.aborts.delete(id);
|
||||
store.tasks.delete(id);
|
||||
setRuntime(channelId, id, {
|
||||
accountId: id,
|
||||
running: false,
|
||||
lastStopAt: Date.now(),
|
||||
});
|
||||
}),
|
||||
);
|
||||
};
|
||||
|
||||
const startChannels = async () => {
|
||||
for (const plugin of listChannelPlugins()) {
|
||||
await startChannel(plugin.id);
|
||||
}
|
||||
};
|
||||
|
||||
const markChannelLoggedOut = (
|
||||
channelId: ChannelId,
|
||||
cleared: boolean,
|
||||
accountId?: string,
|
||||
) => {
|
||||
const plugin = getChannelPlugin(channelId);
|
||||
if (!plugin) return;
|
||||
const cfg = loadConfig();
|
||||
const resolvedId =
|
||||
accountId ??
|
||||
resolveChannelDefaultAccountId({
|
||||
plugin,
|
||||
cfg,
|
||||
});
|
||||
const current = getRuntime(channelId, resolvedId);
|
||||
const next: ChannelAccountSnapshot = {
|
||||
accountId: resolvedId,
|
||||
running: false,
|
||||
lastError: cleared ? "logged out" : current.lastError,
|
||||
};
|
||||
if (typeof current.connected === "boolean") {
|
||||
next.connected = false;
|
||||
}
|
||||
setRuntime(channelId, resolvedId, next);
|
||||
};
|
||||
|
||||
const getRuntimeSnapshot = (): ChannelRuntimeSnapshot => {
|
||||
const cfg = loadConfig();
|
||||
const channels: ChannelRuntimeSnapshot["channels"] = {};
|
||||
const channelAccounts: ChannelRuntimeSnapshot["channelAccounts"] = {};
|
||||
for (const plugin of listChannelPlugins()) {
|
||||
const store = getStore(plugin.id);
|
||||
const accountIds = plugin.config.listAccountIds(cfg);
|
||||
const defaultAccountId = resolveChannelDefaultAccountId({
|
||||
plugin,
|
||||
cfg,
|
||||
accountIds,
|
||||
});
|
||||
const accounts: Record<string, ChannelAccountSnapshot> = {};
|
||||
for (const id of accountIds) {
|
||||
const account = plugin.config.resolveAccount(cfg, id);
|
||||
const enabled = plugin.config.isEnabled
|
||||
? plugin.config.isEnabled(account, cfg)
|
||||
: isAccountEnabled(account);
|
||||
const described = plugin.config.describeAccount?.(account, cfg);
|
||||
const configured = described?.configured;
|
||||
const current =
|
||||
store.runtimes.get(id) ?? cloneDefaultRuntime(plugin.id, id);
|
||||
const next = { ...current, accountId: id };
|
||||
if (!next.running) {
|
||||
if (!enabled) {
|
||||
next.lastError ??=
|
||||
plugin.config.disabledReason?.(account, cfg) ?? "disabled";
|
||||
} else if (configured === false) {
|
||||
next.lastError ??=
|
||||
plugin.config.unconfiguredReason?.(account, cfg) ??
|
||||
"not configured";
|
||||
}
|
||||
}
|
||||
accounts[id] = next;
|
||||
}
|
||||
const defaultAccount =
|
||||
accounts[defaultAccountId] ??
|
||||
cloneDefaultRuntime(plugin.id, defaultAccountId);
|
||||
channels[plugin.id] = defaultAccount;
|
||||
channelAccounts[plugin.id] = accounts;
|
||||
}
|
||||
return { channels, channelAccounts };
|
||||
};
|
||||
|
||||
return {
|
||||
getRuntimeSnapshot,
|
||||
startChannels,
|
||||
startChannel,
|
||||
stopChannel,
|
||||
markChannelLoggedOut,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user