From 44838510c950098e14a86fc47795cba72fe0c0ba Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 18:12:28 +0800 Subject: [PATCH 1/6] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91WebSocket?= =?UTF-8?q?=20=E8=BF=9E=E6=8E=A5=E7=BA=BF=E7=A8=8B=E5=AE=89=E5=85=A8?= =?UTF-8?q?=E4=B8=8E=20JDK=208=20=E5=85=BC=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 15 + yudao-module-iot/yudao-module-iot-biz/pom.xml | 11 + .../iot/dal/redis/RedisKeyConstants.java | 8 + .../redis/rule/IotWebSocketLockRedisDAO.java | 67 +++++ .../action/IotWebSocketDataRuleAction.java | 32 ++- .../action/websocket/IotWebSocketClient.java | 156 ++++++----- .../websocket/IotWebSocketClientTest.java | 257 ++++++++++++++++++ 7 files changed, 480 insertions(+), 66 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 0257eb3109..3ff1534cee 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -76,6 +76,8 @@ 2.3.0 4.7.9-20251224.161447 4.40.607.ALL + + 4.12.0 @@ -653,6 +655,19 @@ org.eclipse.paho.client.mqttv3 ${mqtt.version} + + + + com.squareup.okhttp3 + okhttp + ${okhttp.version} + + + com.squareup.okhttp3 + mockwebserver + ${okhttp.version} + test + diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 1f83a7acb2..a0fe16de48 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -73,6 +73,17 @@ yudao-spring-boot-starter-excel + + + com.squareup.okhttp3 + okhttp + + + com.squareup.okhttp3 + mockwebserver + test + + diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java index c8041a673c..95d210252f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java @@ -84,4 +84,12 @@ public interface RedisKeyConstants { */ String SCENE_RULE_LIST = "iot:scene_rule_list"; + /** + * WebSocket 连接分布式锁 + *

+ * KEY 格式:websocket_connect_lock:${serverUrl} + * 用于保证 WebSocket 重连操作的线程安全 + */ + String WEBSOCKET_CONNECT_LOCK = "iot:websocket_connect_lock:%s"; + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java new file mode 100644 index 0000000000..d50dc548af --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.module.iot.dal.redis.rule; + +import jakarta.annotation.Resource; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Repository; + +import java.util.concurrent.TimeUnit; + +import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK; + +/** + * IoT WebSocket 连接锁 Redis DAO + *

+ * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争 + * + * @author HUIHUI + */ +@Repository +public class IotWebSocketLockRedisDAO { + + /** + * 锁等待超时时间(毫秒) + */ + private static final long LOCK_WAIT_TIME_MS = 5000; + + /** + * 锁持有超时时间(毫秒) + */ + private static final long LOCK_LEASE_TIME_MS = 10000; + + @Resource + private RedissonClient redissonClient; + + /** + * 在分布式锁保护下执行操作 + * + * @param serverUrl WebSocket 服务器地址 + * @param runnable 需要执行的操作 + * @throws Exception 如果获取锁超时或执行操作时发生异常 + */ + public void lock(String serverUrl, Runnable runnable) throws Exception { + String lockKey = formatKey(serverUrl); + RLock lock = redissonClient.getLock(lockKey); + + try { + // 尝试获取分布式锁 + boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS); + if (!acquired) { + throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl); + } + + // 执行操作 + runnable.run(); + } finally { + // 释放锁 + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private static String formatKey(String serverUrl) { + return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java index c0445df906..651562987a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -3,8 +3,10 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; +import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -22,6 +24,9 @@ import org.springframework.stereotype.Component; public class IotWebSocketDataRuleAction extends IotDataRuleCacheableAction { + @Resource + private IotWebSocketLockRedisDAO webSocketLockRedisDAO; + @Override public Integer getType() { return IotDataSinkTypeEnum.WEBSOCKET.getType(); @@ -62,12 +67,11 @@ public class IotWebSocketDataRuleAction extends protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception { try { // 1.1 获取或创建 WebSocket 客户端 - // TODO @puhui999:需要加锁,保证必须连接上; IotWebSocketClient webSocketClient = getProducer(config); - // 1.2 检查连接状态,如果断开则重新连接 + + // 1.2 检查连接状态,如果断开则使用分布式锁保证重连的线程安全 if (!webSocketClient.isConnected()) { - log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); - webSocketClient.connect(); + reconnectWithLock(webSocketClient, config); } // 2.1 发送消息 @@ -82,4 +86,24 @@ public class IotWebSocketDataRuleAction extends } } + /** + * 使用分布式锁进行重连 + * + * @param webSocketClient WebSocket 客户端 + * @param config 配置信息 + */ + private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception { + webSocketLockRedisDAO.lock(config.getServerUrl(), () -> { + // 双重检查:获取锁后再次检查连接状态,避免重复连接 + if (!webSocketClient.isConnected()) { + log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); + try { + webSocketClient.connect(); + } catch (Exception e) { + throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e); + } + } + }); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java index 2f55d6ee74..e898f61cb8 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java @@ -4,13 +4,9 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; import lombok.extern.slf4j.Slf4j; +import okhttp3.*; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -19,21 +15,23 @@ import java.util.concurrent.atomic.AtomicBoolean; *

* 负责与外部 WebSocket 服务器建立连接并发送设备消息 * 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式 - * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现 + * 基于 OkHttp WebSocket 实现,兼容 JDK 8+ + *

+ * 注意:该类的线程安全由调用方(IotWebSocketDataRuleAction)通过分布式锁保证 * * @author HUIHUI */ @Slf4j -public class IotWebSocketClient implements WebSocket.Listener { +public class IotWebSocketClient { private final String serverUrl; private final Integer connectTimeoutMs; private final Integer sendTimeoutMs; private final String dataFormat; - private WebSocket webSocket; + private OkHttpClient okHttpClient; + private volatile WebSocket webSocket; private final AtomicBoolean connected = new AtomicBoolean(false); - private final StringBuilder messageBuffer = new StringBuilder(); public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) { this.serverUrl = serverUrl; @@ -44,8 +42,9 @@ public class IotWebSocketClient implements WebSocket.Listener { /** * 连接到 WebSocket 服务器 + *

+ * 注意:调用方需要通过分布式锁保证并发安全 */ - @SuppressWarnings("resource") public void connect() throws Exception { if (connected.get()) { log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]"); @@ -53,17 +52,32 @@ public class IotWebSocketClient implements WebSocket.Listener { } try { - HttpClient httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofMillis(connectTimeoutMs)) + // 创建 OkHttpClient + okHttpClient = new OkHttpClient.Builder() + .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) + .readTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS) + .writeTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS) .build(); - CompletableFuture future = httpClient.newWebSocketBuilder() - .connectTimeout(Duration.ofMillis(connectTimeoutMs)) - .buildAsync(URI.create(serverUrl), this); + // 创建 WebSocket 请求 + Request request = new Request.Builder() + .url(serverUrl) + .build(); + + // 使用 CountDownLatch 等待连接完成 + CountDownLatch connectLatch = new CountDownLatch(1); + AtomicBoolean connectSuccess = new AtomicBoolean(false); + + // 创建 WebSocket 连接 + webSocket = okHttpClient.newWebSocket(request, new IotWebSocketListener(connectLatch, connectSuccess)); // 等待连接完成 - webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS); - connected.set(true); + boolean await = connectLatch.await(connectTimeoutMs, TimeUnit.MILLISECONDS); + if (!await || !connectSuccess.get()) { + close(); + throw new Exception("WebSocket 连接超时或失败,服务器地址: " + serverUrl); + } + log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl); } catch (Exception e) { close(); @@ -72,36 +86,6 @@ public class IotWebSocketClient implements WebSocket.Listener { } } - @Override - public void onOpen(WebSocket webSocket) { - log.debug("[onOpen][WebSocket 连接已打开]"); - webSocket.request(1); - } - - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - messageBuffer.append(data); - if (last) { - log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer); - messageBuffer.setLength(0); - } - webSocket.request(1); - return null; - } - - @Override - public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - connected.set(false); - log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason); - return null; - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - connected.set(false); - log.error("[onError][WebSocket 发生错误]", error); - } - /** * 发送设备消息 * @@ -109,7 +93,8 @@ public class IotWebSocketClient implements WebSocket.Listener { * @throws Exception 发送异常 */ public void sendMessage(IotDeviceMessage message) throws Exception { - if (!connected.get() || webSocket == null) { + WebSocket ws = this.webSocket; + if (!connected.get() || ws == null) { throw new IllegalStateException("WebSocket 客户端未连接"); } @@ -121,9 +106,11 @@ public class IotWebSocketClient implements WebSocket.Listener { messageData = message.toString(); } - // 发送消息并等待完成 - CompletableFuture future = webSocket.sendText(messageData, true); - future.get(sendTimeoutMs, TimeUnit.MILLISECONDS); + // 发送消息 + boolean success = ws.send(messageData); + if (!success) { + throw new Exception("WebSocket 发送消息失败,消息队列已满或连接已关闭"); + } log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]", message.getDeviceId(), messageData.length()); } catch (Exception e) { @@ -136,18 +123,17 @@ public class IotWebSocketClient implements WebSocket.Listener { * 关闭连接 */ public void close() { - if (!connected.get() && webSocket == null) { - return; - } - try { if (webSocket != null) { - webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭") - .orTimeout(5, TimeUnit.SECONDS) - .exceptionally(e -> { - log.warn("[close][发送关闭帧失败]", e); - return null; - }); + // 发送正常关闭帧,状态码 1000 表示正常关闭 + webSocket.close(1000, "客户端主动关闭"); + webSocket = null; + } + if (okHttpClient != null) { + // 关闭连接池和调度器 + okHttpClient.dispatcher().executorService().shutdown(); + okHttpClient.connectionPool().evictAll(); + okHttpClient = null; } connected.set(false); log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl); @@ -174,4 +160,50 @@ public class IotWebSocketClient implements WebSocket.Listener { '}'; } + /** + * OkHttp WebSocket 监听器 + */ + private class IotWebSocketListener extends WebSocketListener { + + private final CountDownLatch connectLatch; + private final AtomicBoolean connectSuccess; + + public IotWebSocketListener(CountDownLatch connectLatch, AtomicBoolean connectSuccess) { + this.connectLatch = connectLatch; + this.connectSuccess = connectSuccess; + } + + @Override + public void onOpen(WebSocket webSocket, Response response) { + connected.set(true); + connectSuccess.set(true); + connectLatch.countDown(); + log.info("[onOpen][WebSocket 连接已打开,服务器: {}]", serverUrl); + } + + @Override + public void onMessage(WebSocket webSocket, String text) { + log.debug("[onMessage][收到消息: {}]", text); + } + + @Override + public void onClosing(WebSocket webSocket, int code, String reason) { + connected.set(false); + log.info("[onClosing][WebSocket 正在关闭,code: {}, reason: {}]", code, reason); + } + + @Override + public void onClosed(WebSocket webSocket, int code, String reason) { + connected.set(false); + log.info("[onClosed][WebSocket 已关闭,code: {}, reason: {}]", code, reason); + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + connected.set(false); + connectLatch.countDown(); // 确保连接失败时也释放等待 + log.error("[onFailure][WebSocket 连接失败]", t); + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java new file mode 100644 index 0000000000..d3568db8b9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java @@ -0,0 +1,257 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link IotWebSocketClient} 的单元测试 + * + * @author HUIHUI + */ +class IotWebSocketClientTest { + + private MockWebServer mockWebServer; + + @BeforeEach + public void setUp() throws Exception { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + } + + @AfterEach + public void tearDown() throws Exception { + if (mockWebServer != null) { + mockWebServer.shutdown(); + } + } + + /** + * 简单的 WebSocket 监听器,用于测试 + */ + private static class TestWebSocketListener extends WebSocketListener { + @Override + public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { + // 连接打开 + } + + @Override + public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { + // 收到消息 + } + + @Override + public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { + webSocket.close(code, reason); + } + + @Override + public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { + // 连接失败 + } + } + + @Test + public void testConstructor_defaultValues() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + + // 调用 + IotWebSocketClient client = new IotWebSocketClient(serverUrl, null, null, null); + + // 断言:验证默认值被正确设置 + assertNotNull(client); + assertFalse(client.isConnected()); + } + + @Test + public void testConstructor_customValues() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + Integer connectTimeoutMs = 3000; + Integer sendTimeoutMs = 5000; + String dataFormat = "TEXT"; + + // 调用 + IotWebSocketClient client = new IotWebSocketClient(serverUrl, connectTimeoutMs, sendTimeoutMs, dataFormat); + + // 断言 + assertNotNull(client); + assertFalse(client.isConnected()); + } + + @Test + public void testConnect_success() throws Exception { + // 准备参数:使用 MockWebServer 的 WebSocket 端点 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock:设置 MockWebServer 响应 WebSocket 升级请求 + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + + // 断言 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testConnect_alreadyConnected() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用:第一次连接 + client.connect(); + assertTrue(client.isConnected()); + + // 调用:第二次连接(应该不会重复连接) + client.connect(); + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testSendMessage_success() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + client.sendMessage(message); + + // 断言:消息发送成功不抛异常 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + + @Test + public void testSendMessage_notConnected() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // 调用 & 断言:未连接时发送消息应抛出异常 + assertThrows(IllegalStateException.class, () -> client.sendMessage(message)); + } + + @Test + public void testClose_success() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + assertTrue(client.isConnected()); + + client.close(); + + // 断言 + assertFalse(client.isConnected()); + } + + @Test + public void testClose_notConnected() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 调用:关闭未连接的客户端不应抛异常 + assertDoesNotThrow(client::close); + assertFalse(client.isConnected()); + } + + @Test + public void testIsConnected_initialState() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 断言:初始状态应为未连接 + assertFalse(client.isConnected()); + } + + @Test + public void testToString() { + // 准备参数 + String serverUrl = "ws://localhost:8080"; + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON"); + + // 调用 + String result = client.toString(); + + // 断言 + assertNotNull(result); + assertTrue(result.contains("serverUrl='ws://localhost:8080'")); + assertTrue(result.contains("dataFormat='JSON'")); + assertTrue(result.contains("connected=false")); + } + + @Test + public void testSendMessage_textFormat() throws Exception { + // 准备参数 + String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort(); + IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "TEXT"); + + IotDeviceMessage message = IotDeviceMessage.builder() + .deviceId(123L) + .method("thing.property.report") + .params("{\"temperature\": 25.5}") + .build(); + + // mock + mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener())); + + // 调用 + client.connect(); + client.sendMessage(message); + + // 断言:消息发送成功不抛异常 + assertTrue(client.isConnected()); + + // 清理 + client.close(); + } + +} From f320569f2c46ebca8ea88c4bbbe23d9b63539e94 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 21:21:07 +0800 Subject: [PATCH 2/6] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20IotTcpClient=20=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/rule/data/action/IotTcpDataRuleAction.java | 1 - .../iot/service/rule/data/action/tcp/IotTcpClient.java | 9 +++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java index 53a3b71480..74385d08dd 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotTcpDataRuleAction.java @@ -43,7 +43,6 @@ public class IotTcpDataRuleAction extends config.getConnectTimeoutMs(), config.getReadTimeoutMs(), config.getSsl(), - config.getSslCertPath(), config.getDataFormat() ); // 2.2 连接服务器 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java index 15b57b5405..faf59d3fbc 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClient.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; +import cn.hutool.core.util.ObjUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; @@ -31,8 +32,6 @@ public class IotTcpClient { private final Integer connectTimeoutMs; private final Integer readTimeoutMs; private final Boolean ssl; - // TODO @puhui999:sslCertPath 是不是没在用? - private final String sslCertPath; private final String dataFormat; private Socket socket; @@ -41,15 +40,13 @@ public class IotTcpClient { private final AtomicBoolean connected = new AtomicBoolean(false); public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs, - Boolean ssl, String sslCertPath, String dataFormat) { + Boolean ssl, String dataFormat) { this.host = host; this.port = port; this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS; this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS; this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL; - this.sslCertPath = sslCertPath; - // TODO @puhui999:可以使用 StrUtil.defaultIfBlank 方法简化 - this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT; + this.dataFormat = ObjUtil.defaultIfBlank(dataFormat, IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT); } /** From 5bc8a4e487be9d63c27c63901975579d8f46fa0b Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 21:37:39 +0800 Subject: [PATCH 3/6] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20IotDeviceMessageUtils.notContainsIdentifier=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDevicePropertyPostTriggerMatcher.java | 3 +- .../data/action/tcp/IotTcpClientTest.java | 151 ++++++++++++++++++ .../iot/core/util/IotDeviceMessageUtils.java | 11 ++ .../core/util/IotDeviceMessageUtilsTest.java | 72 ++++++++- 4 files changed, 233 insertions(+), 4 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java index d653c9c42e..1f019b5761 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDevicePropertyPostTriggerMatcher.java @@ -38,8 +38,7 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM // 1.3 检查消息中是否包含触发器指定的属性标识符 // 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中 - // TODO @puhui999:可以考虑 notXXX 方法,简化代码(尽量取反) - if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) { + if (IotDeviceMessageUtils.notContainsIdentifier(message, trigger.getIdentifier())) { IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " + trigger.getIdentifier()); return false; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java new file mode 100644 index 0000000000..cd28f8f54e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/tcp/IotTcpClientTest.java @@ -0,0 +1,151 @@ +package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp; + +import cn.hutool.core.util.ReflectUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link IotTcpClient} 的单元测试 + *

+ * 测试 dataFormat 默认值行为 + * Property 1: TCP 客户端 dataFormat 默认值行为 + * Validates: Requirements 1.1, 1.2 + * + * @author HUIHUI + */ +class IotTcpClientTest { + + @Test + public void testConstructor_dataFormatNull() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:dataFormat 为 null 时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatEmpty() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, ""); + + // 断言:dataFormat 为空字符串时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatBlank() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, " "); + + // 断言:dataFormat 为纯空白字符串时应使用默认值 + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_dataFormatValid() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + String dataFormat = "BINARY"; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, dataFormat); + + // 断言:dataFormat 为有效值时应保持原值 + assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_defaultValues() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:验证所有默认值 + assertEquals(host, ReflectUtil.getFieldValue(client, "host")); + assertEquals(port, ReflectUtil.getFieldValue(client, "port")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS, + ReflectUtil.getFieldValue(client, "connectTimeoutMs")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS, + ReflectUtil.getFieldValue(client, "readTimeoutMs")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_SSL, + ReflectUtil.getFieldValue(client, "ssl")); + assertEquals(IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT, + ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testConstructor_customValues() { + // 准备参数 + String host = "192.168.1.100"; + Integer port = 9090; + Integer connectTimeoutMs = 3000; + Integer readTimeoutMs = 8000; + Boolean ssl = true; + String dataFormat = "BINARY"; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, connectTimeoutMs, readTimeoutMs, ssl, dataFormat); + + // 断言:验证自定义值 + assertEquals(host, ReflectUtil.getFieldValue(client, "host")); + assertEquals(port, ReflectUtil.getFieldValue(client, "port")); + assertEquals(connectTimeoutMs, ReflectUtil.getFieldValue(client, "connectTimeoutMs")); + assertEquals(readTimeoutMs, ReflectUtil.getFieldValue(client, "readTimeoutMs")); + assertEquals(ssl, ReflectUtil.getFieldValue(client, "ssl")); + assertEquals(dataFormat, ReflectUtil.getFieldValue(client, "dataFormat")); + } + + @Test + public void testIsConnected_initialState() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + + // 断言:初始状态应为未连接 + assertFalse(client.isConnected()); + } + + @Test + public void testToString() { + // 准备参数 + String host = "localhost"; + Integer port = 8080; + + // 调用 + IotTcpClient client = new IotTcpClient(host, port, null, null, null, null); + String result = client.toString(); + + // 断言 + assertNotNull(result); + assertTrue(result.contains("host='localhost'")); + assertTrue(result.contains("port=8080")); + assertTrue(result.contains("dataFormat='JSON'")); + assertTrue(result.contains("connected=false")); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index 5c1ac26005..b02a9b4c3a 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -99,6 +99,17 @@ public class IotDeviceMessageUtils { return false; } + /** + * 判断消息中是否不包含指定的标识符 + * + * @param message 消息 + * @param identifier 要检查的标识符 + * @return 是否不包含 + */ + public static boolean notContainsIdentifier(IotDeviceMessage message, String identifier) { + return !containsIdentifier(message, identifier); + } + /** * 将 params 解析为 Map * diff --git a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java index a6d669d170..b0d39be519 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java +++ b/yudao-module-iot/yudao-module-iot-core/src/test/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtilsTest.java @@ -1,13 +1,13 @@ package cn.iocoder.yudao.module.iot.core.util; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.*; /** * {@link IotDeviceMessageUtils} 的单元测试 @@ -138,4 +138,72 @@ public class IotDeviceMessageUtilsTest { Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature"); assertEquals(25.5, result); // 应该返回直接标识符的值 } + + // ========== notContainsIdentifier 测试 ========== + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_whenContains() { + // 准备参数:消息包含指定标识符 + IotDeviceMessage message = new IotDeviceMessage(); + message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()); + Map params = new HashMap<>(); + params.put("temperature", 25); + message.setParams(params); + String identifier = "temperature"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertTrue(containsResult); + assertFalse(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_whenNotContains() { + // 准备参数:消息不包含指定标识符 + IotDeviceMessage message = new IotDeviceMessage(); + message.setMethod(IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()); + Map params = new HashMap<>(); + params.put("temperature", 25); + message.setParams(params); + String identifier = "humidity"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertFalse(containsResult); + assertTrue(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } + + /** + * 测试 notContainsIdentifier 与 containsIdentifier 的互补性 - 空参数场景 + * **Property 2: notContainsIdentifier 与 containsIdentifier 互补性** + * **Validates: Requirements 4.1** + */ + @Test + public void testNotContainsIdentifier_complementary_nullParams() { + // 准备参数:params 为 null + IotDeviceMessage message = new IotDeviceMessage(); + message.setParams(null); + String identifier = "temperature"; + + // 调用 & 断言:验证互补性 + boolean containsResult = IotDeviceMessageUtils.containsIdentifier(message, identifier); + boolean notContainsResult = IotDeviceMessageUtils.notContainsIdentifier(message, identifier); + assertFalse(containsResult); + assertTrue(notContainsResult); + assertEquals(!containsResult, notContainsResult); + } } From 4ad4fcf6cfca7c3f5911011b858944be28fd725a Mon Sep 17 00:00:00 2001 From: puhui999 Date: Wed, 21 Jan 2026 22:06:02 +0800 Subject: [PATCH 4/6] =?UTF-8?q?perf=EF=BC=9A=E3=80=90iot=E3=80=91=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=20IotDeviceServiceInvokeTriggerMatcher=20=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotDeviceServiceInvokeTriggerMatcher.java | 54 +++- ...DeviceServiceInvokeTriggerMatcherTest.java | 264 +++++++++++++++++- .../iot/core/util/IotDeviceMessageUtils.java | 38 +++ 3 files changed, 351 insertions(+), 5 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java index b5fa0330dc..ba3190068d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcher.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; @@ -8,6 +9,8 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper; import org.springframework.stereotype.Component; +import java.util.Map; + /** * 设备服务调用触发器匹配器:处理设备服务调用的触发器匹配逻辑 * @@ -42,13 +45,58 @@ public class IotDeviceServiceInvokeTriggerMatcher implements IotSceneRuleTrigger return false; } - // 2. 对于服务调用触发器,通常只需要匹配服务标识符即可 - // 不需要检查操作符和值,因为服务调用本身就是触发条件 - // TODO @puhui999: 服务调用时校验输入参数是否匹配条件? + // 2. 检查是否配置了参数条件 + if (hasParameterCondition(trigger)) { + return matchParameterCondition(message, trigger); + } + + // 3. 无参数条件时,标识符匹配即成功 IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger); return true; } + /** + * 判断触发器是否配置了参数条件 + * + * @param trigger 触发器配置 + * @return 是否配置了参数条件 + */ + private boolean hasParameterCondition(IotSceneRuleDO.Trigger trigger) { + return StrUtil.isNotBlank(trigger.getOperator()) && StrUtil.isNotBlank(trigger.getValue()); + } + + /** + * 匹配参数条件 + * + * @param message 设备消息 + * @param trigger 触发器配置 + * @return 是否匹配 + */ + private boolean matchParameterCondition(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger) { + // 从消息中提取服务调用的输入参数 + Map inputParams = IotDeviceMessageUtils.extractServiceInputParams(message); + if (inputParams == null) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中缺少服务输入参数"); + return false; + } + + // 获取要匹配的参数值(使用 identifier 作为参数名) + Object paramValue = inputParams.get(trigger.getIdentifier()); + if (paramValue == null) { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数中缺少指定参数: " + trigger.getIdentifier()); + return false; + } + + // 使用条件评估器进行匹配 + boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(paramValue, trigger.getOperator(), trigger.getValue()); + if (matched) { + IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger); + } else { + IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "服务输入参数条件不匹配"); + } + return matched; + } + @Override public int getPriority() { return 40; // 较低优先级 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java index 3d75b19b37..a6b2b0ae0e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/trigger/IotDeviceServiceInvokeTriggerMatcherTest.java @@ -7,7 +7,6 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO; import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotBaseConditionMatcherTest; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -23,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.*; * * @author HUIHUI */ -@Disabled // TODO @puhui999:单测有报错,先屏蔽 public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMatcherTest { private IotDeviceServiceInvokeTriggerMatcher matcher; @@ -378,6 +376,268 @@ public class IotDeviceServiceInvokeTriggerMatcherTest extends IotBaseConditionMa assertFalse(result); } + + // ========== 参数条件匹配测试 ========== + + /** + * 测试无参数条件时的匹配逻辑 - 只要标识符匹配就返回 true + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_noParameterCondition_success() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(null); // 无参数条件 + trigger.setValue(null); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 参数条件匹配成功 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_greaterThan_success() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 参数条件匹配失败 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_greaterThan_failure() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 2) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试有参数条件时的匹配逻辑 - 等于操作符 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withParameterCondition_equals_success() { + // 准备参数 + String serviceIdentifier = "mode"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("mode", "auto") + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator("=="); // 等于操作符 + trigger.setValue("auto"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + + /** + * 测试参数缺失时的处理 - 消息中缺少 inputData + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.3** + */ + @Test + public void testMatches_withParameterCondition_missingInputData() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + // 缺少 inputData 字段 + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 配置了参数条件 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试参数缺失时的处理 - inputData 中缺少指定参数 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.3** + */ + @Test + public void testMatches_withParameterCondition_missingParam() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("otherParam", 5) // 不是 level 参数 + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 配置了参数条件 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertFalse(result); + } + + /** + * 测试只有 operator 没有 value 时不触发参数条件匹配 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_onlyOperator_noValue() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 只有 operator + trigger.setValue(null); // 没有 value + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言:只有 operator 没有 value 时,不触发参数条件匹配,标识符匹配即成功 + assertTrue(result); + } + + /** + * 测试只有 value 没有 operator 时不触发参数条件匹配 + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.2** + */ + @Test + public void testMatches_onlyValue_noOperator() { + // 准备参数 + String serviceIdentifier = "testService"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputData", MapUtil.builder(new HashMap()) + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(null); // 没有 operator + trigger.setValue("3"); // 只有 value + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言:只有 value 没有 operator 时,不触发参数条件匹配,标识符匹配即成功 + assertTrue(result); + } + + /** + * 测试使用 inputParams 字段(替代 inputData) + * **Property 4: 服务调用触发器参数匹配逻辑** + * **Validates: Requirements 5.1** + */ + @Test + public void testMatches_withInputParams_success() { + // 准备参数 + String serviceIdentifier = "level"; + Map serviceParams = MapUtil.builder(new HashMap()) + .put("identifier", serviceIdentifier) + .put("inputParams", MapUtil.builder(new HashMap()) // 使用 inputParams 而不是 inputData + .put("level", 5) + .build()) + .build(); + IotDeviceMessage message = createServiceInvokeMessage(serviceParams); + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_SERVICE_INVOKE.getType()); + trigger.setIdentifier(serviceIdentifier); + trigger.setOperator(">"); // 大于操作符 + trigger.setValue("3"); + + // 调用 + boolean result = matcher.matches(message, trigger); + + // 断言 + assertTrue(result); + } + // ========== 辅助方法 ========== /** diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java index b02a9b4c3a..3def053602 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceMessageUtils.java @@ -207,6 +207,44 @@ public class IotDeviceMessageUtils { return null; } + /** + * 从服务调用消息中提取输入参数 + *

+ * 服务调用消息的 params 结构通常为: + * { + * "identifier": "serviceIdentifier", + * "inputData": { ... } 或 "inputParams": { ... } + * } + * + * @param message 设备消息 + * @return 输入参数 Map,如果未找到则返回 null + */ + @SuppressWarnings("unchecked") + public static Map extractServiceInputParams(IotDeviceMessage message) { + Object params = message.getParams(); + if (params == null) { + return null; + } + if (!(params instanceof Map)) { + return null; + } + Map paramsMap = (Map) params; + + // 尝试从 inputData 字段获取 + Object inputData = paramsMap.get("inputData"); + if (inputData instanceof Map) { + return (Map) inputData; + } + + // 尝试从 inputParams 字段获取 + Object inputParams = paramsMap.get("inputParams"); + if (inputParams instanceof Map) { + return (Map) inputParams; + } + + return null; + } + // ========== Topic 相关 ========== public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) { From 4901912ece006720db02fc828bef4f0a810967e2 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 25 Jan 2026 17:30:16 +0800 Subject: [PATCH 5/6] =?UTF-8?q?feat(iot):=E3=80=90=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E8=81=94=E5=8A=A8=E3=80=91=E5=AE=9A=E6=97=B6=E8=A7=A6=E5=8F=91?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E6=9D=A1=E4=BB=B6=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule/scene/IotSceneRuleServiceImpl.java | 89 ++- .../rule/scene/IotSceneRuleTimeHelper.java | 213 ++++++ .../IotCurrentTimeConditionMatcher.java | 162 +---- .../timer/IotTimerConditionEvaluator.java | 189 ++++++ ...ceneRuleTimerConditionIntegrationTest.java | 611 ++++++++++++++++++ 5 files changed, 1102 insertions(+), 162 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimeHelper.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/timer/IotTimerConditionEvaluator.java create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimerConditionIntegrationTest.java diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java index f96bc9f450..4ea7338e33 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleServiceImpl.java @@ -23,6 +23,7 @@ import cn.iocoder.yudao.module.iot.service.product.IotProductService; import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager; import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler; +import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.cache.annotation.CacheEvict; @@ -62,6 +63,8 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService { private List sceneRuleActions; @Resource private IotSceneRuleTimerHandler timerHandler; + @Resource + private IotTimerConditionEvaluator timerConditionEvaluator; @Override @CacheEvict(value = RedisKeyConstants.SCENE_RULE_LIST, allEntries = true) @@ -222,18 +225,98 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService { return; } // 1.2 判断是否有定时触发器,避免脏数据 - IotSceneRuleDO.Trigger config = CollUtil.findOne(scene.getTriggers(), + IotSceneRuleDO.Trigger timerTrigger = CollUtil.findOne(scene.getTriggers(), trigger -> ObjUtil.equals(trigger.getType(), IotSceneRuleTriggerTypeEnum.TIMER.getType())); - if (config == null) { + if (timerTrigger == null) { log.error("[executeSceneRuleByTimer][规则场景({}) 不存在定时触发器]", scene); return; } - // 2. 执行规则场景 + // 2. 评估条件组(新增逻辑) + log.info("[executeSceneRuleByTimer][规则场景({}) 开始评估条件组]", id); + if (!evaluateTimerConditionGroups(scene, timerTrigger)) { + log.info("[executeSceneRuleByTimer][规则场景({}) 条件组不满足,跳过执行]", id); + return; + } + log.info("[executeSceneRuleByTimer][规则场景({}) 条件组评估通过,准备执行动作]", id); + + // 3. 执行规则场景 TenantUtils.execute(scene.getTenantId(), () -> executeSceneRuleAction(null, ListUtil.toList(scene))); } + /** + * 评估定时触发器的条件组 + * + * @param scene 场景规则 + * @param trigger 定时触发器 + * @return 是否满足条件 + */ + private boolean evaluateTimerConditionGroups(IotSceneRuleDO scene, IotSceneRuleDO.Trigger trigger) { + // 1. 如果没有条件组,直接返回 true(直接执行动作) + if (CollUtil.isEmpty(trigger.getConditionGroups())) { + log.debug("[evaluateTimerConditionGroups][规则场景({}) 无条件组配置,直接执行]", scene.getId()); + return true; + } + + // 2. 条件组之间是 OR 关系,任一条件组满足即可 + for (List conditionGroup : trigger.getConditionGroups()) { + if (evaluateSingleConditionGroup(scene, conditionGroup)) { + log.debug("[evaluateTimerConditionGroups][规则场景({}) 条件组匹配成功]", scene.getId()); + return true; + } + } + + // 3. 所有条件组都不满足 + log.debug("[evaluateTimerConditionGroups][规则场景({}) 所有条件组都不满足]", scene.getId()); + return false; + } + + /** + * 评估单个条件组 + * + * @param scene 场景规则 + * @param conditionGroup 条件组 + * @return 是否满足条件 + */ + private boolean evaluateSingleConditionGroup(IotSceneRuleDO scene, + List conditionGroup) { + // 1. 空条件组视为满足 + if (CollUtil.isEmpty(conditionGroup)) { + return true; + } + + // 2. 条件之间是 AND 关系,所有条件都必须满足 + for (IotSceneRuleDO.TriggerCondition condition : conditionGroup) { + if (!evaluateTimerCondition(scene, condition)) { + log.debug("[evaluateSingleConditionGroup][规则场景({}) 条件({}) 不满足]", + scene.getId(), condition); + return false; + } + } + + return true; + } + + /** + * 评估单个条件(定时触发器专用) + * + * @param scene 场景规则 + * @param condition 条件 + * @return 是否满足条件 + */ + private boolean evaluateTimerCondition(IotSceneRuleDO scene, IotSceneRuleDO.TriggerCondition condition) { + try { + boolean result = timerConditionEvaluator.evaluate(condition); + log.debug("[evaluateTimerCondition][规则场景({}) 条件类型({}) 评估结果: {}]", + scene.getId(), condition.getType(), result); + return result; + } catch (Exception e) { + log.error("[evaluateTimerCondition][规则场景({}) 条件评估异常]", scene.getId(), e); + return false; + } + } + /** * 基于消息,获得匹配的规则场景列表 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimeHelper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimeHelper.java new file mode 100644 index 0000000000..8d1c1f6292 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimeHelper.java @@ -0,0 +1,213 @@ +package cn.iocoder.yudao.module.iot.service.rule.scene; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.text.CharPool; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum; +import lombok.extern.slf4j.Slf4j; + +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.List; + +/** + * IoT 场景规则时间匹配工具类 + *

+ * 提供时间条件匹配的通用方法,供 {@link cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition.IotCurrentTimeConditionMatcher} + * 和 {@link cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator} 共同使用。 + * + * @author HUIHUI + */ +@Slf4j +public class IotSceneRuleTimeHelper { + + /** + * 时间格式化器 - HH:mm:ss + */ + private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); + + /** + * 时间格式化器 - HH:mm + */ + private static final DateTimeFormatter TIME_FORMATTER_SHORT = DateTimeFormatter.ofPattern("HH:mm"); + + private IotSceneRuleTimeHelper() { + // 工具类,禁止实例化 + } + + /** + * 判断是否为日期时间操作符 + * + * @param operatorEnum 操作符枚举 + * @return 是否为日期时间操作符 + */ + public static boolean isDateTimeOperator(IotSceneRuleConditionOperatorEnum operatorEnum) { + return operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_GREATER_THAN + || operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_LESS_THAN + || operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_BETWEEN; + } + + /** + * 判断是否为时间操作符(包括日期时间操作符和当日时间操作符) + * + * @param operatorEnum 操作符枚举 + * @return 是否为时间操作符 + */ + public static boolean isTimeOperator(IotSceneRuleConditionOperatorEnum operatorEnum) { + return operatorEnum != IotSceneRuleConditionOperatorEnum.TIME_GREATER_THAN + && operatorEnum != IotSceneRuleConditionOperatorEnum.TIME_LESS_THAN + && operatorEnum != IotSceneRuleConditionOperatorEnum.TIME_BETWEEN + && !isDateTimeOperator(operatorEnum); + } + + /** + * 执行时间匹配逻辑 + * + * @param operatorEnum 操作符枚举 + * @param param 参数值 + * @return 是否匹配 + */ + public static boolean executeTimeMatching(IotSceneRuleConditionOperatorEnum operatorEnum, String param) { + try { + LocalDateTime now = LocalDateTime.now(); + if (isDateTimeOperator(operatorEnum)) { + // 日期时间匹配(时间戳,秒级) + long currentTimestamp = now.atZone(ZoneId.systemDefault()).toEpochSecond(); + return matchDateTime(currentTimestamp, operatorEnum, param); + } else { + // 当日时间匹配(HH:mm:ss) + return matchTime(now.toLocalTime(), operatorEnum, param); + } + } catch (Exception e) { + log.error("[executeTimeMatching][operatorEnum({}) param({}) 时间匹配异常]", operatorEnum, param, e); + return false; + } + } + + /** + * 匹配日期时间(时间戳,秒级) + * + * @param currentTimestamp 当前时间戳 + * @param operatorEnum 操作符枚举 + * @param param 参数值 + * @return 是否匹配 + */ + public static boolean matchDateTime(long currentTimestamp, IotSceneRuleConditionOperatorEnum operatorEnum, + String param) { + try { + // DATE_TIME_BETWEEN 需要解析两个时间戳,单独处理 + if (operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_BETWEEN) { + return matchDateTimeBetween(currentTimestamp, param); + } + // 其他操作符只需要解析一个时间戳 + long targetTimestamp = Long.parseLong(param); + switch (operatorEnum) { + case DATE_TIME_GREATER_THAN: + return currentTimestamp > targetTimestamp; + case DATE_TIME_LESS_THAN: + return currentTimestamp < targetTimestamp; + default: + log.warn("[matchDateTime][operatorEnum({}) 不支持的日期时间操作符]", operatorEnum); + return false; + } + } catch (Exception e) { + log.error("[matchDateTime][operatorEnum({}) param({}) 日期时间匹配异常]", operatorEnum, param, e); + return false; + } + } + + /** + * 匹配日期时间区间 + * + * @param currentTimestamp 当前时间戳 + * @param param 参数值(格式:startTimestamp,endTimestamp) + * @return 是否匹配 + */ + public static boolean matchDateTimeBetween(long currentTimestamp, String param) { + List timestampRange = StrUtil.splitTrim(param, CharPool.COMMA); + if (timestampRange.size() != 2) { + log.warn("[matchDateTimeBetween][param({}) 时间戳区间参数格式错误]", param); + return false; + } + long startTimestamp = Long.parseLong(timestampRange.get(0).trim()); + long endTimestamp = Long.parseLong(timestampRange.get(1).trim()); + return currentTimestamp >= startTimestamp && currentTimestamp <= endTimestamp; + } + + /** + * 匹配当日时间(HH:mm:ss 或 HH:mm) + * + * @param currentTime 当前时间 + * @param operatorEnum 操作符枚举 + * @param param 参数值 + * @return 是否匹配 + */ + public static boolean matchTime(LocalTime currentTime, IotSceneRuleConditionOperatorEnum operatorEnum, + String param) { + try { + // TIME_BETWEEN 需要解析两个时间,单独处理 + if (operatorEnum == IotSceneRuleConditionOperatorEnum.TIME_BETWEEN) { + return matchTimeBetween(currentTime, param); + } + // 其他操作符只需要解析一个时间 + LocalTime targetTime = parseTime(param); + switch (operatorEnum) { + case TIME_GREATER_THAN: + return currentTime.isAfter(targetTime); + case TIME_LESS_THAN: + return currentTime.isBefore(targetTime); + default: + log.warn("[matchTime][operatorEnum({}) 不支持的时间操作符]", operatorEnum); + return false; + } + } catch (Exception e) { + log.error("[matchTime][operatorEnum({}) param({}) 时间解析异常]", operatorEnum, param, e); + return false; + } + } + + /** + * 匹配时间区间 + * + * @param currentTime 当前时间 + * @param param 参数值(格式:startTime,endTime) + * @return 是否匹配 + */ + public static boolean matchTimeBetween(LocalTime currentTime, String param) { + List timeRange = StrUtil.splitTrim(param, CharPool.COMMA); + if (timeRange.size() != 2) { + log.warn("[matchTimeBetween][param({}) 时间区间参数格式错误]", param); + return false; + } + LocalTime startTime = parseTime(timeRange.get(0).trim()); + LocalTime endTime = parseTime(timeRange.get(1).trim()); + return !currentTime.isBefore(startTime) && !currentTime.isAfter(endTime); + } + + /** + * 解析时间字符串 + * 支持 HH:mm 和 HH:mm:ss 两种格式 + * + * @param timeStr 时间字符串 + * @return 解析后的 LocalTime + */ + public static LocalTime parseTime(String timeStr) { + Assert.isFalse(StrUtil.isBlank(timeStr), "时间字符串不能为空"); + try { + // 尝试不同的时间格式 + if (timeStr.length() == 5) { // HH:mm + return LocalTime.parse(timeStr, TIME_FORMATTER_SHORT); + } else if (timeStr.length() == 8) { // HH:mm:ss + return LocalTime.parse(timeStr, TIME_FORMATTER); + } else { + throw new IllegalArgumentException("时间格式长度不正确,期望 HH:mm 或 HH:mm:ss 格式"); + } + } catch (Exception e) { + log.error("[parseTime][timeStr({}) 时间格式解析失败]", timeStr, e); + throw new IllegalArgumentException("时间格式无效: " + timeStr, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/condition/IotCurrentTimeConditionMatcher.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/condition/IotCurrentTimeConditionMatcher.java index 2083bebac9..a54785ad69 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/condition/IotCurrentTimeConditionMatcher.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/matcher/condition/IotCurrentTimeConditionMatcher.java @@ -1,21 +1,14 @@ package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.text.CharPool; -import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO; import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum; import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum; +import cn.iocoder.yudao.module.iot.service.rule.scene.IotSceneRuleTimeHelper; import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.format.DateTimeFormatter; -import java.util.List; - /** * 当前时间条件匹配器:处理时间相关的子条件匹配逻辑 * @@ -25,16 +18,6 @@ import java.util.List; @Slf4j public class IotCurrentTimeConditionMatcher implements IotSceneRuleConditionMatcher { - /** - * 时间格式化器 - HH:mm:ss - */ - private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); - - /** - * 时间格式化器 - HH:mm - */ - private static final DateTimeFormatter TIME_FORMATTER_SHORT = DateTimeFormatter.ofPattern("HH:mm"); - @Override public IotSceneRuleConditionTypeEnum getSupportedConditionType() { return IotSceneRuleConditionTypeEnum.CURRENT_TIME; @@ -62,13 +45,13 @@ public class IotCurrentTimeConditionMatcher implements IotSceneRuleConditionMatc return false; } - if (!isTimeOperator(operatorEnum)) { + if (IotSceneRuleTimeHelper.isTimeOperator(operatorEnum)) { IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "不支持的时间操作符: " + operator); return false; } // 2.1 执行时间匹配 - boolean matched = executeTimeMatching(operatorEnum, condition.getParam()); + boolean matched = IotSceneRuleTimeHelper.executeTimeMatching(operatorEnum, condition.getParam()); // 2.2 记录匹配结果 if (matched) { @@ -80,145 +63,6 @@ public class IotCurrentTimeConditionMatcher implements IotSceneRuleConditionMatc return matched; } - /** - * 执行时间匹配逻辑 - * 直接实现时间条件匹配,不使用 Spring EL 表达式 - */ - private boolean executeTimeMatching(IotSceneRuleConditionOperatorEnum operatorEnum, String param) { - try { - LocalDateTime now = LocalDateTime.now(); - - if (isDateTimeOperator(operatorEnum)) { - // 日期时间匹配(时间戳) - long currentTimestamp = now.toEpochSecond(java.time.ZoneOffset.of("+8")); - return matchDateTime(currentTimestamp, operatorEnum, param); - } else { - // 当日时间匹配(HH:mm:ss) - return matchTime(now.toLocalTime(), operatorEnum, param); - } - } catch (Exception e) { - log.error("[executeTimeMatching][operatorEnum({}) param({}) 时间匹配异常]", operatorEnum, param, e); - return false; - } - } - - /** - * 判断是否为日期时间操作符 - */ - private boolean isDateTimeOperator(IotSceneRuleConditionOperatorEnum operatorEnum) { - return operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_GREATER_THAN || - operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_LESS_THAN || - operatorEnum == IotSceneRuleConditionOperatorEnum.DATE_TIME_BETWEEN; - } - - /** - * 判断是否为时间操作符 - */ - private boolean isTimeOperator(IotSceneRuleConditionOperatorEnum operatorEnum) { - return operatorEnum == IotSceneRuleConditionOperatorEnum.TIME_GREATER_THAN || - operatorEnum == IotSceneRuleConditionOperatorEnum.TIME_LESS_THAN || - operatorEnum == IotSceneRuleConditionOperatorEnum.TIME_BETWEEN || - isDateTimeOperator(operatorEnum); - } - - /** - * 匹配日期时间(时间戳) - * 直接实现时间戳比较逻辑 - */ - private boolean matchDateTime(long currentTimestamp, IotSceneRuleConditionOperatorEnum operatorEnum, String param) { - try { - long targetTimestamp = Long.parseLong(param); - switch (operatorEnum) { - case DATE_TIME_GREATER_THAN: - return currentTimestamp > targetTimestamp; - case DATE_TIME_LESS_THAN: - return currentTimestamp < targetTimestamp; - case DATE_TIME_BETWEEN: - return matchDateTimeBetween(currentTimestamp, param); - default: - log.warn("[matchDateTime][operatorEnum({}) 不支持的日期时间操作符]", operatorEnum); - return false; - } - } catch (Exception e) { - log.error("[matchDateTime][operatorEnum({}) param({}) 日期时间匹配异常]", operatorEnum, param, e); - return false; - } - } - - /** - * 匹配日期时间区间 - */ - private boolean matchDateTimeBetween(long currentTimestamp, String param) { - List timestampRange = StrUtil.splitTrim(param, CharPool.COMMA); - if (timestampRange.size() != 2) { - log.warn("[matchDateTimeBetween][param({}) 时间戳区间参数格式错误]", param); - return false; - } - long startTimestamp = Long.parseLong(timestampRange.get(0).trim()); - long endTimestamp = Long.parseLong(timestampRange.get(1).trim()); - return currentTimestamp >= startTimestamp && currentTimestamp <= endTimestamp; - } - - /** - * 匹配当日时间(HH:mm:ss) - * 直接实现时间比较逻辑 - */ - private boolean matchTime(LocalTime currentTime, IotSceneRuleConditionOperatorEnum operatorEnum, String param) { - try { - LocalTime targetTime = parseTime(param); - switch (operatorEnum) { - case TIME_GREATER_THAN: - return currentTime.isAfter(targetTime); - case TIME_LESS_THAN: - return currentTime.isBefore(targetTime); - case TIME_BETWEEN: - return matchTimeBetween(currentTime, param); - default: - log.warn("[matchTime][operatorEnum({}) 不支持的时间操作符]", operatorEnum); - return false; - } - } catch (Exception e) { - log.error("[matchTime][][operatorEnum({}) param({}) 时间解析异常]", operatorEnum, param, e); - return false; - } - } - - /** - * 匹配时间区间 - */ - private boolean matchTimeBetween(LocalTime currentTime, String param) { - List timeRange = StrUtil.splitTrim(param, CharPool.COMMA); - if (timeRange.size() != 2) { - log.warn("[matchTimeBetween][param({}) 时间区间参数格式错误]", param); - return false; - } - LocalTime startTime = parseTime(timeRange.get(0).trim()); - LocalTime endTime = parseTime(timeRange.get(1).trim()); - return !currentTime.isBefore(startTime) && !currentTime.isAfter(endTime); - } - - /** - * 解析时间字符串 - * 支持 HH:mm 和 HH:mm:ss 两种格式 - */ - private LocalTime parseTime(String timeStr) { - Assert.isFalse(StrUtil.isBlank(timeStr), "时间字符串不能为空"); - - try { - // 尝试不同的时间格式 - if (timeStr.length() == 5) { // HH:mm - return LocalTime.parse(timeStr, TIME_FORMATTER_SHORT); - } else if (timeStr.length() == 8) { // HH:mm:ss - return LocalTime.parse(timeStr, TIME_FORMATTER); - } else { - throw new IllegalArgumentException("时间格式长度不正确,期望 HH:mm 或 HH:mm:ss 格式"); - } - } catch (Exception e) { - log.error("[parseTime][timeStr({}) 时间格式解析失败]", timeStr, e); - throw new IllegalArgumentException("时间格式无效: " + timeStr, e); - } - } - @Override public int getPriority() { return 40; // 较低优先级 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/timer/IotTimerConditionEvaluator.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/timer/IotTimerConditionEvaluator.java new file mode 100644 index 0000000000..d8fe8183bf --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/scene/timer/IotTimerConditionEvaluator.java @@ -0,0 +1,189 @@ +package cn.iocoder.yudao.module.iot.service.rule.scene.timer; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService; +import cn.iocoder.yudao.module.iot.service.rule.scene.IotSceneRuleTimeHelper; +import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * IoT 定时触发器条件评估器 + *

+ * 与设备触发器不同,定时触发器没有设备消息上下文, + * 需要主动查询设备属性和状态来评估条件。 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotTimerConditionEvaluator { + + @Resource + private IotDevicePropertyService devicePropertyService; + + @Resource + private IotDeviceService deviceService; + + /** + * 评估条件 + * + * @param condition 条件配置 + * @return 是否满足条件 + */ + public boolean evaluate(IotSceneRuleDO.TriggerCondition condition) { + // 1. 基础参数校验 + if (condition == null || condition.getType() == null) { + log.warn("[evaluate][条件为空或类型为空]"); + return false; + } + + // 2. 根据条件类型分发到具体的评估方法 + IotSceneRuleConditionTypeEnum conditionType = + IotSceneRuleConditionTypeEnum.typeOf(condition.getType()); + if (conditionType == null) { + log.warn("[evaluate][未知的条件类型: {}]", condition.getType()); + return false; + } + + switch (conditionType) { + case DEVICE_PROPERTY: + return evaluateDevicePropertyCondition(condition); + case DEVICE_STATE: + return evaluateDeviceStateCondition(condition); + case CURRENT_TIME: + return evaluateCurrentTimeCondition(condition); + default: + log.warn("[evaluate][未知的条件类型: {}]", conditionType); + return false; + } + } + + /** + * 评估设备属性条件 + * + * @param condition 条件配置 + * @return 是否满足条件 + */ + private boolean evaluateDevicePropertyCondition(IotSceneRuleDO.TriggerCondition condition) { + // 1. 校验必要参数 + if (condition.getDeviceId() == null) { + log.debug("[evaluateDevicePropertyCondition][设备ID为空]"); + return false; + } + if (StrUtil.isBlank(condition.getIdentifier())) { + log.debug("[evaluateDevicePropertyCondition][属性标识符为空]"); + return false; + } + if (!IotSceneRuleMatcherHelper.isConditionOperatorAndParamValid(condition)) { + log.debug("[evaluateDevicePropertyCondition][操作符或参数无效]"); + return false; + } + + // 2. 获取设备最新属性值 + Map properties = + devicePropertyService.getLatestDeviceProperties(condition.getDeviceId()); + if (properties == null || properties.isEmpty()) { + log.debug("[evaluateDevicePropertyCondition][设备({}) 无属性数据]", condition.getDeviceId()); + return false; + } + + // 3. 获取指定属性 + IotDevicePropertyDO property = properties.get(condition.getIdentifier()); + if (property == null || property.getValue() == null) { + log.debug("[evaluateDevicePropertyCondition][设备({}) 属性({}) 不存在或值为空]", + condition.getDeviceId(), condition.getIdentifier()); + return false; + } + + // 4. 使用现有的条件评估逻辑进行比较 + boolean matched = IotSceneRuleMatcherHelper.evaluateCondition( + property.getValue(), condition.getOperator(), condition.getParam()); + log.debug("[evaluateDevicePropertyCondition][设备({}) 属性({}) 值({}) 操作符({}) 参数({}) 匹配结果: {}]", + condition.getDeviceId(), condition.getIdentifier(), property.getValue(), + condition.getOperator(), condition.getParam(), matched); + return matched; + } + + /** + * 评估设备状态条件 + * + * @param condition 条件配置 + * @return 是否满足条件 + */ + private boolean evaluateDeviceStateCondition(IotSceneRuleDO.TriggerCondition condition) { + // 1. 校验必要参数 + if (condition.getDeviceId() == null) { + log.debug("[evaluateDeviceStateCondition][设备ID为空]"); + return false; + } + if (!IotSceneRuleMatcherHelper.isConditionOperatorAndParamValid(condition)) { + log.debug("[evaluateDeviceStateCondition][操作符或参数无效]"); + return false; + } + + // 2. 获取设备信息 + IotDeviceDO device = deviceService.getDevice(condition.getDeviceId()); + if (device == null) { + log.debug("[evaluateDeviceStateCondition][设备({}) 不存在]", condition.getDeviceId()); + return false; + } + + // 3. 获取设备状态 + Integer state = device.getState(); + if (state == null) { + log.debug("[evaluateDeviceStateCondition][设备({}) 状态为空]", condition.getDeviceId()); + return false; + } + + // 4. 比较状态 + boolean matched = IotSceneRuleMatcherHelper.evaluateCondition( + state.toString(), condition.getOperator(), condition.getParam()); + log.debug("[evaluateDeviceStateCondition][设备({}) 状态({}) 操作符({}) 参数({}) 匹配结果: {}]", + condition.getDeviceId(), state, condition.getOperator(), condition.getParam(), matched); + return matched; + } + + /** + * 评估当前时间条件 + * + * @param condition 条件配置 + * @return 是否满足条件 + */ + private boolean evaluateCurrentTimeCondition(IotSceneRuleDO.TriggerCondition condition) { + // 1. 校验必要参数 + if (!IotSceneRuleMatcherHelper.isConditionOperatorAndParamValid(condition)) { + log.debug("[evaluateCurrentTimeCondition][操作符或参数无效]"); + return false; + } + + // 2. 验证操作符是否为支持的时间操作符 + IotSceneRuleConditionOperatorEnum operatorEnum = + IotSceneRuleConditionOperatorEnum.operatorOf(condition.getOperator()); + if (operatorEnum == null) { + log.debug("[evaluateCurrentTimeCondition][无效的操作符: {}]", condition.getOperator()); + return false; + } + + if (IotSceneRuleTimeHelper.isTimeOperator(operatorEnum)) { + log.debug("[evaluateCurrentTimeCondition][不支持的时间操作符: {}]", condition.getOperator()); + return false; + } + + // 3. 执行时间匹配 + boolean matched = IotSceneRuleTimeHelper.executeTimeMatching(operatorEnum, condition.getParam()); + log.debug("[evaluateCurrentTimeCondition][操作符({}) 参数({}) 匹配结果: {}]", + condition.getOperator(), condition.getParam(), matched); + return matched; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimerConditionIntegrationTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimerConditionIntegrationTest.java new file mode 100644 index 0000000000..75319b9c21 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/scene/IotSceneRuleTimerConditionIntegrationTest.java @@ -0,0 +1,611 @@ +package cn.iocoder.yudao.module.iot.service.rule.scene; + +import cn.hutool.core.collection.ListUtil; +import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; +import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO; +import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotSceneRuleMapper; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionOperatorEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyService; +import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction; +import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler; +import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotTimerConditionEvaluator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * {@link IotSceneRuleServiceImpl} 定时触发器条件组集成测试 + *

+ * 测试定时触发器的条件组评估功能: + * - 空条件组直接执行动作 + * - 条件组评估后决定是否执行动作 + * - 条件组之间的 OR 逻辑 + * - 条件组内的 AND 逻辑 + * - 所有条件组不满足时跳过执行 + *

+ * Validates: Requirements 2.1, 2.2, 2.3, 2.4, 2.5 + * + * @author HUIHUI + */ +public class IotSceneRuleTimerConditionIntegrationTest extends BaseMockitoUnitTest { + + @InjectMocks + private IotSceneRuleServiceImpl sceneRuleService; + + @Mock + private IotSceneRuleMapper sceneRuleMapper; + + @Mock + private IotDeviceService deviceService; + + @Mock + private IotDevicePropertyService devicePropertyService; + + @Mock + private List sceneRuleActions; + + @Mock + private IotSceneRuleTimerHandler timerHandler; + + private IotTimerConditionEvaluator timerConditionEvaluator; + + // 测试常量 + private static final Long SCENE_RULE_ID = 1L; + private static final Long TENANT_ID = 1L; + private static final Long DEVICE_ID = 100L; + private static final String PROPERTY_IDENTIFIER = "temperature"; + + @BeforeEach + void setUp() { + // 创建并注入 timerConditionEvaluator 的依赖 + timerConditionEvaluator = new IotTimerConditionEvaluator(); + try { + var devicePropertyServiceField = IotTimerConditionEvaluator.class.getDeclaredField("devicePropertyService"); + devicePropertyServiceField.setAccessible(true); + devicePropertyServiceField.set(timerConditionEvaluator, devicePropertyService); + + var deviceServiceField = IotTimerConditionEvaluator.class.getDeclaredField("deviceService"); + deviceServiceField.setAccessible(true); + deviceServiceField.set(timerConditionEvaluator, deviceService); + + var evaluatorField = IotSceneRuleServiceImpl.class.getDeclaredField("timerConditionEvaluator"); + evaluatorField.setAccessible(true); + evaluatorField.set(sceneRuleService, timerConditionEvaluator); + } catch (Exception e) { + throw new RuntimeException("Failed to inject dependencies", e); + } + } + + // ========== 辅助方法 ========== + + private IotSceneRuleDO createBaseSceneRule() { + IotSceneRuleDO sceneRule = new IotSceneRuleDO(); + sceneRule.setId(SCENE_RULE_ID); + sceneRule.setTenantId(TENANT_ID); + sceneRule.setName("测试定时触发器"); + sceneRule.setStatus(CommonStatusEnum.ENABLE.getStatus()); + sceneRule.setActions(Collections.emptyList()); + return sceneRule; + } + + private IotSceneRuleDO.Trigger createTimerTrigger(String cronExpression, + List> conditionGroups) { + IotSceneRuleDO.Trigger trigger = new IotSceneRuleDO.Trigger(); + trigger.setType(IotSceneRuleTriggerTypeEnum.TIMER.getType()); + trigger.setCronExpression(cronExpression); + trigger.setConditionGroups(conditionGroups); + return trigger; + } + + private IotSceneRuleDO.TriggerCondition createDevicePropertyCondition(Long deviceId, String identifier, + String operator, String param) { + IotSceneRuleDO.TriggerCondition condition = new IotSceneRuleDO.TriggerCondition(); + condition.setType(IotSceneRuleConditionTypeEnum.DEVICE_PROPERTY.getType()); + condition.setDeviceId(deviceId); + condition.setIdentifier(identifier); + condition.setOperator(operator); + condition.setParam(param); + return condition; + } + + private IotSceneRuleDO.TriggerCondition createDeviceStateCondition(Long deviceId, String operator, String param) { + IotSceneRuleDO.TriggerCondition condition = new IotSceneRuleDO.TriggerCondition(); + condition.setType(IotSceneRuleConditionTypeEnum.DEVICE_STATE.getType()); + condition.setDeviceId(deviceId); + condition.setOperator(operator); + condition.setParam(param); + return condition; + } + + private void mockDeviceProperty(Long deviceId, String identifier, Object value) { + Map properties = new HashMap<>(); + IotDevicePropertyDO property = new IotDevicePropertyDO(); + property.setValue(value); + properties.put(identifier, property); + when(devicePropertyService.getLatestDeviceProperties(deviceId)).thenReturn(properties); + } + + private void mockDeviceState(Long deviceId, Integer state) { + IotDeviceDO device = new IotDeviceDO(); + device.setId(deviceId); + device.setState(state); + when(deviceService.getDevice(deviceId)).thenReturn(device); + } + + /** + * 创建单条件的条件组列表 + */ + private List> createSingleConditionGroups( + IotSceneRuleDO.TriggerCondition condition) { + List group = new ArrayList<>(); + group.add(condition); + List> groups = new ArrayList<>(); + groups.add(group); + return groups; + } + + /** + * 创建两个单条件组的条件组列表 + */ + private List> createTwoSingleConditionGroups( + IotSceneRuleDO.TriggerCondition cond1, IotSceneRuleDO.TriggerCondition cond2) { + List group1 = new ArrayList<>(); + group1.add(cond1); + List group2 = new ArrayList<>(); + group2.add(cond2); + List> groups = new ArrayList<>(); + groups.add(group1); + groups.add(group2); + return groups; + } + + /** + * 创建单个多条件组的条件组列表 + */ + private List> createSingleGroupWithMultipleConditions( + IotSceneRuleDO.TriggerCondition... conditions) { + List group = new ArrayList<>(Arrays.asList(conditions)); + List> groups = new ArrayList<>(); + groups.add(group); + return groups; + } + + // ========== 测试用例 ========== + + @Nested + @DisplayName("空条件组测试 - Validates: Requirement 2.1") + class EmptyConditionGroupsTest { + + @Test + @DisplayName("定时触发器无条件组时,应直接执行动作") + void testTimerTrigger_withNullConditionGroups_shouldExecuteActions() { + // 准备数据 + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", null); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(sceneRuleMapper, times(1)).selectById(SCENE_RULE_ID); + verify(devicePropertyService, never()).getLatestDeviceProperties(any()); + verify(deviceService, never()).getDevice(any()); + } + + @Test + @DisplayName("定时触发器条件组为空列表时,应直接执行动作") + void testTimerTrigger_withEmptyConditionGroups_shouldExecuteActions() { + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", Collections.emptyList()); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(sceneRuleMapper, times(1)).selectById(SCENE_RULE_ID); + verify(devicePropertyService, never()).getLatestDeviceProperties(any()); + } + } + + @Nested + @DisplayName("条件组 OR 逻辑测试 - Validates: Requirements 2.2, 2.3") + class ConditionGroupOrLogicTest { + + @Test + @DisplayName("多个条件组中第一个满足时,应执行动作") + void testMultipleConditionGroups_firstGroupMatches_shouldExecuteActions() { + IotSceneRuleDO.TriggerCondition condition1 = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + IotSceneRuleDO.TriggerCondition condition2 = createDevicePropertyCondition( + DEVICE_ID + 1, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "50"); + + List> conditionGroups = + createTwoSingleConditionGroups(condition1, condition2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + mockDeviceProperty(DEVICE_ID + 1, PROPERTY_IDENTIFIER, 30); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + + @Test + @DisplayName("多个条件组中第二个满足时,应执行动作") + void testMultipleConditionGroups_secondGroupMatches_shouldExecuteActions() { + IotSceneRuleDO.TriggerCondition condition1 = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "50"); + IotSceneRuleDO.TriggerCondition condition2 = createDevicePropertyCondition( + DEVICE_ID + 1, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + + List> conditionGroups = + createTwoSingleConditionGroups(condition1, condition2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + mockDeviceProperty(DEVICE_ID + 1, PROPERTY_IDENTIFIER, 30); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID + 1); + } + } + + @Nested + @DisplayName("条件组内 AND 逻辑测试 - Validates: Requirement 2.4") + class ConditionGroupAndLogicTest { + + @Test + @DisplayName("条件组内所有条件都满足时,该组应匹配成功") + void testSingleConditionGroup_allConditionsMatch_shouldPass() { + IotSceneRuleDO.TriggerCondition condition1 = createDevicePropertyCondition( + DEVICE_ID, "temperature", IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + IotSceneRuleDO.TriggerCondition condition2 = createDevicePropertyCondition( + DEVICE_ID, "humidity", IotSceneRuleConditionOperatorEnum.LESS_THAN.getOperator(), "80"); + + List> conditionGroups = + createSingleGroupWithMultipleConditions(condition1, condition2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + Map properties = new HashMap<>(); + IotDevicePropertyDO tempProperty = new IotDevicePropertyDO(); + tempProperty.setValue(30); + properties.put("temperature", tempProperty); + IotDevicePropertyDO humidityProperty = new IotDevicePropertyDO(); + humidityProperty.setValue(60); + properties.put("humidity", humidityProperty); + when(devicePropertyService.getLatestDeviceProperties(DEVICE_ID)).thenReturn(properties); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + + @Test + @DisplayName("条件组内有一个条件不满足时,该组应匹配失败") + void testSingleConditionGroup_oneConditionFails_shouldFail() { + IotSceneRuleDO.TriggerCondition condition1 = createDevicePropertyCondition( + DEVICE_ID, "temperature", IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + IotSceneRuleDO.TriggerCondition condition2 = createDevicePropertyCondition( + DEVICE_ID, "humidity", IotSceneRuleConditionOperatorEnum.LESS_THAN.getOperator(), "50"); + + List> conditionGroups = + createSingleGroupWithMultipleConditions(condition1, condition2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + Map properties = new HashMap<>(); + IotDevicePropertyDO tempProperty = new IotDevicePropertyDO(); + tempProperty.setValue(30); + properties.put("temperature", tempProperty); + IotDevicePropertyDO humidityProperty = new IotDevicePropertyDO(); + humidityProperty.setValue(60); // 不满足 < 50 + properties.put("humidity", humidityProperty); + when(devicePropertyService.getLatestDeviceProperties(DEVICE_ID)).thenReturn(properties); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + } + + @Nested + @DisplayName("所有条件组不满足测试 - Validates: Requirement 2.5") + class AllConditionGroupsFailTest { + + @Test + @DisplayName("所有条件组都不满足时,应跳过动作执行") + void testAllConditionGroups_allFail_shouldSkipExecution() { + IotSceneRuleDO.TriggerCondition condition1 = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "50"); + IotSceneRuleDO.TriggerCondition condition2 = createDevicePropertyCondition( + DEVICE_ID + 1, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "50"); + + List> conditionGroups = + createTwoSingleConditionGroups(condition1, condition2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + mockDeviceProperty(DEVICE_ID + 1, PROPERTY_IDENTIFIER, 30); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID + 1); + } + } + + @Nested + @DisplayName("设备状态条件测试 - Validates: Requirements 4.1, 4.2") + class DeviceStateConditionTest { + + @Test + @DisplayName("设备在线状态条件满足时,应匹配成功") + void testDeviceStateCondition_online_shouldMatch() { + IotSceneRuleDO.TriggerCondition condition = createDeviceStateCondition( + DEVICE_ID, IotSceneRuleConditionOperatorEnum.EQUALS.getOperator(), + String.valueOf(IotDeviceStateEnum.ONLINE.getState())); + + List> conditionGroups = createSingleConditionGroups(condition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceState(DEVICE_ID, IotDeviceStateEnum.ONLINE.getState()); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(deviceService, atLeastOnce()).getDevice(DEVICE_ID); + } + + @Test + @DisplayName("设备不存在时,条件应不匹配") + void testDeviceStateCondition_deviceNotExists_shouldNotMatch() { + IotSceneRuleDO.TriggerCondition condition = createDeviceStateCondition( + DEVICE_ID, IotSceneRuleConditionOperatorEnum.EQUALS.getOperator(), + String.valueOf(IotDeviceStateEnum.ONLINE.getState())); + + List> conditionGroups = createSingleConditionGroups(condition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + when(deviceService.getDevice(DEVICE_ID)).thenReturn(null); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(deviceService, atLeastOnce()).getDevice(DEVICE_ID); + } + } + + @Nested + @DisplayName("设备属性条件测试 - Validates: Requirements 3.1, 3.2, 3.3") + class DevicePropertyConditionTest { + + @Test + @DisplayName("设备属性条件满足时,应匹配成功") + void testDevicePropertyCondition_match_shouldPass() { + IotSceneRuleDO.TriggerCondition condition = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "25"); + + List> conditionGroups = createSingleConditionGroups(condition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + + @Test + @DisplayName("设备属性不存在时,条件应不匹配") + void testDevicePropertyCondition_propertyNotExists_shouldNotMatch() { + IotSceneRuleDO.TriggerCondition condition = createDevicePropertyCondition( + DEVICE_ID, "nonexistent", IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "25"); + + List> conditionGroups = createSingleConditionGroups(condition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + when(devicePropertyService.getLatestDeviceProperties(DEVICE_ID)).thenReturn(Collections.emptyMap()); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + + @Test + @DisplayName("设备属性等于条件测试") + void testDevicePropertyCondition_equals_shouldMatch() { + IotSceneRuleDO.TriggerCondition condition = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.EQUALS.getOperator(), "30"); + + List> conditionGroups = createSingleConditionGroups(condition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + } + + @Nested + @DisplayName("场景规则状态测试") + class SceneRuleStatusTest { + + @Test + @DisplayName("场景规则不存在时,应直接返回") + void testSceneRule_notExists_shouldReturn() { + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(null); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, never()).getLatestDeviceProperties(any()); + } + + @Test + @DisplayName("场景规则已禁用时,应直接返回") + void testSceneRule_disabled_shouldReturn() { + IotSceneRuleDO sceneRule = createBaseSceneRule(); + sceneRule.setStatus(CommonStatusEnum.DISABLE.getStatus()); + + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, never()).getLatestDeviceProperties(any()); + } + + @Test + @DisplayName("场景规则无定时触发器时,应直接返回") + void testSceneRule_noTimerTrigger_shouldReturn() { + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger deviceTrigger = new IotSceneRuleDO.Trigger(); + deviceTrigger.setType(IotSceneRuleTriggerTypeEnum.DEVICE_PROPERTY_POST.getType()); + sceneRule.setTriggers(ListUtil.toList(deviceTrigger)); + + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, never()).getLatestDeviceProperties(any()); + } + } + + @Nested + @DisplayName("复杂条件组合测试") + class ComplexConditionCombinationTest { + + @Test + @DisplayName("混合条件类型测试:设备属性 + 设备状态") + void testMixedConditionTypes_propertyAndState() { + IotSceneRuleDO.TriggerCondition propertyCondition = createDevicePropertyCondition( + DEVICE_ID, PROPERTY_IDENTIFIER, IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + IotSceneRuleDO.TriggerCondition stateCondition = createDeviceStateCondition( + DEVICE_ID, IotSceneRuleConditionOperatorEnum.EQUALS.getOperator(), + String.valueOf(IotDeviceStateEnum.ONLINE.getState())); + + List> conditionGroups = + createSingleGroupWithMultipleConditions(propertyCondition, stateCondition); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + mockDeviceProperty(DEVICE_ID, PROPERTY_IDENTIFIER, 30); + mockDeviceState(DEVICE_ID, IotDeviceStateEnum.ONLINE.getState()); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + verify(deviceService, atLeastOnce()).getDevice(DEVICE_ID); + } + + @Test + @DisplayName("多条件组 OR 逻辑 + 组内 AND 逻辑综合测试") + void testComplexOrAndLogic() { + // 条件组1:温度 > 30 AND 湿度 < 50(不满足) + // 条件组2:温度 > 20 AND 设备在线(满足) + IotSceneRuleDO.TriggerCondition group1Cond1 = createDevicePropertyCondition( + DEVICE_ID, "temperature", IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "30"); + IotSceneRuleDO.TriggerCondition group1Cond2 = createDevicePropertyCondition( + DEVICE_ID, "humidity", IotSceneRuleConditionOperatorEnum.LESS_THAN.getOperator(), "50"); + + IotSceneRuleDO.TriggerCondition group2Cond1 = createDevicePropertyCondition( + DEVICE_ID, "temperature", IotSceneRuleConditionOperatorEnum.GREATER_THAN.getOperator(), "20"); + IotSceneRuleDO.TriggerCondition group2Cond2 = createDeviceStateCondition( + DEVICE_ID, IotSceneRuleConditionOperatorEnum.EQUALS.getOperator(), + String.valueOf(IotDeviceStateEnum.ONLINE.getState())); + + // 创建两个条件组 + List group1 = new ArrayList<>(); + group1.add(group1Cond1); + group1.add(group1Cond2); + List group2 = new ArrayList<>(); + group2.add(group2Cond1); + group2.add(group2Cond2); + List> conditionGroups = new ArrayList<>(); + conditionGroups.add(group1); + conditionGroups.add(group2); + + IotSceneRuleDO sceneRule = createBaseSceneRule(); + IotSceneRuleDO.Trigger trigger = createTimerTrigger("0 0 12 * * ?", conditionGroups); + sceneRule.setTriggers(ListUtil.toList(trigger)); + + // Mock:温度 25,湿度 60,设备在线 + Map properties = new HashMap<>(); + IotDevicePropertyDO tempProperty = new IotDevicePropertyDO(); + tempProperty.setValue(25); + properties.put("temperature", tempProperty); + IotDevicePropertyDO humidityProperty = new IotDevicePropertyDO(); + humidityProperty.setValue(60); + properties.put("humidity", humidityProperty); + when(devicePropertyService.getLatestDeviceProperties(DEVICE_ID)).thenReturn(properties); + mockDeviceState(DEVICE_ID, IotDeviceStateEnum.ONLINE.getState()); + when(sceneRuleMapper.selectById(SCENE_RULE_ID)).thenReturn(sceneRule); + + assertDoesNotThrow(() -> sceneRuleService.executeSceneRuleByTimer(SCENE_RULE_ID)); + + verify(devicePropertyService, atLeastOnce()).getLatestDeviceProperties(DEVICE_ID); + } + } + +} From 7ec541e5bbb07c3a08fd78ce5b3af6ec056d1ec2 Mon Sep 17 00:00:00 2001 From: puhui999 Date: Sun, 25 Jan 2026 18:24:43 +0800 Subject: [PATCH 6/6] =?UTF-8?q?perf(iot):=E3=80=90=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E8=81=94=E5=8A=A8=E3=80=91WebSocket=20=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E9=94=81=E4=BB=8E=20Redisson=20=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=E6=94=B9=E4=B8=BA=20ReentrantLock=20=E5=8D=95?= =?UTF-8?q?=E6=9C=BA=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 1 - .../redis/rule/IotWebSocketLockRedisDAO.java | 67 ------------------- .../action/IotWebSocketDataRuleAction.java | 44 ++++++++---- 3 files changed, 32 insertions(+), 80 deletions(-) delete mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 3ff1534cee..ae72b994ad 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -76,7 +76,6 @@ 2.3.0 4.7.9-20251224.161447 4.40.607.ALL - 4.12.0 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java deleted file mode 100644 index d50dc548af..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java +++ /dev/null @@ -1,67 +0,0 @@ -package cn.iocoder.yudao.module.iot.dal.redis.rule; - -import jakarta.annotation.Resource; -import org.redisson.api.RLock; -import org.redisson.api.RedissonClient; -import org.springframework.stereotype.Repository; - -import java.util.concurrent.TimeUnit; - -import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK; - -/** - * IoT WebSocket 连接锁 Redis DAO - *

- * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争 - * - * @author HUIHUI - */ -@Repository -public class IotWebSocketLockRedisDAO { - - /** - * 锁等待超时时间(毫秒) - */ - private static final long LOCK_WAIT_TIME_MS = 5000; - - /** - * 锁持有超时时间(毫秒) - */ - private static final long LOCK_LEASE_TIME_MS = 10000; - - @Resource - private RedissonClient redissonClient; - - /** - * 在分布式锁保护下执行操作 - * - * @param serverUrl WebSocket 服务器地址 - * @param runnable 需要执行的操作 - * @throws Exception 如果获取锁超时或执行操作时发生异常 - */ - public void lock(String serverUrl, Runnable runnable) throws Exception { - String lockKey = formatKey(serverUrl); - RLock lock = redissonClient.getLock(lockKey); - - try { - // 尝试获取分布式锁 - boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS); - if (!acquired) { - throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl); - } - - // 执行操作 - runnable.run(); - } finally { - // 释放锁 - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - private static String formatKey(String serverUrl) { - return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java index 651562987a..ebfe1f8c10 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java @@ -3,13 +3,15 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig; -import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO; import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum; import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + /** * WebSocket 的 {@link IotDataRuleAction} 实现类 *

@@ -24,8 +26,17 @@ import org.springframework.stereotype.Component; public class IotWebSocketDataRuleAction extends IotDataRuleCacheableAction { - @Resource - private IotWebSocketLockRedisDAO webSocketLockRedisDAO; + /** + * 锁等待超时时间(毫秒) + */ + private static final long LOCK_WAIT_TIME_MS = 5000; + + /** + * 重连锁,key 为 WebSocket 服务器地址 + *

+ * WebSocket 连接是与特定服务器实例绑定的,使用单机锁即可保证重连的线程安全 + */ + private final ConcurrentHashMap reconnectLocks = new ConcurrentHashMap<>(); @Override public Integer getType() { @@ -87,23 +98,32 @@ public class IotWebSocketDataRuleAction extends } /** - * 使用分布式锁进行重连 + * 使用锁进行重连,保证同一服务器地址的重连操作线程安全 * * @param webSocketClient WebSocket 客户端 * @param config 配置信息 */ private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception { - webSocketLockRedisDAO.lock(config.getServerUrl(), () -> { + ReentrantLock lock = reconnectLocks.computeIfAbsent(config.getServerUrl(), k -> new ReentrantLock()); + boolean acquired = false; + try { + acquired = lock.tryLock(LOCK_WAIT_TIME_MS, TimeUnit.MILLISECONDS); + if (!acquired) { + throw new RuntimeException("获取 WebSocket 重连锁超时,服务器: " + config.getServerUrl()); + } // 双重检查:获取锁后再次检查连接状态,避免重复连接 if (!webSocketClient.isConnected()) { log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl()); - try { - webSocketClient.connect(); - } catch (Exception e) { - throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e); - } + webSocketClient.connect(); } - }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("获取 WebSocket 重连锁被中断,服务器: " + config.getServerUrl(), e); + } finally { + if (acquired && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } } }