mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-10 15:04:58 +00:00
refactor(zalo): split monitor access and webhook logic
This commit is contained in:
48
extensions/zalo/src/group-access.ts
Normal file
48
extensions/zalo/src/group-access.ts
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
import type { GroupPolicy, SenderGroupAccessDecision } from "openclaw/plugin-sdk";
|
||||||
|
import {
|
||||||
|
evaluateSenderGroupAccess,
|
||||||
|
isNormalizedSenderAllowed,
|
||||||
|
resolveOpenProviderRuntimeGroupPolicy,
|
||||||
|
} from "openclaw/plugin-sdk";
|
||||||
|
|
||||||
|
const ZALO_ALLOW_FROM_PREFIX_RE = /^(zalo|zl):/i;
|
||||||
|
|
||||||
|
export function isZaloSenderAllowed(senderId: string, allowFrom: string[]): boolean {
|
||||||
|
return isNormalizedSenderAllowed({
|
||||||
|
senderId,
|
||||||
|
allowFrom,
|
||||||
|
stripPrefixRe: ZALO_ALLOW_FROM_PREFIX_RE,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolveZaloRuntimeGroupPolicy(params: {
|
||||||
|
providerConfigPresent: boolean;
|
||||||
|
groupPolicy?: GroupPolicy;
|
||||||
|
defaultGroupPolicy?: GroupPolicy;
|
||||||
|
}): {
|
||||||
|
groupPolicy: GroupPolicy;
|
||||||
|
providerMissingFallbackApplied: boolean;
|
||||||
|
} {
|
||||||
|
return resolveOpenProviderRuntimeGroupPolicy({
|
||||||
|
providerConfigPresent: params.providerConfigPresent,
|
||||||
|
groupPolicy: params.groupPolicy,
|
||||||
|
defaultGroupPolicy: params.defaultGroupPolicy,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function evaluateZaloGroupAccess(params: {
|
||||||
|
providerConfigPresent: boolean;
|
||||||
|
configuredGroupPolicy?: GroupPolicy;
|
||||||
|
defaultGroupPolicy?: GroupPolicy;
|
||||||
|
groupAllowFrom: string[];
|
||||||
|
senderId: string;
|
||||||
|
}): SenderGroupAccessDecision {
|
||||||
|
return evaluateSenderGroupAccess({
|
||||||
|
providerConfigPresent: params.providerConfigPresent,
|
||||||
|
configuredGroupPolicy: params.configuredGroupPolicy,
|
||||||
|
defaultGroupPolicy: params.defaultGroupPolicy,
|
||||||
|
groupAllowFrom: params.groupAllowFrom,
|
||||||
|
senderId: params.senderId,
|
||||||
|
isSenderAllowed: isZaloSenderAllowed,
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -1,22 +1,13 @@
|
|||||||
import { timingSafeEqual } from "node:crypto";
|
|
||||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||||
import type { MarkdownTableMode, OpenClawConfig, OutboundReplyPayload } from "openclaw/plugin-sdk";
|
import type { MarkdownTableMode, OpenClawConfig, OutboundReplyPayload } from "openclaw/plugin-sdk";
|
||||||
import {
|
import {
|
||||||
createDedupeCache,
|
|
||||||
createReplyPrefixOptions,
|
createReplyPrefixOptions,
|
||||||
readJsonBodyWithLimit,
|
|
||||||
registerWebhookTarget,
|
|
||||||
rejectNonPostWebhookRequest,
|
|
||||||
resolveSingleWebhookTarget,
|
|
||||||
resolveSenderCommandAuthorization,
|
resolveSenderCommandAuthorization,
|
||||||
resolveOutboundMediaUrls,
|
resolveOutboundMediaUrls,
|
||||||
resolveDefaultGroupPolicy,
|
resolveDefaultGroupPolicy,
|
||||||
resolveOpenProviderRuntimeGroupPolicy,
|
|
||||||
sendMediaWithLeadingCaption,
|
sendMediaWithLeadingCaption,
|
||||||
resolveWebhookPath,
|
resolveWebhookPath,
|
||||||
resolveWebhookTargets,
|
|
||||||
warnMissingProviderGroupPolicyFallbackOnce,
|
warnMissingProviderGroupPolicyFallbackOnce,
|
||||||
requestBodyErrorToText,
|
|
||||||
} from "openclaw/plugin-sdk";
|
} from "openclaw/plugin-sdk";
|
||||||
import type { ResolvedZaloAccount } from "./accounts.js";
|
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||||
import {
|
import {
|
||||||
@@ -30,6 +21,16 @@ import {
|
|||||||
type ZaloMessage,
|
type ZaloMessage,
|
||||||
type ZaloUpdate,
|
type ZaloUpdate,
|
||||||
} from "./api.js";
|
} from "./api.js";
|
||||||
|
import {
|
||||||
|
evaluateZaloGroupAccess,
|
||||||
|
isZaloSenderAllowed,
|
||||||
|
resolveZaloRuntimeGroupPolicy,
|
||||||
|
} from "./group-access.js";
|
||||||
|
import {
|
||||||
|
handleZaloWebhookRequest as handleZaloWebhookRequestInternal,
|
||||||
|
registerZaloWebhookTarget as registerZaloWebhookTargetInternal,
|
||||||
|
type ZaloWebhookTarget,
|
||||||
|
} from "./monitor.webhook.js";
|
||||||
import { resolveZaloProxyFetch } from "./proxy.js";
|
import { resolveZaloProxyFetch } from "./proxy.js";
|
||||||
import { getZaloRuntime } from "./runtime.js";
|
import { getZaloRuntime } from "./runtime.js";
|
||||||
|
|
||||||
@@ -58,21 +59,8 @@ export type ZaloMonitorResult = {
|
|||||||
|
|
||||||
const ZALO_TEXT_LIMIT = 2000;
|
const ZALO_TEXT_LIMIT = 2000;
|
||||||
const DEFAULT_MEDIA_MAX_MB = 5;
|
const DEFAULT_MEDIA_MAX_MB = 5;
|
||||||
const ZALO_WEBHOOK_RATE_LIMIT_WINDOW_MS = 60_000;
|
|
||||||
const ZALO_WEBHOOK_RATE_LIMIT_MAX_REQUESTS = 120;
|
|
||||||
const ZALO_WEBHOOK_REPLAY_WINDOW_MS = 5 * 60_000;
|
|
||||||
const ZALO_WEBHOOK_COUNTER_LOG_EVERY = 25;
|
|
||||||
|
|
||||||
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
|
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
|
||||||
type WebhookRateLimitState = { count: number; windowStartMs: number };
|
|
||||||
type ZaloGroupPolicy = "open" | "allowlist" | "disabled";
|
|
||||||
type ZaloGroupAccessReason = "allowed" | "disabled" | "empty_allowlist" | "sender_not_allowlisted";
|
|
||||||
type ZaloGroupAccessDecision = {
|
|
||||||
allowed: boolean;
|
|
||||||
groupPolicy: ZaloGroupPolicy;
|
|
||||||
providerMissingFallbackApplied: boolean;
|
|
||||||
reason: ZaloGroupAccessReason;
|
|
||||||
};
|
|
||||||
|
|
||||||
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
|
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
|
||||||
if (core.logging.shouldLogVerbose()) {
|
if (core.logging.shouldLogVerbose()) {
|
||||||
@@ -80,277 +68,27 @@ function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function isSenderAllowed(senderId: string, allowFrom: string[]): boolean {
|
export function registerZaloWebhookTarget(target: ZaloWebhookTarget): () => void {
|
||||||
if (allowFrom.includes("*")) {
|
return registerZaloWebhookTargetInternal(target);
|
||||||
return true;
|
|
||||||
}
|
|
||||||
const normalizedSenderId = senderId.toLowerCase();
|
|
||||||
return allowFrom.some((entry) => {
|
|
||||||
const normalized = entry.toLowerCase().replace(/^(zalo|zl):/i, "");
|
|
||||||
return normalized === normalizedSenderId;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolveZaloRuntimeGroupPolicy(params: {
|
|
||||||
providerConfigPresent: boolean;
|
|
||||||
groupPolicy?: ZaloGroupPolicy;
|
|
||||||
defaultGroupPolicy?: ZaloGroupPolicy;
|
|
||||||
}): {
|
|
||||||
groupPolicy: ZaloGroupPolicy;
|
|
||||||
providerMissingFallbackApplied: boolean;
|
|
||||||
} {
|
|
||||||
return resolveOpenProviderRuntimeGroupPolicy({
|
|
||||||
providerConfigPresent: params.providerConfigPresent,
|
|
||||||
groupPolicy: params.groupPolicy,
|
|
||||||
defaultGroupPolicy: params.defaultGroupPolicy,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function evaluateZaloGroupAccess(params: {
|
|
||||||
providerConfigPresent: boolean;
|
|
||||||
configuredGroupPolicy?: ZaloGroupPolicy;
|
|
||||||
defaultGroupPolicy?: ZaloGroupPolicy;
|
|
||||||
groupAllowFrom: string[];
|
|
||||||
senderId: string;
|
|
||||||
}): ZaloGroupAccessDecision {
|
|
||||||
const { groupPolicy, providerMissingFallbackApplied } = resolveZaloRuntimeGroupPolicy({
|
|
||||||
providerConfigPresent: params.providerConfigPresent,
|
|
||||||
groupPolicy: params.configuredGroupPolicy,
|
|
||||||
defaultGroupPolicy: params.defaultGroupPolicy,
|
|
||||||
});
|
|
||||||
if (groupPolicy === "disabled") {
|
|
||||||
return {
|
|
||||||
allowed: false,
|
|
||||||
groupPolicy,
|
|
||||||
providerMissingFallbackApplied,
|
|
||||||
reason: "disabled",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if (groupPolicy === "allowlist") {
|
|
||||||
if (params.groupAllowFrom.length === 0) {
|
|
||||||
return {
|
|
||||||
allowed: false,
|
|
||||||
groupPolicy,
|
|
||||||
providerMissingFallbackApplied,
|
|
||||||
reason: "empty_allowlist",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if (!isSenderAllowed(params.senderId, params.groupAllowFrom)) {
|
|
||||||
return {
|
|
||||||
allowed: false,
|
|
||||||
groupPolicy,
|
|
||||||
providerMissingFallbackApplied,
|
|
||||||
reason: "sender_not_allowlisted",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
allowed: true,
|
|
||||||
groupPolicy,
|
|
||||||
providerMissingFallbackApplied,
|
|
||||||
reason: "allowed",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
type WebhookTarget = {
|
|
||||||
token: string;
|
|
||||||
account: ResolvedZaloAccount;
|
|
||||||
config: OpenClawConfig;
|
|
||||||
runtime: ZaloRuntimeEnv;
|
|
||||||
core: ZaloCoreRuntime;
|
|
||||||
secret: string;
|
|
||||||
path: string;
|
|
||||||
mediaMaxMb: number;
|
|
||||||
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
|
||||||
fetcher?: ZaloFetch;
|
|
||||||
};
|
|
||||||
|
|
||||||
const webhookTargets = new Map<string, WebhookTarget[]>();
|
|
||||||
const webhookRateLimits = new Map<string, WebhookRateLimitState>();
|
|
||||||
const recentWebhookEvents = createDedupeCache({
|
|
||||||
ttlMs: ZALO_WEBHOOK_REPLAY_WINDOW_MS,
|
|
||||||
maxSize: 5000,
|
|
||||||
});
|
|
||||||
const webhookStatusCounters = new Map<string, number>();
|
|
||||||
|
|
||||||
function isJsonContentType(value: string | string[] | undefined): boolean {
|
|
||||||
const first = Array.isArray(value) ? value[0] : value;
|
|
||||||
if (!first) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const mediaType = first.split(";", 1)[0]?.trim().toLowerCase();
|
|
||||||
return mediaType === "application/json" || Boolean(mediaType?.endsWith("+json"));
|
|
||||||
}
|
|
||||||
|
|
||||||
function timingSafeEquals(left: string, right: string): boolean {
|
|
||||||
const leftBuffer = Buffer.from(left);
|
|
||||||
const rightBuffer = Buffer.from(right);
|
|
||||||
|
|
||||||
if (leftBuffer.length !== rightBuffer.length) {
|
|
||||||
const length = Math.max(1, leftBuffer.length, rightBuffer.length);
|
|
||||||
const paddedLeft = Buffer.alloc(length);
|
|
||||||
const paddedRight = Buffer.alloc(length);
|
|
||||||
leftBuffer.copy(paddedLeft);
|
|
||||||
rightBuffer.copy(paddedRight);
|
|
||||||
timingSafeEqual(paddedLeft, paddedRight);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return timingSafeEqual(leftBuffer, rightBuffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
function isWebhookRateLimited(key: string, nowMs: number): boolean {
|
|
||||||
const state = webhookRateLimits.get(key);
|
|
||||||
if (!state || nowMs - state.windowStartMs >= ZALO_WEBHOOK_RATE_LIMIT_WINDOW_MS) {
|
|
||||||
webhookRateLimits.set(key, { count: 1, windowStartMs: nowMs });
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
state.count += 1;
|
|
||||||
if (state.count > ZALO_WEBHOOK_RATE_LIMIT_MAX_REQUESTS) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
function isReplayEvent(update: ZaloUpdate, nowMs: number): boolean {
|
|
||||||
const messageId = update.message?.message_id;
|
|
||||||
if (!messageId) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const key = `${update.event_name}:${messageId}`;
|
|
||||||
return recentWebhookEvents.check(key, nowMs);
|
|
||||||
}
|
|
||||||
|
|
||||||
function recordWebhookStatus(
|
|
||||||
runtime: ZaloRuntimeEnv | undefined,
|
|
||||||
path: string,
|
|
||||||
statusCode: number,
|
|
||||||
): void {
|
|
||||||
if (![400, 401, 408, 413, 415, 429].includes(statusCode)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const key = `${path}:${statusCode}`;
|
|
||||||
const next = (webhookStatusCounters.get(key) ?? 0) + 1;
|
|
||||||
webhookStatusCounters.set(key, next);
|
|
||||||
if (next === 1 || next % ZALO_WEBHOOK_COUNTER_LOG_EVERY === 0) {
|
|
||||||
runtime?.log?.(
|
|
||||||
`[zalo] webhook anomaly path=${path} status=${statusCode} count=${String(next)}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export function registerZaloWebhookTarget(target: WebhookTarget): () => void {
|
|
||||||
return registerWebhookTarget(webhookTargets, target).unregister;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function handleZaloWebhookRequest(
|
export async function handleZaloWebhookRequest(
|
||||||
req: IncomingMessage,
|
req: IncomingMessage,
|
||||||
res: ServerResponse,
|
res: ServerResponse,
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
const resolved = resolveWebhookTargets(req, webhookTargets);
|
return handleZaloWebhookRequestInternal(req, res, async ({ update, target }) => {
|
||||||
if (!resolved) {
|
await processUpdate(
|
||||||
return false;
|
update,
|
||||||
}
|
target.token,
|
||||||
const { targets } = resolved;
|
target.account,
|
||||||
|
target.config,
|
||||||
if (rejectNonPostWebhookRequest(req, res)) {
|
target.runtime,
|
||||||
return true;
|
target.core as ZaloCoreRuntime,
|
||||||
}
|
target.mediaMaxMb,
|
||||||
|
target.statusSink,
|
||||||
const headerToken = String(req.headers["x-bot-api-secret-token"] ?? "");
|
target.fetcher,
|
||||||
const matchedTarget = resolveSingleWebhookTarget(targets, (entry) =>
|
);
|
||||||
timingSafeEquals(entry.secret, headerToken),
|
|
||||||
);
|
|
||||||
if (matchedTarget.kind === "none") {
|
|
||||||
res.statusCode = 401;
|
|
||||||
res.end("unauthorized");
|
|
||||||
recordWebhookStatus(targets[0]?.runtime, req.url ?? "<unknown>", res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (matchedTarget.kind === "ambiguous") {
|
|
||||||
res.statusCode = 401;
|
|
||||||
res.end("ambiguous webhook target");
|
|
||||||
recordWebhookStatus(targets[0]?.runtime, req.url ?? "<unknown>", res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
const target = matchedTarget.target;
|
|
||||||
const path = req.url ?? "<unknown>";
|
|
||||||
const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`;
|
|
||||||
const nowMs = Date.now();
|
|
||||||
|
|
||||||
if (isWebhookRateLimited(rateLimitKey, nowMs)) {
|
|
||||||
res.statusCode = 429;
|
|
||||||
res.end("Too Many Requests");
|
|
||||||
recordWebhookStatus(target.runtime, path, res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isJsonContentType(req.headers["content-type"])) {
|
|
||||||
res.statusCode = 415;
|
|
||||||
res.end("Unsupported Media Type");
|
|
||||||
recordWebhookStatus(target.runtime, path, res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
const body = await readJsonBodyWithLimit(req, {
|
|
||||||
maxBytes: 1024 * 1024,
|
|
||||||
timeoutMs: 30_000,
|
|
||||||
emptyObjectOnEmpty: false,
|
|
||||||
});
|
});
|
||||||
if (!body.ok) {
|
|
||||||
res.statusCode =
|
|
||||||
body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400;
|
|
||||||
const message =
|
|
||||||
body.code === "PAYLOAD_TOO_LARGE"
|
|
||||||
? requestBodyErrorToText("PAYLOAD_TOO_LARGE")
|
|
||||||
: body.code === "REQUEST_BODY_TIMEOUT"
|
|
||||||
? requestBodyErrorToText("REQUEST_BODY_TIMEOUT")
|
|
||||||
: "Bad Request";
|
|
||||||
res.end(message);
|
|
||||||
recordWebhookStatus(target.runtime, path, res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }
|
|
||||||
const raw = body.value;
|
|
||||||
const record = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
|
|
||||||
const update: ZaloUpdate | undefined =
|
|
||||||
record && record.ok === true && record.result
|
|
||||||
? (record.result as ZaloUpdate)
|
|
||||||
: ((record as ZaloUpdate | null) ?? undefined);
|
|
||||||
|
|
||||||
if (!update?.event_name) {
|
|
||||||
res.statusCode = 400;
|
|
||||||
res.end("Bad Request");
|
|
||||||
recordWebhookStatus(target.runtime, path, res.statusCode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isReplayEvent(update, nowMs)) {
|
|
||||||
res.statusCode = 200;
|
|
||||||
res.end("ok");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
target.statusSink?.({ lastInboundAt: Date.now() });
|
|
||||||
processUpdate(
|
|
||||||
update,
|
|
||||||
target.token,
|
|
||||||
target.account,
|
|
||||||
target.config,
|
|
||||||
target.runtime,
|
|
||||||
target.core,
|
|
||||||
target.mediaMaxMb,
|
|
||||||
target.statusSink,
|
|
||||||
target.fetcher,
|
|
||||||
).catch((err) => {
|
|
||||||
target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`);
|
|
||||||
});
|
|
||||||
|
|
||||||
res.statusCode = 200;
|
|
||||||
res.end("ok");
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function startPollingLoop(params: {
|
function startPollingLoop(params: {
|
||||||
@@ -618,7 +356,7 @@ async function processMessageWithPipeline(params: {
|
|||||||
dmPolicy,
|
dmPolicy,
|
||||||
configuredAllowFrom: configAllowFrom,
|
configuredAllowFrom: configAllowFrom,
|
||||||
senderId,
|
senderId,
|
||||||
isSenderAllowed,
|
isSenderAllowed: isZaloSenderAllowed,
|
||||||
readAllowFromStore: () => core.channel.pairing.readAllowFromStore("zalo"),
|
readAllowFromStore: () => core.channel.pairing.readAllowFromStore("zalo"),
|
||||||
shouldComputeCommandAuthorized: (body, cfg) =>
|
shouldComputeCommandAuthorized: (body, cfg) =>
|
||||||
core.channel.commands.shouldComputeCommandAuthorized(body, cfg),
|
core.channel.commands.shouldComputeCommandAuthorized(body, cfg),
|
||||||
|
|||||||
219
extensions/zalo/src/monitor.webhook.ts
Normal file
219
extensions/zalo/src/monitor.webhook.ts
Normal file
@@ -0,0 +1,219 @@
|
|||||||
|
import { timingSafeEqual } from "node:crypto";
|
||||||
|
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||||
|
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||||
|
import {
|
||||||
|
createDedupeCache,
|
||||||
|
readJsonBodyWithLimit,
|
||||||
|
registerWebhookTarget,
|
||||||
|
rejectNonPostWebhookRequest,
|
||||||
|
requestBodyErrorToText,
|
||||||
|
resolveSingleWebhookTarget,
|
||||||
|
resolveWebhookTargets,
|
||||||
|
} from "openclaw/plugin-sdk";
|
||||||
|
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||||
|
import type { ZaloFetch, ZaloUpdate } from "./api.js";
|
||||||
|
import type { ZaloRuntimeEnv } from "./monitor.js";
|
||||||
|
|
||||||
|
type WebhookRateLimitState = { count: number; windowStartMs: number };
|
||||||
|
|
||||||
|
const ZALO_WEBHOOK_RATE_LIMIT_WINDOW_MS = 60_000;
|
||||||
|
const ZALO_WEBHOOK_RATE_LIMIT_MAX_REQUESTS = 120;
|
||||||
|
const ZALO_WEBHOOK_REPLAY_WINDOW_MS = 5 * 60_000;
|
||||||
|
const ZALO_WEBHOOK_COUNTER_LOG_EVERY = 25;
|
||||||
|
|
||||||
|
export type ZaloWebhookTarget = {
|
||||||
|
token: string;
|
||||||
|
account: ResolvedZaloAccount;
|
||||||
|
config: OpenClawConfig;
|
||||||
|
runtime: ZaloRuntimeEnv;
|
||||||
|
core: unknown;
|
||||||
|
secret: string;
|
||||||
|
path: string;
|
||||||
|
mediaMaxMb: number;
|
||||||
|
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
||||||
|
fetcher?: ZaloFetch;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type ZaloWebhookProcessUpdate = (params: {
|
||||||
|
update: ZaloUpdate;
|
||||||
|
target: ZaloWebhookTarget;
|
||||||
|
}) => Promise<void>;
|
||||||
|
|
||||||
|
const webhookTargets = new Map<string, ZaloWebhookTarget[]>();
|
||||||
|
const webhookRateLimits = new Map<string, WebhookRateLimitState>();
|
||||||
|
const recentWebhookEvents = createDedupeCache({
|
||||||
|
ttlMs: ZALO_WEBHOOK_REPLAY_WINDOW_MS,
|
||||||
|
maxSize: 5000,
|
||||||
|
});
|
||||||
|
const webhookStatusCounters = new Map<string, number>();
|
||||||
|
|
||||||
|
function isJsonContentType(value: string | string[] | undefined): boolean {
|
||||||
|
const first = Array.isArray(value) ? value[0] : value;
|
||||||
|
if (!first) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const mediaType = first.split(";", 1)[0]?.trim().toLowerCase();
|
||||||
|
return mediaType === "application/json" || Boolean(mediaType?.endsWith("+json"));
|
||||||
|
}
|
||||||
|
|
||||||
|
function timingSafeEquals(left: string, right: string): boolean {
|
||||||
|
const leftBuffer = Buffer.from(left);
|
||||||
|
const rightBuffer = Buffer.from(right);
|
||||||
|
|
||||||
|
if (leftBuffer.length !== rightBuffer.length) {
|
||||||
|
const length = Math.max(1, leftBuffer.length, rightBuffer.length);
|
||||||
|
const paddedLeft = Buffer.alloc(length);
|
||||||
|
const paddedRight = Buffer.alloc(length);
|
||||||
|
leftBuffer.copy(paddedLeft);
|
||||||
|
rightBuffer.copy(paddedRight);
|
||||||
|
timingSafeEqual(paddedLeft, paddedRight);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return timingSafeEqual(leftBuffer, rightBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isWebhookRateLimited(key: string, nowMs: number): boolean {
|
||||||
|
const state = webhookRateLimits.get(key);
|
||||||
|
if (!state || nowMs - state.windowStartMs >= ZALO_WEBHOOK_RATE_LIMIT_WINDOW_MS) {
|
||||||
|
webhookRateLimits.set(key, { count: 1, windowStartMs: nowMs });
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
state.count += 1;
|
||||||
|
if (state.count > ZALO_WEBHOOK_RATE_LIMIT_MAX_REQUESTS) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isReplayEvent(update: ZaloUpdate, nowMs: number): boolean {
|
||||||
|
const messageId = update.message?.message_id;
|
||||||
|
if (!messageId) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const key = `${update.event_name}:${messageId}`;
|
||||||
|
return recentWebhookEvents.check(key, nowMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
function recordWebhookStatus(
|
||||||
|
runtime: ZaloRuntimeEnv | undefined,
|
||||||
|
path: string,
|
||||||
|
statusCode: number,
|
||||||
|
): void {
|
||||||
|
if (![400, 401, 408, 413, 415, 429].includes(statusCode)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const key = `${path}:${statusCode}`;
|
||||||
|
const next = (webhookStatusCounters.get(key) ?? 0) + 1;
|
||||||
|
webhookStatusCounters.set(key, next);
|
||||||
|
if (next === 1 || next % ZALO_WEBHOOK_COUNTER_LOG_EVERY === 0) {
|
||||||
|
runtime?.log?.(
|
||||||
|
`[zalo] webhook anomaly path=${path} status=${statusCode} count=${String(next)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function registerZaloWebhookTarget(target: ZaloWebhookTarget): () => void {
|
||||||
|
return registerWebhookTarget(webhookTargets, target).unregister;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function handleZaloWebhookRequest(
|
||||||
|
req: IncomingMessage,
|
||||||
|
res: ServerResponse,
|
||||||
|
processUpdate: ZaloWebhookProcessUpdate,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const resolved = resolveWebhookTargets(req, webhookTargets);
|
||||||
|
if (!resolved) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const { targets } = resolved;
|
||||||
|
|
||||||
|
if (rejectNonPostWebhookRequest(req, res)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const headerToken = String(req.headers["x-bot-api-secret-token"] ?? "");
|
||||||
|
const matchedTarget = resolveSingleWebhookTarget(targets, (entry) =>
|
||||||
|
timingSafeEquals(entry.secret, headerToken),
|
||||||
|
);
|
||||||
|
if (matchedTarget.kind === "none") {
|
||||||
|
res.statusCode = 401;
|
||||||
|
res.end("unauthorized");
|
||||||
|
recordWebhookStatus(targets[0]?.runtime, req.url ?? "<unknown>", res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (matchedTarget.kind === "ambiguous") {
|
||||||
|
res.statusCode = 401;
|
||||||
|
res.end("ambiguous webhook target");
|
||||||
|
recordWebhookStatus(targets[0]?.runtime, req.url ?? "<unknown>", res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const target = matchedTarget.target;
|
||||||
|
const path = req.url ?? "<unknown>";
|
||||||
|
const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`;
|
||||||
|
const nowMs = Date.now();
|
||||||
|
|
||||||
|
if (isWebhookRateLimited(rateLimitKey, nowMs)) {
|
||||||
|
res.statusCode = 429;
|
||||||
|
res.end("Too Many Requests");
|
||||||
|
recordWebhookStatus(target.runtime, path, res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isJsonContentType(req.headers["content-type"])) {
|
||||||
|
res.statusCode = 415;
|
||||||
|
res.end("Unsupported Media Type");
|
||||||
|
recordWebhookStatus(target.runtime, path, res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const body = await readJsonBodyWithLimit(req, {
|
||||||
|
maxBytes: 1024 * 1024,
|
||||||
|
timeoutMs: 30_000,
|
||||||
|
emptyObjectOnEmpty: false,
|
||||||
|
});
|
||||||
|
if (!body.ok) {
|
||||||
|
res.statusCode =
|
||||||
|
body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400;
|
||||||
|
const message =
|
||||||
|
body.code === "PAYLOAD_TOO_LARGE"
|
||||||
|
? requestBodyErrorToText("PAYLOAD_TOO_LARGE")
|
||||||
|
: body.code === "REQUEST_BODY_TIMEOUT"
|
||||||
|
? requestBodyErrorToText("REQUEST_BODY_TIMEOUT")
|
||||||
|
: "Bad Request";
|
||||||
|
res.end(message);
|
||||||
|
recordWebhookStatus(target.runtime, path, res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Zalo sends updates directly as { event_name, message, ... }, not wrapped in { ok, result }.
|
||||||
|
const raw = body.value;
|
||||||
|
const record = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
|
||||||
|
const update: ZaloUpdate | undefined =
|
||||||
|
record && record.ok === true && record.result
|
||||||
|
? (record.result as ZaloUpdate)
|
||||||
|
: ((record as ZaloUpdate | null) ?? undefined);
|
||||||
|
|
||||||
|
if (!update?.event_name) {
|
||||||
|
res.statusCode = 400;
|
||||||
|
res.end("Bad Request");
|
||||||
|
recordWebhookStatus(target.runtime, path, res.statusCode);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isReplayEvent(update, nowMs)) {
|
||||||
|
res.statusCode = 200;
|
||||||
|
res.end("ok");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
target.statusSink?.({ lastInboundAt: Date.now() });
|
||||||
|
processUpdate({ update, target }).catch((err) => {
|
||||||
|
target.runtime.error?.(`[${target.account.accountId}] Zalo webhook failed: ${String(err)}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
res.statusCode = 200;
|
||||||
|
res.end("ok");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
import { describe, expect, it } from "vitest";
|
import { describe, expect, it } from "vitest";
|
||||||
import { isAllowedParsedChatSender } from "./allow-from.js";
|
import { isAllowedParsedChatSender, isNormalizedSenderAllowed } from "./allow-from.js";
|
||||||
|
|
||||||
function parseAllowTarget(
|
function parseAllowTarget(
|
||||||
entry: string,
|
entry: string,
|
||||||
@@ -71,3 +71,34 @@ describe("isAllowedParsedChatSender", () => {
|
|||||||
expect(allowed).toBe(true);
|
expect(allowed).toBe(true);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("isNormalizedSenderAllowed", () => {
|
||||||
|
it("allows wildcard", () => {
|
||||||
|
expect(
|
||||||
|
isNormalizedSenderAllowed({
|
||||||
|
senderId: "attacker",
|
||||||
|
allowFrom: ["*"],
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("normalizes case and strips prefixes", () => {
|
||||||
|
expect(
|
||||||
|
isNormalizedSenderAllowed({
|
||||||
|
senderId: "12345",
|
||||||
|
allowFrom: ["ZALO:12345", "zl:777"],
|
||||||
|
stripPrefixRe: /^(zalo|zl):/i,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects when sender is missing", () => {
|
||||||
|
expect(
|
||||||
|
isNormalizedSenderAllowed({
|
||||||
|
senderId: "999",
|
||||||
|
allowFrom: ["zl:12345"],
|
||||||
|
stripPrefixRe: /^(zalo|zl):/i,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -9,6 +9,25 @@ export function formatAllowFromLowercase(params: {
|
|||||||
.map((entry) => entry.toLowerCase());
|
.map((entry) => entry.toLowerCase());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isNormalizedSenderAllowed(params: {
|
||||||
|
senderId: string | number;
|
||||||
|
allowFrom: Array<string | number>;
|
||||||
|
stripPrefixRe?: RegExp;
|
||||||
|
}): boolean {
|
||||||
|
const normalizedAllow = formatAllowFromLowercase({
|
||||||
|
allowFrom: params.allowFrom,
|
||||||
|
stripPrefixRe: params.stripPrefixRe,
|
||||||
|
});
|
||||||
|
if (normalizedAllow.length === 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (normalizedAllow.includes("*")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const sender = String(params.senderId).trim().toLowerCase();
|
||||||
|
return normalizedAllow.includes(sender);
|
||||||
|
}
|
||||||
|
|
||||||
type ParsedChatAllowTarget =
|
type ParsedChatAllowTarget =
|
||||||
| { kind: "chat_id"; chatId: number }
|
| { kind: "chat_id"; chatId: number }
|
||||||
| { kind: "chat_guid"; chatGuid: string }
|
| { kind: "chat_guid"; chatGuid: string }
|
||||||
|
|||||||
69
src/plugin-sdk/group-access.test.ts
Normal file
69
src/plugin-sdk/group-access.test.ts
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { evaluateSenderGroupAccess } from "./group-access.js";
|
||||||
|
|
||||||
|
describe("evaluateSenderGroupAccess", () => {
|
||||||
|
it("defaults missing provider config to allowlist", () => {
|
||||||
|
const decision = evaluateSenderGroupAccess({
|
||||||
|
providerConfigPresent: false,
|
||||||
|
configuredGroupPolicy: undefined,
|
||||||
|
defaultGroupPolicy: "open",
|
||||||
|
groupAllowFrom: ["123"],
|
||||||
|
senderId: "123",
|
||||||
|
isSenderAllowed: () => true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(decision).toEqual({
|
||||||
|
allowed: true,
|
||||||
|
groupPolicy: "allowlist",
|
||||||
|
providerMissingFallbackApplied: true,
|
||||||
|
reason: "allowed",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("blocks disabled policy", () => {
|
||||||
|
const decision = evaluateSenderGroupAccess({
|
||||||
|
providerConfigPresent: true,
|
||||||
|
configuredGroupPolicy: "disabled",
|
||||||
|
defaultGroupPolicy: "open",
|
||||||
|
groupAllowFrom: ["123"],
|
||||||
|
senderId: "123",
|
||||||
|
isSenderAllowed: () => true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(decision).toMatchObject({ allowed: false, reason: "disabled", groupPolicy: "disabled" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("blocks allowlist with empty list", () => {
|
||||||
|
const decision = evaluateSenderGroupAccess({
|
||||||
|
providerConfigPresent: true,
|
||||||
|
configuredGroupPolicy: "allowlist",
|
||||||
|
defaultGroupPolicy: "open",
|
||||||
|
groupAllowFrom: [],
|
||||||
|
senderId: "123",
|
||||||
|
isSenderAllowed: () => true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(decision).toMatchObject({
|
||||||
|
allowed: false,
|
||||||
|
reason: "empty_allowlist",
|
||||||
|
groupPolicy: "allowlist",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("blocks sender not allowlisted", () => {
|
||||||
|
const decision = evaluateSenderGroupAccess({
|
||||||
|
providerConfigPresent: true,
|
||||||
|
configuredGroupPolicy: "allowlist",
|
||||||
|
defaultGroupPolicy: "open",
|
||||||
|
groupAllowFrom: ["123"],
|
||||||
|
senderId: "999",
|
||||||
|
isSenderAllowed: () => false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(decision).toMatchObject({
|
||||||
|
allowed: false,
|
||||||
|
reason: "sender_not_allowlisted",
|
||||||
|
groupPolicy: "allowlist",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
64
src/plugin-sdk/group-access.ts
Normal file
64
src/plugin-sdk/group-access.ts
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
import { resolveOpenProviderRuntimeGroupPolicy } from "../config/runtime-group-policy.js";
|
||||||
|
import type { GroupPolicy } from "../config/types.base.js";
|
||||||
|
|
||||||
|
export type SenderGroupAccessReason =
|
||||||
|
| "allowed"
|
||||||
|
| "disabled"
|
||||||
|
| "empty_allowlist"
|
||||||
|
| "sender_not_allowlisted";
|
||||||
|
|
||||||
|
export type SenderGroupAccessDecision = {
|
||||||
|
allowed: boolean;
|
||||||
|
groupPolicy: GroupPolicy;
|
||||||
|
providerMissingFallbackApplied: boolean;
|
||||||
|
reason: SenderGroupAccessReason;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function evaluateSenderGroupAccess(params: {
|
||||||
|
providerConfigPresent: boolean;
|
||||||
|
configuredGroupPolicy?: GroupPolicy;
|
||||||
|
defaultGroupPolicy?: GroupPolicy;
|
||||||
|
groupAllowFrom: string[];
|
||||||
|
senderId: string;
|
||||||
|
isSenderAllowed: (senderId: string, allowFrom: string[]) => boolean;
|
||||||
|
}): SenderGroupAccessDecision {
|
||||||
|
const { groupPolicy, providerMissingFallbackApplied } = resolveOpenProviderRuntimeGroupPolicy({
|
||||||
|
providerConfigPresent: params.providerConfigPresent,
|
||||||
|
groupPolicy: params.configuredGroupPolicy,
|
||||||
|
defaultGroupPolicy: params.defaultGroupPolicy,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (groupPolicy === "disabled") {
|
||||||
|
return {
|
||||||
|
allowed: false,
|
||||||
|
groupPolicy,
|
||||||
|
providerMissingFallbackApplied,
|
||||||
|
reason: "disabled",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (groupPolicy === "allowlist") {
|
||||||
|
if (params.groupAllowFrom.length === 0) {
|
||||||
|
return {
|
||||||
|
allowed: false,
|
||||||
|
groupPolicy,
|
||||||
|
providerMissingFallbackApplied,
|
||||||
|
reason: "empty_allowlist",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (!params.isSenderAllowed(params.senderId, params.groupAllowFrom)) {
|
||||||
|
return {
|
||||||
|
allowed: false,
|
||||||
|
groupPolicy,
|
||||||
|
providerMissingFallbackApplied,
|
||||||
|
reason: "sender_not_allowlisted",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
allowed: true,
|
||||||
|
groupPolicy,
|
||||||
|
providerMissingFallbackApplied,
|
||||||
|
reason: "allowed",
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -181,7 +181,16 @@ export {
|
|||||||
normalizeAccountId,
|
normalizeAccountId,
|
||||||
resolveThreadSessionKeys,
|
resolveThreadSessionKeys,
|
||||||
} from "../routing/session-key.js";
|
} from "../routing/session-key.js";
|
||||||
export { formatAllowFromLowercase, isAllowedParsedChatSender } from "./allow-from.js";
|
export {
|
||||||
|
formatAllowFromLowercase,
|
||||||
|
isAllowedParsedChatSender,
|
||||||
|
isNormalizedSenderAllowed,
|
||||||
|
} from "./allow-from.js";
|
||||||
|
export {
|
||||||
|
evaluateSenderGroupAccess,
|
||||||
|
type SenderGroupAccessDecision,
|
||||||
|
type SenderGroupAccessReason,
|
||||||
|
} from "./group-access.js";
|
||||||
export { resolveSenderCommandAuthorization } from "./command-auth.js";
|
export { resolveSenderCommandAuthorization } from "./command-auth.js";
|
||||||
export { handleSlackMessageAction } from "./slack-message-actions.js";
|
export { handleSlackMessageAction } from "./slack-message-actions.js";
|
||||||
export { extractToolSend } from "./tool-send.js";
|
export { extractToolSend } from "./tool-send.js";
|
||||||
|
|||||||
Reference in New Issue
Block a user