feat: migrate zalouser plugin to sdk

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
Peter Steinberger
2026-01-18 04:23:25 +00:00
parent b105745299
commit 89c5185f1c
9 changed files with 152 additions and 397 deletions

View File

@@ -1,8 +1,9 @@
import type { ChildProcess } from "node:child_process";
import type { RuntimeEnv } from "clawdbot/plugin-sdk";
import type { ClawdbotConfig, RuntimeEnv } from "clawdbot/plugin-sdk";
import {
finalizeInboundContext,
formatAgentEnvelope,
isControlCommandMessage,
mergeAllowlist,
recordSessionMetaFromInbound,
@@ -11,20 +12,19 @@ import {
shouldComputeCommandAuthorized,
summarizeMapping,
} from "clawdbot/plugin-sdk";
import { loadCoreChannelDeps, type CoreChannelDeps } from "./core-bridge.js";
import { sendMessageZalouser } from "./send.js";
import type {
CoreConfig,
ResolvedZalouserAccount,
ZcaFriend,
ZcaGroup,
ZcaMessage,
} from "./types.js";
import { getZalouserRuntime } from "./runtime.js";
import { parseJsonOutput, runZca, runZcaStreaming } from "./zca.js";
export type ZalouserMonitorOptions = {
account: ResolvedZalouserAccount;
config: CoreConfig;
config: ClawdbotConfig;
runtime: RuntimeEnv;
abortSignal: AbortSignal;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
@@ -55,8 +55,10 @@ function buildNameIndex<T>(
return index;
}
function logVerbose(deps: CoreChannelDeps, runtime: RuntimeEnv, message: string): void {
if (deps.shouldLogVerbose()) {
type ZalouserCoreRuntime = ReturnType<typeof getZalouserRuntime>;
function logVerbose(core: ZalouserCoreRuntime, runtime: RuntimeEnv, message: string): void {
if (core.logging.shouldLogVerbose()) {
runtime.log(`[zalouser] ${message}`);
}
}
@@ -157,8 +159,8 @@ function startZcaListener(
async function processMessage(
message: ZcaMessage,
account: ResolvedZalouserAccount,
config: CoreConfig,
deps: CoreChannelDeps,
config: ClawdbotConfig,
core: ZalouserCoreRuntime,
runtime: RuntimeEnv,
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void,
): Promise<void> {
@@ -176,13 +178,13 @@ async function processMessage(
const groups = account.config.groups ?? {};
if (isGroup) {
if (groupPolicy === "disabled") {
logVerbose(deps, runtime, `zalouser: drop group ${chatId} (groupPolicy=disabled)`);
logVerbose(core, runtime, `zalouser: drop group ${chatId} (groupPolicy=disabled)`);
return;
}
if (groupPolicy === "allowlist") {
const allowed = isGroupAllowed({ groupId: chatId, groupName, groups });
if (!allowed) {
logVerbose(deps, runtime, `zalouser: drop group ${chatId} (not allowlisted)`);
logVerbose(core, runtime, `zalouser: drop group ${chatId} (not allowlisted)`);
return;
}
}
@@ -194,7 +196,7 @@ async function processMessage(
const shouldComputeAuth = shouldComputeCommandAuthorized(rawBody, config);
const storeAllowFrom =
!isGroup && (dmPolicy !== "open" || shouldComputeAuth)
? await deps.readChannelAllowFromStore("zalouser").catch(() => [])
? await core.channel.pairing.readAllowFromStore("zalouser").catch(() => [])
: [];
const effectiveAllowFrom = [...configAllowFrom, ...storeAllowFrom];
const useAccessGroups = config.commands?.useAccessGroups !== false;
@@ -208,7 +210,7 @@ async function processMessage(
if (!isGroup) {
if (dmPolicy === "disabled") {
logVerbose(deps, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`);
return;
}
@@ -217,18 +219,18 @@ async function processMessage(
if (!allowed) {
if (dmPolicy === "pairing") {
const { code, created } = await deps.upsertChannelPairingRequest({
const { code, created } = await core.channel.pairing.upsertPairingRequest({
channel: "zalouser",
id: senderId,
meta: { name: senderName || undefined },
});
if (created) {
logVerbose(deps, runtime, `zalouser pairing request sender=${senderId}`);
logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`);
try {
await sendMessageZalouser(
chatId,
deps.buildPairingReply({
core.channel.pairing.buildPairingReply({
channel: "zalouser",
idLine: `Your Zalo user id: ${senderId}`,
code,
@@ -238,7 +240,7 @@ async function processMessage(
statusSink?.({ lastOutboundAt: Date.now() });
} catch (err) {
logVerbose(
deps,
core,
runtime,
`zalouser pairing reply failed for ${senderId}: ${String(err)}`,
);
@@ -246,7 +248,7 @@ async function processMessage(
}
} else {
logVerbose(
deps,
core,
runtime,
`Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`,
);
@@ -257,13 +259,13 @@ async function processMessage(
}
if (isGroup && isControlCommandMessage(rawBody, config) && commandAuthorized !== true) {
logVerbose(deps, runtime, `zalouser: drop control command from unauthorized sender ${senderId}`);
logVerbose(core, runtime, `zalouser: drop control command from unauthorized sender ${senderId}`);
return;
}
const peer = isGroup ? { kind: "group" as const, id: chatId } : { kind: "group" as const, id: senderId };
const route = deps.resolveAgentRoute({
const route = core.channel.routing.resolveAgentRoute({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
@@ -275,7 +277,7 @@ async function processMessage(
});
const fromLabel = isGroup ? `group:${chatId}` : senderName || `user:${senderId}`;
const body = deps.formatAgentEnvelope({
const body = formatAgentEnvelope({
channel: "Zalo Personal",
from: fromLabel,
timestamp: timestamp ? timestamp * 1000 : undefined,
@@ -313,7 +315,7 @@ async function processMessage(
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
});
await deps.dispatchReplyWithBufferedBlockDispatcher({
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config,
dispatcherOptions: {
@@ -324,7 +326,7 @@ async function processMessage(
chatId,
isGroup,
runtime,
deps,
core,
statusSink,
});
},
@@ -343,10 +345,10 @@ async function deliverZalouserReply(params: {
chatId: string;
isGroup: boolean;
runtime: RuntimeEnv;
deps: CoreChannelDeps;
core: ZalouserCoreRuntime;
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
}): Promise<void> {
const { payload, profile, chatId, isGroup, runtime, deps, statusSink } = params;
const { payload, profile, chatId, isGroup, runtime, core, statusSink } = params;
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
@@ -360,7 +362,7 @@ async function deliverZalouserReply(params: {
const caption = first ? payload.text : undefined;
first = false;
try {
logVerbose(deps, runtime, `Sending media to ${chatId}`);
logVerbose(core, runtime, `Sending media to ${chatId}`);
await sendMessageZalouser(chatId, caption ?? "", {
profile,
mediaUrl,
@@ -375,8 +377,8 @@ async function deliverZalouserReply(params: {
}
if (payload.text) {
const chunks = deps.chunkMarkdownText(payload.text, ZALOUSER_TEXT_LIMIT);
logVerbose(deps, runtime, `Sending ${chunks.length} text chunk(s) to ${chatId}`);
const chunks = core.channel.text.chunkMarkdownText(payload.text, ZALOUSER_TEXT_LIMIT);
logVerbose(core, runtime, `Sending ${chunks.length} text chunk(s) to ${chatId}`);
for (const chunk of chunks) {
try {
await sendMessageZalouser(chatId, chunk, { profile, isGroup });
@@ -394,7 +396,7 @@ export async function monitorZalouserProvider(
let { account, config } = options;
const { abortSignal, statusSink, runtime } = options;
const deps = await loadCoreChannelDeps();
const core = getZalouserRuntime();
let stopped = false;
let proc: ChildProcess | null = null;
let restartTimer: ReturnType<typeof setTimeout> | null = null;
@@ -506,7 +508,7 @@ export async function monitorZalouserProvider(
}
logVerbose(
deps,
core,
runtime,
`[${account.accountId}] starting zca listener (profile=${account.profile})`,
);
@@ -515,16 +517,16 @@ export async function monitorZalouserProvider(
runtime,
account.profile,
(msg) => {
logVerbose(deps, runtime, `[${account.accountId}] inbound message`);
logVerbose(core, runtime, `[${account.accountId}] inbound message`);
statusSink?.({ lastInboundAt: Date.now() });
processMessage(msg, account, config, deps, runtime, statusSink).catch((err) => {
processMessage(msg, account, config, core, runtime, statusSink).catch((err) => {
runtime.error(`[${account.accountId}] Failed to process message: ${String(err)}`);
});
},
(err) => {
runtime.error(`[${account.accountId}] zca listener error: ${String(err)}`);
if (!stopped && !abortSignal.aborted) {
logVerbose(deps, runtime, `[${account.accountId}] restarting listener in 5s...`);
logVerbose(core, runtime, `[${account.accountId}] restarting listener in 5s...`);
restartTimer = setTimeout(startListener, 5000);
} else {
resolveRunning?.();