mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 20:57:28 +00:00
265 lines
7.4 KiB
TypeScript
265 lines
7.4 KiB
TypeScript
import * as http from "http";
|
|
import crypto from "node:crypto";
|
|
import * as Lark from "@larksuiteoapi/node-sdk";
|
|
import {
|
|
applyBasicWebhookRequestGuards,
|
|
readJsonBodyWithLimit,
|
|
type RuntimeEnv,
|
|
installRequestBodyLimitGuard,
|
|
} from "openclaw/plugin-sdk/feishu";
|
|
import { createFeishuWSClient } from "./client.js";
|
|
import {
|
|
botNames,
|
|
botOpenIds,
|
|
FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
|
|
FEISHU_WEBHOOK_MAX_BODY_BYTES,
|
|
feishuWebhookRateLimiter,
|
|
httpServers,
|
|
recordWebhookStatus,
|
|
wsClients,
|
|
} from "./monitor.state.js";
|
|
import type { ResolvedFeishuAccount } from "./types.js";
|
|
|
|
export type MonitorTransportParams = {
|
|
account: ResolvedFeishuAccount;
|
|
accountId: string;
|
|
runtime?: RuntimeEnv;
|
|
abortSignal?: AbortSignal;
|
|
eventDispatcher: Lark.EventDispatcher;
|
|
};
|
|
|
|
function isFeishuWebhookPayload(value: unknown): value is Record<string, unknown> {
|
|
return !!value && typeof value === "object" && !Array.isArray(value);
|
|
}
|
|
|
|
function buildFeishuWebhookEnvelope(
|
|
req: http.IncomingMessage,
|
|
payload: Record<string, unknown>,
|
|
): Record<string, unknown> {
|
|
return Object.assign(Object.create({ headers: req.headers }), payload) as Record<string, unknown>;
|
|
}
|
|
|
|
function isFeishuWebhookSignatureValid(params: {
|
|
headers: http.IncomingHttpHeaders;
|
|
payload: Record<string, unknown>;
|
|
encryptKey?: string;
|
|
}): boolean {
|
|
const encryptKey = params.encryptKey?.trim();
|
|
if (!encryptKey) {
|
|
return true;
|
|
}
|
|
|
|
const timestampHeader = params.headers["x-lark-request-timestamp"];
|
|
const nonceHeader = params.headers["x-lark-request-nonce"];
|
|
const signatureHeader = params.headers["x-lark-signature"];
|
|
const timestamp = Array.isArray(timestampHeader) ? timestampHeader[0] : timestampHeader;
|
|
const nonce = Array.isArray(nonceHeader) ? nonceHeader[0] : nonceHeader;
|
|
const signature = Array.isArray(signatureHeader) ? signatureHeader[0] : signatureHeader;
|
|
if (!timestamp || !nonce || !signature) {
|
|
return false;
|
|
}
|
|
|
|
const computedSignature = crypto
|
|
.createHash("sha256")
|
|
.update(timestamp + nonce + encryptKey + JSON.stringify(params.payload))
|
|
.digest("hex");
|
|
return computedSignature === signature;
|
|
}
|
|
|
|
function respondText(res: http.ServerResponse, statusCode: number, body: string): void {
|
|
res.statusCode = statusCode;
|
|
res.setHeader("Content-Type", "text/plain; charset=utf-8");
|
|
res.end(body);
|
|
}
|
|
|
|
export async function monitorWebSocket({
|
|
account,
|
|
accountId,
|
|
runtime,
|
|
abortSignal,
|
|
eventDispatcher,
|
|
}: MonitorTransportParams): Promise<void> {
|
|
const log = runtime?.log ?? console.log;
|
|
log(`feishu[${accountId}]: starting WebSocket connection...`);
|
|
|
|
const wsClient = createFeishuWSClient(account);
|
|
wsClients.set(accountId, wsClient);
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const cleanup = () => {
|
|
wsClients.delete(accountId);
|
|
botOpenIds.delete(accountId);
|
|
botNames.delete(accountId);
|
|
};
|
|
|
|
const handleAbort = () => {
|
|
log(`feishu[${accountId}]: abort signal received, stopping`);
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
if (abortSignal?.aborted) {
|
|
cleanup();
|
|
resolve();
|
|
return;
|
|
}
|
|
|
|
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
|
|
|
try {
|
|
wsClient.start({ eventDispatcher });
|
|
log(`feishu[${accountId}]: WebSocket client started`);
|
|
} catch (err) {
|
|
cleanup();
|
|
abortSignal?.removeEventListener("abort", handleAbort);
|
|
reject(err);
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function monitorWebhook({
|
|
account,
|
|
accountId,
|
|
runtime,
|
|
abortSignal,
|
|
eventDispatcher,
|
|
}: MonitorTransportParams): Promise<void> {
|
|
const log = runtime?.log ?? console.log;
|
|
const error = runtime?.error ?? console.error;
|
|
|
|
const port = account.config.webhookPort ?? 3000;
|
|
const path = account.config.webhookPath ?? "/feishu/events";
|
|
const host = account.config.webhookHost ?? "127.0.0.1";
|
|
|
|
log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);
|
|
|
|
const server = http.createServer();
|
|
|
|
server.on("request", (req, res) => {
|
|
res.on("finish", () => {
|
|
recordWebhookStatus(runtime, accountId, path, res.statusCode);
|
|
});
|
|
|
|
const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
|
|
if (
|
|
!applyBasicWebhookRequestGuards({
|
|
req,
|
|
res,
|
|
rateLimiter: feishuWebhookRateLimiter,
|
|
rateLimitKey,
|
|
nowMs: Date.now(),
|
|
requireJsonContentType: true,
|
|
})
|
|
) {
|
|
return;
|
|
}
|
|
|
|
const guard = installRequestBodyLimitGuard(req, res, {
|
|
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
|
|
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
|
|
responseFormat: "text",
|
|
});
|
|
if (guard.isTripped()) {
|
|
return;
|
|
}
|
|
|
|
void (async () => {
|
|
try {
|
|
const bodyResult = await readJsonBodyWithLimit(req, {
|
|
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
|
|
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
|
|
});
|
|
if (guard.isTripped() || res.writableEnded) {
|
|
return;
|
|
}
|
|
if (!bodyResult.ok) {
|
|
if (bodyResult.code === "INVALID_JSON") {
|
|
respondText(res, 400, "Invalid JSON");
|
|
}
|
|
return;
|
|
}
|
|
if (!isFeishuWebhookPayload(bodyResult.value)) {
|
|
respondText(res, 400, "Invalid JSON");
|
|
return;
|
|
}
|
|
|
|
// Lark's default adapter drops invalid signatures as an empty 200. Reject here instead.
|
|
if (
|
|
!isFeishuWebhookSignatureValid({
|
|
headers: req.headers,
|
|
payload: bodyResult.value,
|
|
encryptKey: account.encryptKey,
|
|
})
|
|
) {
|
|
respondText(res, 401, "Invalid signature");
|
|
return;
|
|
}
|
|
|
|
const { isChallenge, challenge } = Lark.generateChallenge(bodyResult.value, {
|
|
encryptKey: account.encryptKey ?? "",
|
|
});
|
|
if (isChallenge) {
|
|
res.statusCode = 200;
|
|
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
|
res.end(JSON.stringify(challenge));
|
|
return;
|
|
}
|
|
|
|
const value = await eventDispatcher.invoke(
|
|
buildFeishuWebhookEnvelope(req, bodyResult.value),
|
|
{ needCheck: false },
|
|
);
|
|
if (!res.headersSent) {
|
|
res.statusCode = 200;
|
|
res.setHeader("Content-Type", "application/json; charset=utf-8");
|
|
res.end(JSON.stringify(value));
|
|
}
|
|
} catch (err) {
|
|
if (!guard.isTripped()) {
|
|
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
|
|
if (!res.headersSent) {
|
|
respondText(res, 500, "Internal Server Error");
|
|
}
|
|
}
|
|
} finally {
|
|
guard.dispose();
|
|
}
|
|
})();
|
|
});
|
|
|
|
httpServers.set(accountId, server);
|
|
|
|
return new Promise((resolve, reject) => {
|
|
const cleanup = () => {
|
|
server.close();
|
|
httpServers.delete(accountId);
|
|
botOpenIds.delete(accountId);
|
|
botNames.delete(accountId);
|
|
};
|
|
|
|
const handleAbort = () => {
|
|
log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
|
|
cleanup();
|
|
resolve();
|
|
};
|
|
|
|
if (abortSignal?.aborted) {
|
|
cleanup();
|
|
resolve();
|
|
return;
|
|
}
|
|
|
|
abortSignal?.addEventListener("abort", handleAbort, { once: true });
|
|
|
|
server.listen(port, host, () => {
|
|
log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
|
|
});
|
|
|
|
server.on("error", (err) => {
|
|
error(`feishu[${accountId}]: Webhook server error: ${err}`);
|
|
abortSignal?.removeEventListener("abort", handleAbort);
|
|
reject(err);
|
|
});
|
|
});
|
|
}
|