diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
index c8041a673c..95d210252f 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java
@@ -84,4 +84,12 @@ public interface RedisKeyConstants {
*/
String SCENE_RULE_LIST = "iot:scene_rule_list";
+ /**
+ * WebSocket 连接分布式锁
+ *
+ * KEY 格式:websocket_connect_lock:${serverUrl}
+ * 用于保证 WebSocket 重连操作的线程安全
+ */
+ String WEBSOCKET_CONNECT_LOCK = "iot:websocket_connect_lock:%s";
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java
new file mode 100644
index 0000000000..d50dc548af
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/rule/IotWebSocketLockRedisDAO.java
@@ -0,0 +1,67 @@
+package cn.iocoder.yudao.module.iot.dal.redis.rule;
+
+import jakarta.annotation.Resource;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.stereotype.Repository;
+
+import java.util.concurrent.TimeUnit;
+
+import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK;
+
+/**
+ * IoT WebSocket 连接锁 Redis DAO
+ *
+ * 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争
+ *
+ * @author HUIHUI
+ */
+@Repository
+public class IotWebSocketLockRedisDAO {
+
+ /**
+ * 锁等待超时时间(毫秒)
+ */
+ private static final long LOCK_WAIT_TIME_MS = 5000;
+
+ /**
+ * 锁持有超时时间(毫秒)
+ */
+ private static final long LOCK_LEASE_TIME_MS = 10000;
+
+ @Resource
+ private RedissonClient redissonClient;
+
+ /**
+ * 在分布式锁保护下执行操作
+ *
+ * @param serverUrl WebSocket 服务器地址
+ * @param runnable 需要执行的操作
+ * @throws Exception 如果获取锁超时或执行操作时发生异常
+ */
+ public void lock(String serverUrl, Runnable runnable) throws Exception {
+ String lockKey = formatKey(serverUrl);
+ RLock lock = redissonClient.getLock(lockKey);
+
+ try {
+ // 尝试获取分布式锁
+ boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS);
+ if (!acquired) {
+ throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl);
+ }
+
+ // 执行操作
+ runnable.run();
+ } finally {
+ // 释放锁
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static String formatKey(String serverUrl) {
+ return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
index c0445df906..651562987a 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/IotWebSocketDataRuleAction.java
@@ -3,8 +3,10 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
+import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
+import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -22,6 +24,9 @@ import org.springframework.stereotype.Component;
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction {
+ @Resource
+ private IotWebSocketLockRedisDAO webSocketLockRedisDAO;
+
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
@@ -62,12 +67,11 @@ public class IotWebSocketDataRuleAction extends
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
- // TODO @puhui999:需要加锁,保证必须连接上;
IotWebSocketClient webSocketClient = getProducer(config);
- // 1.2 检查连接状态,如果断开则重新连接
+
+ // 1.2 检查连接状态,如果断开则使用分布式锁保证重连的线程安全
if (!webSocketClient.isConnected()) {
- log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
- webSocketClient.connect();
+ reconnectWithLock(webSocketClient, config);
}
// 2.1 发送消息
@@ -82,4 +86,24 @@ public class IotWebSocketDataRuleAction extends
}
}
+ /**
+ * 使用分布式锁进行重连
+ *
+ * @param webSocketClient WebSocket 客户端
+ * @param config 配置信息
+ */
+ private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception {
+ webSocketLockRedisDAO.lock(config.getServerUrl(), () -> {
+ // 双重检查:获取锁后再次检查连接状态,避免重复连接
+ if (!webSocketClient.isConnected()) {
+ log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
+ try {
+ webSocketClient.connect();
+ } catch (Exception e) {
+ throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e);
+ }
+ }
+ });
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
index 2f55d6ee74..e898f61cb8 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClient.java
@@ -4,13 +4,9 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
+import okhttp3.*;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.WebSocket;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -19,21 +15,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
- * 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
+ * 基于 OkHttp WebSocket 实现,兼容 JDK 8+
+ *
+ * 注意:该类的线程安全由调用方(IotWebSocketDataRuleAction)通过分布式锁保证
*
* @author HUIHUI
*/
@Slf4j
-public class IotWebSocketClient implements WebSocket.Listener {
+public class IotWebSocketClient {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
- private WebSocket webSocket;
+ private OkHttpClient okHttpClient;
+ private volatile WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
- private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
@@ -44,8 +42,9 @@ public class IotWebSocketClient implements WebSocket.Listener {
/**
* 连接到 WebSocket 服务器
+ *
+ * 注意:调用方需要通过分布式锁保证并发安全
*/
- @SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
@@ -53,17 +52,32 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
try {
- HttpClient httpClient = HttpClient.newBuilder()
- .connectTimeout(Duration.ofMillis(connectTimeoutMs))
+ // 创建 OkHttpClient
+ okHttpClient = new OkHttpClient.Builder()
+ .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
+ .readTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
+ .writeTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
.build();
- CompletableFuture future = httpClient.newWebSocketBuilder()
- .connectTimeout(Duration.ofMillis(connectTimeoutMs))
- .buildAsync(URI.create(serverUrl), this);
+ // 创建 WebSocket 请求
+ Request request = new Request.Builder()
+ .url(serverUrl)
+ .build();
+
+ // 使用 CountDownLatch 等待连接完成
+ CountDownLatch connectLatch = new CountDownLatch(1);
+ AtomicBoolean connectSuccess = new AtomicBoolean(false);
+
+ // 创建 WebSocket 连接
+ webSocket = okHttpClient.newWebSocket(request, new IotWebSocketListener(connectLatch, connectSuccess));
// 等待连接完成
- webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
- connected.set(true);
+ boolean await = connectLatch.await(connectTimeoutMs, TimeUnit.MILLISECONDS);
+ if (!await || !connectSuccess.get()) {
+ close();
+ throw new Exception("WebSocket 连接超时或失败,服务器地址: " + serverUrl);
+ }
+
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
@@ -72,36 +86,6 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
}
- @Override
- public void onOpen(WebSocket webSocket) {
- log.debug("[onOpen][WebSocket 连接已打开]");
- webSocket.request(1);
- }
-
- @Override
- public CompletionStage> onText(WebSocket webSocket, CharSequence data, boolean last) {
- messageBuffer.append(data);
- if (last) {
- log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
- messageBuffer.setLength(0);
- }
- webSocket.request(1);
- return null;
- }
-
- @Override
- public CompletionStage> onClose(WebSocket webSocket, int statusCode, String reason) {
- connected.set(false);
- log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
- return null;
- }
-
- @Override
- public void onError(WebSocket webSocket, Throwable error) {
- connected.set(false);
- log.error("[onError][WebSocket 发生错误]", error);
- }
-
/**
* 发送设备消息
*
@@ -109,7 +93,8 @@ public class IotWebSocketClient implements WebSocket.Listener {
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
- if (!connected.get() || webSocket == null) {
+ WebSocket ws = this.webSocket;
+ if (!connected.get() || ws == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
@@ -121,9 +106,11 @@ public class IotWebSocketClient implements WebSocket.Listener {
messageData = message.toString();
}
- // 发送消息并等待完成
- CompletableFuture future = webSocket.sendText(messageData, true);
- future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
+ // 发送消息
+ boolean success = ws.send(messageData);
+ if (!success) {
+ throw new Exception("WebSocket 发送消息失败,消息队列已满或连接已关闭");
+ }
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
@@ -136,18 +123,17 @@ public class IotWebSocketClient implements WebSocket.Listener {
* 关闭连接
*/
public void close() {
- if (!connected.get() && webSocket == null) {
- return;
- }
-
try {
if (webSocket != null) {
- webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
- .orTimeout(5, TimeUnit.SECONDS)
- .exceptionally(e -> {
- log.warn("[close][发送关闭帧失败]", e);
- return null;
- });
+ // 发送正常关闭帧,状态码 1000 表示正常关闭
+ webSocket.close(1000, "客户端主动关闭");
+ webSocket = null;
+ }
+ if (okHttpClient != null) {
+ // 关闭连接池和调度器
+ okHttpClient.dispatcher().executorService().shutdown();
+ okHttpClient.connectionPool().evictAll();
+ okHttpClient = null;
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
@@ -174,4 +160,50 @@ public class IotWebSocketClient implements WebSocket.Listener {
'}';
}
+ /**
+ * OkHttp WebSocket 监听器
+ */
+ private class IotWebSocketListener extends WebSocketListener {
+
+ private final CountDownLatch connectLatch;
+ private final AtomicBoolean connectSuccess;
+
+ public IotWebSocketListener(CountDownLatch connectLatch, AtomicBoolean connectSuccess) {
+ this.connectLatch = connectLatch;
+ this.connectSuccess = connectSuccess;
+ }
+
+ @Override
+ public void onOpen(WebSocket webSocket, Response response) {
+ connected.set(true);
+ connectSuccess.set(true);
+ connectLatch.countDown();
+ log.info("[onOpen][WebSocket 连接已打开,服务器: {}]", serverUrl);
+ }
+
+ @Override
+ public void onMessage(WebSocket webSocket, String text) {
+ log.debug("[onMessage][收到消息: {}]", text);
+ }
+
+ @Override
+ public void onClosing(WebSocket webSocket, int code, String reason) {
+ connected.set(false);
+ log.info("[onClosing][WebSocket 正在关闭,code: {}, reason: {}]", code, reason);
+ }
+
+ @Override
+ public void onClosed(WebSocket webSocket, int code, String reason) {
+ connected.set(false);
+ log.info("[onClosed][WebSocket 已关闭,code: {}, reason: {}]", code, reason);
+ }
+
+ @Override
+ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
+ connected.set(false);
+ connectLatch.countDown(); // 确保连接失败时也释放等待
+ log.error("[onFailure][WebSocket 连接失败]", t);
+ }
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java
new file mode 100644
index 0000000000..d3568db8b9
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/data/action/websocket/IotWebSocketClientTest.java
@@ -0,0 +1,257 @@
+package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
+
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import okhttp3.Response;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link IotWebSocketClient} 的单元测试
+ *
+ * @author HUIHUI
+ */
+class IotWebSocketClientTest {
+
+ private MockWebServer mockWebServer;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ mockWebServer = new MockWebServer();
+ mockWebServer.start();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (mockWebServer != null) {
+ mockWebServer.shutdown();
+ }
+ }
+
+ /**
+ * 简单的 WebSocket 监听器,用于测试
+ */
+ private static class TestWebSocketListener extends WebSocketListener {
+ @Override
+ public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
+ // 连接打开
+ }
+
+ @Override
+ public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
+ // 收到消息
+ }
+
+ @Override
+ public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
+ webSocket.close(code, reason);
+ }
+
+ @Override
+ public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
+ // 连接失败
+ }
+ }
+
+ @Test
+ public void testConstructor_defaultValues() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+
+ // 调用
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, null, null, null);
+
+ // 断言:验证默认值被正确设置
+ assertNotNull(client);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testConstructor_customValues() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ Integer connectTimeoutMs = 3000;
+ Integer sendTimeoutMs = 5000;
+ String dataFormat = "TEXT";
+
+ // 调用
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, connectTimeoutMs, sendTimeoutMs, dataFormat);
+
+ // 断言
+ assertNotNull(client);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testConnect_success() throws Exception {
+ // 准备参数:使用 MockWebServer 的 WebSocket 端点
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock:设置 MockWebServer 响应 WebSocket 升级请求
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+
+ // 断言
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testConnect_alreadyConnected() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用:第一次连接
+ client.connect();
+ assertTrue(client.isConnected());
+
+ // 调用:第二次连接(应该不会重复连接)
+ client.connect();
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testSendMessage_success() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ client.sendMessage(message);
+
+ // 断言:消息发送成功不抛异常
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+ @Test
+ public void testSendMessage_notConnected() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // 调用 & 断言:未连接时发送消息应抛出异常
+ assertThrows(IllegalStateException.class, () -> client.sendMessage(message));
+ }
+
+ @Test
+ public void testClose_success() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ assertTrue(client.isConnected());
+
+ client.close();
+
+ // 断言
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testClose_notConnected() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 调用:关闭未连接的客户端不应抛异常
+ assertDoesNotThrow(client::close);
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testIsConnected_initialState() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 断言:初始状态应为未连接
+ assertFalse(client.isConnected());
+ }
+
+ @Test
+ public void testToString() {
+ // 准备参数
+ String serverUrl = "ws://localhost:8080";
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
+
+ // 调用
+ String result = client.toString();
+
+ // 断言
+ assertNotNull(result);
+ assertTrue(result.contains("serverUrl='ws://localhost:8080'"));
+ assertTrue(result.contains("dataFormat='JSON'"));
+ assertTrue(result.contains("connected=false"));
+ }
+
+ @Test
+ public void testSendMessage_textFormat() throws Exception {
+ // 准备参数
+ String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
+ IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "TEXT");
+
+ IotDeviceMessage message = IotDeviceMessage.builder()
+ .deviceId(123L)
+ .method("thing.property.report")
+ .params("{\"temperature\": 25.5}")
+ .build();
+
+ // mock
+ mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
+
+ // 调用
+ client.connect();
+ client.sendMessage(message);
+
+ // 断言:消息发送成功不抛异常
+ assertTrue(client.isConnected());
+
+ // 清理
+ client.close();
+ }
+
+}