From 158576740dbb1d12ecead6c6853d51c857109984 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 15:29:29 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20WebSocket=20=E7=9A=84=20{@link=20IotDataRuleAction}?= =?UTF-8?q?=20=E5=AE=9E=E7=8E=B0=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../action/IotWebSocketDataRuleAction.java | 83 +++++++++ .../action/websocket/IotWebSocketClient.java | 175 ++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java new file mode 100644 index 0000000000..5e2750980c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -0,0 +1,83 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; +import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * WebSocket 的 {@link IotDataRuleAction} 实现类 + *

+ * 负责将设备消息发送到外部 WebSocket 服务器 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 使用连接池管理 WebSocket 连接,提高性能和资源利用率 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotWebSocketDataRuleAction extends + IotDataRuleCacheableAction { + + @Override + public Integer getType() { + return IotDataSinkTypeEnum.WEBSOCKET.getType(); + } + + @Override + protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception { + // 1.1 参数校验 + if (config.getServerUrl() == null || config.getServerUrl().trim().isEmpty()) { + throw new IllegalArgumentException("WebSocket 服务器地址不能为空"); + } + if (!config.getServerUrl().startsWith("ws://") && !config.getServerUrl().startsWith("wss://")) { + throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头"); + } + + // 2.1 创建 WebSocket 客户端 + IotWebSocketClient webSocketClient = new IotWebSocketClient( + config.getServerUrl(), + config.getConnectTimeoutMs(), + config.getSendTimeoutMs(), + config.getDataFormat() + ); + // 2.2 连接服务器 + webSocketClient.connect(); + log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]", + config.getServerUrl(), config.getDataFormat()); + return webSocketClient; + } + + @Override + protected void closeProducer(IotWebSocketClient producer) throws Exception { + if (producer != null) { + producer.close(); + } + } + + @Override + protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception { + try { + // 1.1 获取或创建 WebSocket 客户端 + IotWebSocketClient webSocketClient = getProducer(config); + // 1.2 检查连接状态,如果断开则重新连接 + if (!webSocketClient.isConnected()) { + log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); + webSocketClient.connect(); + } + + // 2.1 发送消息 + webSocketClient.sendMessage(message); + // 2.2 记录发送成功日志 + log.info("[execute][message({}) config({}) 发送成功,WebSocket 服务器: {}]", + message, config, config.getServerUrl()); + } catch (Exception e) { + log.error("[execute][message({}) config({}) 发送失败,WebSocket 服务器: {}]", + message, config, config.getServerUrl(), e); + throw e; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java new file mode 100644 index 0000000000..15c3cd1ae3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -0,0 +1,175 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import lombok.extern.slf4j.Slf4j; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IoT WebSocket 客户端 + *

+ * 负责与外部 WebSocket 服务器建立连接并发送设备消息 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现 + * + * @author HUIHUI + */ +@Slf4j +public class IotWebSocketClient implements WebSocket.Listener { + + private final String serverUrl; + private final Integer connectTimeoutMs; + private final Integer sendTimeoutMs; + private final String dataFormat; + + private WebSocket webSocket; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final StringBuilder messageBuffer = new StringBuilder(); + + public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { + this.serverUrl = serverUrl; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000; + this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : 10000; + this.dataFormat = dataFormat != null ? dataFormat : "JSON"; + } + + /** + * 连接到 WebSocket 服务器 + */ + public void connect() throws Exception { + if (connected.get()) { + log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]"); + return; + } + + try { + HttpClient httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .build(); + + CompletableFuture future = httpClient.newWebSocketBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .buildAsync(URI.create(serverUrl), this); + + // 等待连接完成 + webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); + connected.set(true); + log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl); + } catch (Exception e) { + close(); + log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e); + throw e; + } + } + + @Override + public void onOpen(WebSocket webSocket) { + log.debug("[onOpen][WebSocket 连接已打开]"); + webSocket.request(1); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + messageBuffer.append(data); + if (last) { + log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer); + messageBuffer.setLength(0); + } + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + connected.set(false); + log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason); + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + connected.set(false); + log.error("[onError][WebSocket 发生错误]", error); + } + + /** + * 发送设备消息 + * + * @param message 设备消息 + * @throws Exception 发送异常 + */ + public void sendMessage(IotDeviceMessage message) throws Exception { + if (!connected.get() || webSocket == null) { + throw new IllegalStateException("WebSocket 客户端未连接"); + } + + try { + String messageData; + if ("JSON".equalsIgnoreCase(dataFormat)) { + messageData = JsonUtils.toJsonString(message); + } else { + messageData = message.toString(); + } + + // 发送消息并等待完成 + CompletableFuture future = webSocket.sendText(messageData, true); + future.get(sendTimeoutMs, TimeUnit.MILLISECONDS); + log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]", + message.getDeviceId(), messageData.length()); + } catch (Exception e) { + log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e); + throw e; + } + } + + /** + * 关闭连接 + */ + public void close() { + if (!connected.get() && webSocket == null) { + return; + } + + try { + if (webSocket != null) { + webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭") + .orTimeout(5, TimeUnit.SECONDS) + .exceptionally(e -> { + log.warn("[close][发送关闭帧失败]", e); + return null; + }); + } + connected.set(false); + log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl); + } catch (Exception e) { + log.error("[close][关闭 WebSocket 客户端连接异常]", e); + } + } + + /** + * 检查连接状态 + * + * @return 是否已连接 + */ + public boolean isConnected() { + return connected.get() && webSocket != null; + } + + @Override + public String toString() { + return "IotWebSocketClient{" + + "serverUrl='" + serverUrl + '\'' + + ", dataFormat='" + dataFormat + '\'' + + ", connected=" + connected.get() + + '}'; + } + +}