From 158576740dbb1d12ecead6c6853d51c857109984 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 15:29:29 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20WebSocket=20=E7=9A=84=20{@link=20IotDataRuleAction}?= =?UTF-8?q?=20=E5=AE=9E=E7=8E=B0=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../action/IotWebSocketDataRuleAction.java | 83 +++++++++ .../action/websocket/IotWebSocketClient.java | 175 ++++++++++++++++++ 2 files changed, 258 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java new file mode 100644 index 0000000000..5e2750980c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -0,0 +1,83 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; +import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * WebSocket 的 {@link IotDataRuleAction} 实现类 + *

+ * 负责将设备消息发送到外部 WebSocket 服务器 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 使用连接池管理 WebSocket 连接,提高性能和资源利用率 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotWebSocketDataRuleAction extends + IotDataRuleCacheableAction { + + @Override + public Integer getType() { + return IotDataSinkTypeEnum.WEBSOCKET.getType(); + } + + @Override + protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception { + // 1.1 参数校验 + if (config.getServerUrl() == null || config.getServerUrl().trim().isEmpty()) { + throw new IllegalArgumentException("WebSocket 服务器地址不能为空"); + } + if (!config.getServerUrl().startsWith("ws://") && !config.getServerUrl().startsWith("wss://")) { + throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头"); + } + + // 2.1 创建 WebSocket 客户端 + IotWebSocketClient webSocketClient = new IotWebSocketClient( + config.getServerUrl(), + config.getConnectTimeoutMs(), + config.getSendTimeoutMs(), + config.getDataFormat() + ); + // 2.2 连接服务器 + webSocketClient.connect(); + log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]", + config.getServerUrl(), config.getDataFormat()); + return webSocketClient; + } + + @Override + protected void closeProducer(IotWebSocketClient producer) throws Exception { + if (producer != null) { + producer.close(); + } + } + + @Override + protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception { + try { + // 1.1 获取或创建 WebSocket 客户端 + IotWebSocketClient webSocketClient = getProducer(config); + // 1.2 检查连接状态,如果断开则重新连接 + if (!webSocketClient.isConnected()) { + log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); + webSocketClient.connect(); + } + + // 2.1 发送消息 + webSocketClient.sendMessage(message); + // 2.2 记录发送成功日志 + log.info("[execute][message({}) config({}) 发送成功,WebSocket 服务器: {}]", + message, config, config.getServerUrl()); + } catch (Exception e) { + log.error("[execute][message({}) config({}) 发送失败,WebSocket 服务器: {}]", + message, config, config.getServerUrl(), e); + throw e; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java new file mode 100644 index 0000000000..15c3cd1ae3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -0,0 +1,175 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import lombok.extern.slf4j.Slf4j; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * IoT WebSocket 客户端 + *

+ * 负责与外部 WebSocket 服务器建立连接并发送设备消息 + * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 + * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现 + * + * @author HUIHUI + */ +@Slf4j +public class IotWebSocketClient implements WebSocket.Listener { + + private final String serverUrl; + private final Integer connectTimeoutMs; + private final Integer sendTimeoutMs; + private final String dataFormat; + + private WebSocket webSocket; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final StringBuilder messageBuffer = new StringBuilder(); + + public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { + this.serverUrl = serverUrl; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000; + this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : 10000; + this.dataFormat = dataFormat != null ? dataFormat : "JSON"; + } + + /** + * 连接到 WebSocket 服务器 + */ + public void connect() throws Exception { + if (connected.get()) { + log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]"); + return; + } + + try { + HttpClient httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .build(); + + CompletableFuture future = httpClient.newWebSocketBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + .buildAsync(URI.create(serverUrl), this); + + // 等待连接完成 + webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); + connected.set(true); + log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl); + } catch (Exception e) { + close(); + log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e); + throw e; + } + } + + @Override + public void onOpen(WebSocket webSocket) { + log.debug("[onOpen][WebSocket 连接已打开]"); + webSocket.request(1); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + messageBuffer.append(data); + if (last) { + log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer); + messageBuffer.setLength(0); + } + webSocket.request(1); + return null; + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + connected.set(false); + log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason); + return null; + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + connected.set(false); + log.error("[onError][WebSocket 发生错误]", error); + } + + /** + * 发送设备消息 + * + * @param message 设备消息 + * @throws Exception 发送异常 + */ + public void sendMessage(IotDeviceMessage message) throws Exception { + if (!connected.get() || webSocket == null) { + throw new IllegalStateException("WebSocket 客户端未连接"); + } + + try { + String messageData; + if ("JSON".equalsIgnoreCase(dataFormat)) { + messageData = JsonUtils.toJsonString(message); + } else { + messageData = message.toString(); + } + + // 发送消息并等待完成 + CompletableFuture future = webSocket.sendText(messageData, true); + future.get(sendTimeoutMs, TimeUnit.MILLISECONDS); + log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]", + message.getDeviceId(), messageData.length()); + } catch (Exception e) { + log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e); + throw e; + } + } + + /** + * 关闭连接 + */ + public void close() { + if (!connected.get() && webSocket == null) { + return; + } + + try { + if (webSocket != null) { + webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭") + .orTimeout(5, TimeUnit.SECONDS) + .exceptionally(e -> { + log.warn("[close][发送关闭帧失败]", e); + return null; + }); + } + connected.set(false); + log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl); + } catch (Exception e) { + log.error("[close][关闭 WebSocket 客户端连接异常]", e); + } + } + + /** + * 检查连接状态 + * + * @return 是否已连接 + */ + public boolean isConnected() { + return connected.get() && webSocket != null; + } + + @Override + public String toString() { + return "IotWebSocketClient{" + + "serverUrl='" + serverUrl + '\'' + + ", dataFormat='" + dataFormat + '\'' + + ", connected=" + connected.get() + + '}'; + } + +} From 60817a6a5ba8aaa60ba63c280461dbd846cbe127 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 15:38:33 +0800 Subject: [PATCH 2/7] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91IotDataSi?= =?UTF-8?q?nkTcpConfig=20=E5=B8=B8=E9=87=8F=E6=9E=9A=E4=B8=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/config/IotDataSinkTcpConfig.java | 43 ++++++++++++++++--- .../rule/data/action/tcp/IotTcpClient.java | 13 +++--- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java index 3d96f11ceb..513a987f2f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkTcpConfig.java @@ -10,6 +10,35 @@ import lombok.Data; @Data public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { + /** + * 默认连接超时时间(毫秒) + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000; + /** + * 默认读取超时时间(毫秒) + */ + public static final int DEFAULT_READ_TIMEOUT_MS = 10000; + /** + * 默认是否启用 SSL + */ + public static final boolean DEFAULT_SSL = false; + /** + * 默认数据格式 + */ + public static final String DEFAULT_DATA_FORMAT = "JSON"; + /** + * 默认心跳间隔时间(毫秒) + */ + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L; + /** + * 默认重连间隔时间(毫秒) + */ + public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L; + /** + * 默认最大重连次数 + */ + public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3; + /** * TCP 服务器地址 */ @@ -23,17 +52,17 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { /** * 连接超时时间(毫秒) */ - private Integer connectTimeoutMs = 5000; + private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS; /** * 读取超时时间(毫秒) */ - private Integer readTimeoutMs = 10000; + private Integer readTimeoutMs = DEFAULT_READ_TIMEOUT_MS; /** * 是否启用 SSL */ - private Boolean ssl = false; + private Boolean ssl = DEFAULT_SSL; /** * SSL 证书路径(当 ssl=true 时需要) @@ -43,21 +72,21 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig { /** * 数据格式:JSON 或 BINARY */ - private String dataFormat = "JSON"; + private String dataFormat = DEFAULT_DATA_FORMAT; /** * 心跳间隔时间(毫秒),0 表示不启用心跳 */ - private Long heartbeatIntervalMs = 30000L; + private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; /** * 重连间隔时间(毫秒) */ - private Long reconnectIntervalMs = 5000L; + private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS; /** * 最大重连次数 */ - private Integer maxReconnectAttempts = 3; + private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java index 1618532a4a..b417dca5a2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLSocketFactory; @@ -38,16 +39,15 @@ public class IotTcpClient { private BufferedReader reader; private final AtomicBoolean connected = new AtomicBoolean(false); - // TODO @puhui999:default 值,IotDataSinkTcpConfig.java 枚举起来哈; public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs, Boolean ssl, String sslCertPath, String dataFormat) { this.host = host; this.port = port; - this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000; - this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000; - this.ssl = ssl != null ? ssl : false; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS; + this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS; + this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL; this.sslCertPath = sslCertPath; - this.dataFormat = dataFormat != null ? dataFormat : "JSON"; + this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT; } /** @@ -99,9 +99,8 @@ public class IotTcpClient { } try { - // TODO @puhui999:枚举值 String messageData; - if ("JSON".equalsIgnoreCase(dataFormat)) { + if (IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) { // JSON 格式 messageData = JsonUtils.toJsonString(message); } else { From 9fbced1192a453d6557085b3959acead2ee9c71c Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 15:42:06 +0800 Subject: [PATCH 3/7] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91IotDataSi?= =?UTF-8?q?nkWebSocketConfig=20=E5=B8=B8=E9=87=8F=E6=9E=9A=E4=B8=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotDataSinkWebSocketConfig.java | 67 ++++++++++++++++--- .../iot/enums/rule/IotDataSinkTypeEnum.java | 4 +- .../data/action/IotTcpDataRuleAction.java | 5 -- .../action/websocket/IotWebSocketClient.java | 9 +-- 4 files changed, 63 insertions(+), 22 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java index f1b7e86d86..55514da7c8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/config/IotDataSinkWebSocketConfig.java @@ -13,6 +13,51 @@ import lombok.Data; @Data public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { + /** + * 默认连接超时时间(毫秒) + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000; + /** + * 默认发送超时时间(毫秒) + */ + public static final int DEFAULT_SEND_TIMEOUT_MS = 10000; + /** + * 默认心跳间隔时间(毫秒) + */ + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L; + /** + * 默认心跳消息内容 + */ + public static final String DEFAULT_HEARTBEAT_MESSAGE = "{\"type\":\"heartbeat\"}"; + /** + * 默认是否启用 SSL 证书验证 + */ + public static final boolean DEFAULT_VERIFY_SSL_CERT = true; + /** + * 默认数据格式 + */ + public static final String DEFAULT_DATA_FORMAT = "JSON"; + /** + * 默认重连间隔时间(毫秒) + */ + public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L; + /** + * 默认最大重连次数 + */ + public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3; + /** + * 默认是否启用压缩 + */ + public static final boolean DEFAULT_ENABLE_COMPRESSION = false; + /** + * 默认消息发送重试次数 + */ + public static final int DEFAULT_SEND_RETRY_COUNT = 1; + /** + * 默认消息发送重试间隔(毫秒) + */ + public static final long DEFAULT_SEND_RETRY_INTERVAL_MS = 1000L; + /** * WebSocket 服务器地址 * 例如:ws://localhost:8080/ws 或 wss://example.com/ws @@ -22,22 +67,22 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { /** * 连接超时时间(毫秒) */ - private Integer connectTimeoutMs = 5000; + private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS; /** * 发送超时时间(毫秒) */ - private Integer sendTimeoutMs = 10000; + private Integer sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS; /** * 心跳间隔时间(毫秒),0 表示不启用心跳 */ - private Long heartbeatIntervalMs = 30000L; + private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS; /** * 心跳消息内容(JSON 格式) */ - private String heartbeatMessage = "{\"type\":\"heartbeat\"}"; + private String heartbeatMessage = DEFAULT_HEARTBEAT_MESSAGE; /** * 子协议列表(逗号分隔) @@ -52,36 +97,36 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig { /** * 是否启用 SSL 证书验证(仅对 wss:// 生效) */ - private Boolean verifySslCert = true; + private Boolean verifySslCert = DEFAULT_VERIFY_SSL_CERT; /** * 数据格式:JSON 或 TEXT */ - private String dataFormat = "JSON"; + private String dataFormat = DEFAULT_DATA_FORMAT; /** * 重连间隔时间(毫秒) */ - private Long reconnectIntervalMs = 5000L; + private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS; /** * 最大重连次数 */ - private Integer maxReconnectAttempts = 3; + private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS; /** * 是否启用压缩 */ - private Boolean enableCompression = false; + private Boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; /** * 消息发送重试次数 */ - private Integer sendRetryCount = 1; + private Integer sendRetryCount = DEFAULT_SEND_RETRY_COUNT; /** * 消息发送重试间隔(毫秒) */ - private Long sendRetryIntervalMs = 1000L; + private Long sendRetryIntervalMs = DEFAULT_SEND_RETRY_INTERVAL_MS; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java index 45a557db61..440fab5f53 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataSinkTypeEnum.java @@ -16,8 +16,8 @@ import java.util.Arrays; public enum IotDataSinkTypeEnum implements ArrayValuable { HTTP(1, "HTTP"), - TCP(2, "TCP"), // TODO @puhui999:待实现; - WEBSOCKET(3, "WebSocket"), // TODO @puhui999:待实现; + TCP(2, "TCP"), + WEBSOCKET(3, "WebSocket"), MQTT(10, "MQTT"), // TODO 待实现; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java index 4db6dc205a..53a3b71480 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java @@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.Duration; - /** * TCP 的 {@link IotDataRuleAction} 实现类 *

@@ -23,9 +21,6 @@ import java.time.Duration; public class IotTcpDataRuleAction extends IotDataRuleCacheableAction { - private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5); - private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10); - @Override public Integer getType() { return IotDataSinkTypeEnum.TCP.getType(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java index 15c3cd1ae3..bed197657f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; import lombok.extern.slf4j.Slf4j; import java.net.URI; @@ -36,9 +37,9 @@ public class IotWebSocketClient implements WebSocket.Listener { public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { this.serverUrl = serverUrl; - this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000; - this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : 10000; - this.dataFormat = dataFormat != null ? dataFormat : "JSON"; + this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS; + this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS; + this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT; } /** @@ -113,7 +114,7 @@ public class IotWebSocketClient implements WebSocket.Listener { try { String messageData; - if ("JSON".equalsIgnoreCase(dataFormat)) { + if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) { messageData = JsonUtils.toJsonString(message); } else { messageData = message.toString(); From 908f95875df88a0e5f4e41e55c0549d760d15d62 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 15:49:19 +0800 Subject: [PATCH 4/7] =?UTF-8?q?fix=EF=BC=9A=E3=80=90iot=E3=80=91saveDevice?= =?UTF-8?q?Property=20=E4=B8=AD=EF=BC=8C=E7=B1=BB=E5=9E=8B=E8=A6=81?= =?UTF-8?q?=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/property/IotDevicePropertyServiceImpl.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 8031c2a11a..4e1be3a0ca 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.device.property; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.convert.Convert; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; @@ -145,6 +146,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) { // 特殊:STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储 properties.put((String) key, JsonUtils.toJsonString(value)); + } else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toDouble(value)); + } else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toFloat(value)); + } else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(thingModel.getProperty().getDataType())) { + properties.put((String) key, Convert.toByte(value)); } else { properties.put((String) key, value); } From a7f655e1e7ba87bca575246fc543dd20247b0123 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 16:04:08 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix=EF=BC=9A=E3=80=90iot=E3=80=91=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E5=BD=93=E5=A4=9A=E4=B8=AA=E5=8D=8F=E8=AE=AE=E5=90=8C?= =?UTF-8?q?=E6=97=B6=E5=90=AF=E7=94=A8=E6=97=B6=EF=BC=8C=E5=87=BA=E7=8E=B0?= =?UTF-8?q?=20Bean=20=E5=86=B2=E7=AA=81=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index fab4c8cc85..3e573efdde 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -21,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -59,20 +60,20 @@ public class IotGatewayConfiguration { @Slf4j public static class EmqxProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "emqxVertx", destroyMethod = "close") public Vertx emqxVertx() { return Vertx.vertx(); } @Bean public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + @Qualifier("emqxVertx") Vertx emqxVertx) { return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @Bean public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties, - Vertx emqxVertx) { + @Qualifier("emqxVertx") Vertx emqxVertx) { return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx); } @@ -91,7 +92,7 @@ public class IotGatewayConfiguration { @Slf4j public static class TcpProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "tcpVertx", destroyMethod = "close") public Vertx tcpVertx() { return Vertx.vertx(); } @@ -101,7 +102,7 @@ public class IotGatewayConfiguration { IotDeviceService deviceService, IotDeviceMessageService messageService, IotTcpConnectionManager connectionManager, - Vertx tcpVertx) { + @Qualifier("tcpVertx") Vertx tcpVertx) { return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(), deviceService, messageService, connectionManager, tcpVertx); } @@ -126,7 +127,7 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "mqttVertx", destroyMethod = "close") public Vertx mqttVertx() { return Vertx.vertx(); } @@ -135,7 +136,7 @@ public class IotGatewayConfiguration { public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties, IotDeviceMessageService messageService, IotMqttConnectionManager connectionManager, - Vertx mqttVertx) { + @Qualifier("mqttVertx") Vertx mqttVertx) { return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService, connectionManager, mqttVertx); } @@ -163,7 +164,7 @@ public class IotGatewayConfiguration { @Slf4j public static class MqttWsProtocolConfiguration { - @Bean(destroyMethod = "close") + @Bean(name = "mqttWsVertx", destroyMethod = "close") public Vertx mqttWsVertx() { return Vertx.vertx(); } @@ -172,7 +173,7 @@ public class IotGatewayConfiguration { public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties, IotDeviceMessageService messageService, IotMqttWsConnectionManager connectionManager, - Vertx mqttWsVertx) { + @Qualifier("mqttWsVertx") Vertx mqttWsVertx) { return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(), messageService, connectionManager, mqttWsVertx); } From a42202e7eb9990e62a0e4d24e4b76c68134cc2b6 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 16:31:43 +0800 Subject: [PATCH 6/7] =?UTF-8?q?fix=EF=BC=9A=E3=80=90iot=E3=80=91=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E4=B8=8A=E6=8A=A5=E5=8F=AF=E8=83=BD=E5=90=8C=E6=97=B6?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E5=A4=9A=E4=B8=AA=E5=B1=9E=E6=80=A7=EF=BC=8C?= =?UTF-8?q?=E6=89=80=E4=BB=A5=E9=9C=80=E8=A6=81=E5=88=A4=E6=96=AD=20trigge?= =?UTF-8?q?r.getIdentifier()=20=E6=98=AF=E5=90=A6=E5=9C=A8=20message=20?= =?UTF-8?q?=E7=9A=84=20params=20=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDevicePropertyPostTriggerMatcher.java | 10 ++-- .../iot/core/util/IotDeviceMessageUtils.java | 50 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java index 27cb02a1a5..f5d461275b 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java @@ -36,11 +36,11 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM return false; } - // 1.3 检查标识符是否匹配 - String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message); - if (!IotSceneRuleMatcherHelper.isIdentifierMatched(trigger.getIdentifier(), messageIdentifier)) { - IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "标识符不匹配,期望: " + - trigger.getIdentifier() + ", 实际: " + messageIdentifier); + // 1.3 检查消息中是否包含触发器指定的属性标识符 + // 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中 + if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " + + trigger.getIdentifier()); return false; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index 65165425c8..5c1ac26005 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -5,6 +5,7 @@ import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -69,6 +70,55 @@ public class IotDeviceMessageUtils { return null; } + /** + * 判断消息中是否包含指定的标识符 + * + * 对于不同消息类型的处理: + * - EVENT_POST/SERVICE_INVOKE:检查 params.identifier 是否匹配 + * - STATE_UPDATE:检查 params.state 是否匹配 + * - PROPERTY_POST:检查 params 中是否包含该属性 key + * + * @param message 消息 + * @param identifier 要检查的标识符 + * @return 是否包含 + */ + public static boolean containsIdentifier(IotDeviceMessage message, String identifier) { + if (message.getParams() == null || StrUtil.isBlank(identifier)) { + return false; + } + // EVENT_POST / SERVICE_INVOKE / STATE_UPDATE:使用原有逻辑 + String messageIdentifier = getIdentifier(message); + if (messageIdentifier != null) { + return identifier.equals(messageIdentifier); + } + // PROPERTY_POST:检查 params 中是否包含该属性 key + if (StrUtil.equals(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) { + Map params = parseParamsToMap(message.getParams()); + return params != null && params.containsKey(identifier); + } + return false; + } + + /** + * 将 params 解析为 Map + * + * @param params 参数(可能是 Map 或 JSON 字符串) + * @return Map,解析失败返回 null + */ + @SuppressWarnings("unchecked") + private static Map parseParamsToMap(Object params) { + if (params instanceof Map) { + return (Map) params; + } + if (params instanceof String) { + try { + return JsonUtils.parseObject((String) params, Map.class); + } catch (Exception ignored) { + } + } + return null; + } + /** * 从设备消息中提取指定标识符的属性值 * - 支持多种消息格式和属性值提取策略 From 9febc2b0b087bbd97f1e9dafdd7b0ad7f2f6c391 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Tue, 13 Jan 2026 16:44:24 +0800 Subject: [PATCH 7/7] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91executeSc?= =?UTF-8?q?eneRuleAction=20=E6=9B=B4=E6=96=B0=E8=A7=84=E5=88=99=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=E7=9A=84=E6=9C=80=E5=90=8E=E8=A7=A6=E5=8F=91=E6=97=B6?= =?UTF-8?q?=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dal/dataobject/rule/IotSceneRuleDO.java | 6 ++++++ .../rule/scene/IotSceneRuleServiceImpl.java | 20 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java index 94aa1eb5a3..ecf87db7a7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotSceneRuleDO.java @@ -21,6 +21,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.time.LocalDateTime; import java.util.List; /** @@ -56,6 +57,11 @@ public class IotSceneRuleDO extends TenantBaseDO { */ private Integer status; + /** + * 最后触发时间 + */ + private LocalDateTime lastTriggerTime; + /** * 场景定义配置 */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java index 41052289a6..eb70b30480 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java @@ -30,6 +30,7 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; +import java.time.LocalDateTime; import java.util.Collection; import java.util.List; @@ -392,9 +393,28 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService { } }); }); + + // 3. 更新最后触发时间 + updateLastTriggerTime(sceneRule.getId()); }); } + /** + * 更新规则场景的最后触发时间 + * + * @param id 规则场景编号 + */ + private void updateLastTriggerTime(Long id) { + try { + IotSceneRuleDO updateObj = new IotSceneRuleDO() + .setId(id) + .setLastTriggerTime(LocalDateTime.now()); + sceneRuleMapper.updateById(updateObj); + } catch (Exception e) { + log.error("[updateLastTriggerTime][规则场景编号({}) 更新最后触发时间异常]", id, e); + } + } + private IotSceneRuleServiceImpl getSelf() { return SpringUtil.getBean(IotSceneRuleServiceImpl.class); }