From d2c000d64dedcb95e6bc15383db697eaf8abf1e6 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 27 Jan 2026 09:58:07 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91websocket=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=EF=BC=9A=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 36 ++ .../gateway/config/IotGatewayProperties.java | 57 +++ .../IotWebSocketDownstreamSubscriber.java | 64 +++ .../IotWebSocketUpstreamProtocol.java | 112 ++++ .../IotWebSocketConnectionManager.java | 146 ++++++ .../router/IotWebSocketDownstreamHandler.java | 61 +++ .../router/IotWebSocketUpstreamHandler.java | 482 ++++++++++++++++++ .../src/main/resources/application.yaml | 12 + ...eviceWebSocketProtocolIntegrationTest.java | 365 +++++++++++++ ...eviceWebSocketProtocolIntegrationTest.java | 356 +++++++++++++ ...eviceWebSocketProtocolIntegrationTest.java | 263 ++++++++++ 11 files changed, 1954 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 79d978c4db..6b47d0df53 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -18,6 +18,9 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnection import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; @@ -209,4 +212,37 @@ public class IotGatewayConfiguration { } + /** + * IoT 网关 WebSocket 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.websocket", name = "enabled", havingValue = "true") + @Slf4j + public static class WebSocketProtocolConfiguration { + + @Bean(name = "websocketVertx", destroyMethod = "close") + public Vertx websocketVertx() { + return Vertx.vertx(); + } + + @Bean + public IotWebSocketUpstreamProtocol iotWebSocketUpstreamProtocol(IotGatewayProperties gatewayProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotWebSocketConnectionManager connectionManager, + @Qualifier("websocketVertx") Vertx websocketVertx) { + return new IotWebSocketUpstreamProtocol(gatewayProperties.getProtocol().getWebsocket(), + deviceService, messageService, connectionManager, websocketVertx); + } + + @Bean + public IotWebSocketDownstreamSubscriber iotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocolHandler, + IotDeviceMessageService messageService, + IotWebSocketConnectionManager connectionManager, + IotMessageBus messageBus) { + return new IotWebSocketDownstreamSubscriber(protocolHandler, messageService, connectionManager, messageBus); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 3bdf6fb3e4..0b9720ad12 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -103,6 +103,11 @@ public class IotGatewayProperties { */ private CoapProperties coap; + /** + * WebSocket 组件配置 + */ + private WebSocketProperties websocket; + } @Data @@ -586,4 +591,56 @@ public class IotGatewayProperties { } + @Data + public static class WebSocketProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务器端口(默认:8094) + */ + private Integer port = 8094; + + /** + * WebSocket 路径(默认:/ws) + */ + @NotEmpty(message = "WebSocket 路径不能为空") + private String path = "/ws"; + + /** + * 最大消息大小(字节,默认 64KB) + */ + private Integer maxMessageSize = 65536; + + /** + * 最大帧大小(字节,默认 64KB) + */ + private Integer maxFrameSize = 65536; + + /** + * 空闲超时时间(秒,默认 60) + */ + private Integer idleTimeoutSeconds = 60; + + /** + * 是否启用 SSL(wss://) + */ + private Boolean sslEnabled = false; + + /** + * SSL 证书路径 + */ + private String sslCertPath; + + /** + * SSL 私钥路径 + */ + private String sslKeyPath; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java new file mode 100644 index 0000000000..429d077282 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketDownstreamSubscriber.java @@ -0,0 +1,64 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 WebSocket 下游订阅者:接收下行给设备的消息 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public class IotWebSocketDownstreamSubscriber implements IotMessageSubscriber { + + private final IotWebSocketUpstreamProtocol protocol; + + private final IotDeviceMessageService messageService; + + private final IotWebSocketConnectionManager connectionManager; + + private final IotMessageBus messageBus; + + private IotWebSocketDownstreamHandler downstreamHandler; + + @PostConstruct + public void init() { + // 初始化下游处理器 + this.downstreamHandler = new IotWebSocketDownstreamHandler(messageService, connectionManager); + + messageBus.register(this); + log.info("[init][WebSocket 下游订阅者初始化完成,服务器 ID: {},Topic: {}]", + protocol.getServerId(), getTopic()); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + try { + downstreamHandler.handle(message); + } catch (Exception e) { + log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId(), e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java new file mode 100644 index 0000000000..4bcb0e4b2b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotWebSocketUpstreamProtocol.java @@ -0,0 +1,112 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.hutool.core.util.ObjUtil; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.PemKeyCertOptions; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 WebSocket 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotWebSocketUpstreamProtocol { + + private final IotGatewayProperties.WebSocketProperties wsProperties; + + private final IotDeviceService deviceService; + + private final IotDeviceMessageService messageService; + + private final IotWebSocketConnectionManager connectionManager; + + private final Vertx vertx; + + @Getter + private final String serverId; + + private HttpServer httpServer; + + public IotWebSocketUpstreamProtocol(IotGatewayProperties.WebSocketProperties wsProperties, + IotDeviceService deviceService, + IotDeviceMessageService messageService, + IotWebSocketConnectionManager connectionManager, + Vertx vertx) { + this.wsProperties = wsProperties; + this.deviceService = deviceService; + this.messageService = messageService; + this.connectionManager = connectionManager; + this.vertx = vertx; + this.serverId = IotDeviceMessageUtils.generateServerId(wsProperties.getPort()); + } + + @PostConstruct + public void start() { + // 1.1 创建服务器选项 + HttpServerOptions options = new HttpServerOptions() + .setPort(wsProperties.getPort()) + .setIdleTimeout(wsProperties.getIdleTimeoutSeconds()) + .setMaxWebSocketFrameSize(wsProperties.getMaxFrameSize()) + .setMaxWebSocketMessageSize(wsProperties.getMaxMessageSize()); + // 1.2 配置 SSL(如果启用) + if (Boolean.TRUE.equals(wsProperties.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(wsProperties.getSslKeyPath()) + .setCertPath(wsProperties.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + + // 2. 创建服务器并设置 WebSocket 处理器 + httpServer = vertx.createHttpServer(options); + httpServer.webSocketHandler(socket -> { + // 验证路径 + if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) { + log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", + socket.path(), wsProperties.getPath()); + // TODO @AI:已经被废弃,看看换什么其他方法; + socket.reject(); + return; + } + + // 创建上行处理器 + IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler( + this, messageService, deviceService, connectionManager); + handler.handle(socket); + }); + + // 3. 启动服务器 + try { + httpServer.listen().result(); + log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", + wsProperties.getPort(), wsProperties.getPath()); + } catch (Exception e) { + log.error("[start][IoT 网关 WebSocket 协议启动失败]", e); + throw e; + } + } + + @PreDestroy + public void stop() { + if (httpServer != null) { + try { + httpServer.close().result(); + log.info("[stop][IoT 网关 WebSocket 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 WebSocket 协议停止失败]", e); + } + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java new file mode 100644 index 0000000000..cfbcb1fe51 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java @@ -0,0 +1,146 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager; + +import io.vertx.core.http.ServerWebSocket; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * IoT 网关 WebSocket 连接管理器 + *

+ * 统一管理 WebSocket 连接的认证状态、设备会话和消息发送功能: + * 1. 管理 WebSocket 连接的认证状态 + * 2. 管理设备会话和在线状态 + * 3. 管理消息发送到设备 + * + * @author 芋道源码 + */ +@Slf4j +@Component +public class IotWebSocketConnectionManager { + + /** + * 连接信息映射:ServerWebSocket -> 连接信息 + */ + private final Map connectionMap = new ConcurrentHashMap<>(); + + /** + * 设备 ID -> ServerWebSocket 的映射 + */ + private final Map deviceSocketMap = new ConcurrentHashMap<>(); + + /** + * 注册设备连接(包含认证信息) + * + * @param socket WebSocket 连接 + * @param deviceId 设备 ID + * @param connectionInfo 连接信息 + */ + public void registerConnection(ServerWebSocket socket, Long deviceId, ConnectionInfo connectionInfo) { + // 如果设备已有其他连接,先清理旧连接 + ServerWebSocket oldSocket = deviceSocketMap.get(deviceId); + if (oldSocket != null && oldSocket != socket) { + log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]", + deviceId, oldSocket.remoteAddress()); + oldSocket.close(); + // 清理旧连接的映射 + connectionMap.remove(oldSocket); + } + + connectionMap.put(socket, connectionInfo); + deviceSocketMap.put(deviceId, socket); + + log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]", + deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName()); + } + + /** + * 注销设备连接 + * + * @param socket WebSocket 连接 + */ + public void unregisterConnection(ServerWebSocket socket) { + ConnectionInfo connectionInfo = connectionMap.remove(socket); + if (connectionInfo != null) { + Long deviceId = connectionInfo.getDeviceId(); + deviceSocketMap.remove(deviceId); + log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", + deviceId, socket.remoteAddress()); + } + } + + /** + * 获取连接信息 + */ + public ConnectionInfo getConnectionInfo(ServerWebSocket socket) { + return connectionMap.get(socket); + } + + /** + * 根据设备 ID 获取连接信息 + */ + public ConnectionInfo getConnectionInfoByDeviceId(Long deviceId) { + ServerWebSocket socket = deviceSocketMap.get(deviceId); + return socket != null ? connectionMap.get(socket) : null; + } + + /** + * 发送消息到设备(文本消息) + * + * @param deviceId 设备 ID + * @param message JSON 消息 + * @return 是否发送成功 + */ + public boolean sendToDevice(Long deviceId, String message) { + ServerWebSocket socket = deviceSocketMap.get(deviceId); + if (socket == null) { + log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId); + return false; + } + + try { + socket.writeTextMessage(message); + log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, message.length()); + return true; + } catch (Exception e) { + log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e); + // 发送失败时清理连接 + unregisterConnection(socket); + return false; + } + } + + /** + * 连接信息(包含认证信息) + */ + @Data + @Accessors(chain = true) + public static class ConnectionInfo { + + /** + * 设备 ID + */ + private Long deviceId; + /** + * 产品 Key + */ + private String productKey; + /** + * 设备名称 + */ + private String deviceName; + + /** + * 客户端 ID + */ + private String clientId; + + // TODO @AI:增加有个 codecType 字段;后续可以使用,参考 tcp、udp;然后下行的时候,也基于这个 codeType 去获取; + + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java new file mode 100644 index 0000000000..91310cd2a0 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketDownstreamHandler.java @@ -0,0 +1,61 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; + +/** + * IoT 网关 WebSocket 下行消息处理器 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public class IotWebSocketDownstreamHandler { + + // TODO @芋艿:codeType 的处理; + private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE; + + private final IotDeviceMessageService deviceMessageService; + + private final IotWebSocketConnectionManager connectionManager; + + /** + * 处理下行消息 + */ + public void handle(IotDeviceMessage message) { + try { + log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); + + // 1. 获取连接信息 + IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId( + message.getDeviceId()); + if (connectionInfo == null) { + log.error("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId()); + return; + } + + // 2. 编码消息并发送到设备 + byte[] bytes = deviceMessageService.encodeDeviceMessage(message, CODEC_TYPE); + String jsonMessage = new String(bytes, StandardCharsets.UTF_8); + boolean success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage); + if (success) { + log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]", + message.getDeviceId(), message.getMethod(), message.getId(), bytes.length); + } else { + log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]", + message.getDeviceId(), message.getMethod(), message.getId()); + } + } catch (Exception e) { + log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]", + message.getDeviceId(), message.getMethod(), message, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java new file mode 100644 index 0000000000..14d9d142d0 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java @@ -0,0 +1,482 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.Handler; +import io.vertx.core.http.ServerWebSocket; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; + +/** + * WebSocket 上行消息处理器 + * + * @author 芋道源码 + */ +@Slf4j +public class IotWebSocketUpstreamHandler implements Handler { + + // TODO @芋艿:codeType 的处理; + private static final String CODEC_TYPE = IotWebSocketJsonDeviceMessageCodec.TYPE; + + private static final String AUTH_METHOD = "auth"; + + private final IotDeviceMessageService deviceMessageService; + + private final IotDeviceService deviceService; + + private final IotWebSocketConnectionManager connectionManager; + + private final IotDeviceCommonApi deviceApi; + + private final String serverId; + + public IotWebSocketUpstreamHandler(IotWebSocketUpstreamProtocol protocol, + IotDeviceMessageService deviceMessageService, + IotDeviceService deviceService, + IotWebSocketConnectionManager connectionManager) { + this.deviceMessageService = deviceMessageService; + this.deviceService = deviceService; + this.connectionManager = connectionManager; + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.serverId = protocol.getServerId(); + } + + @Override + public void handle(ServerWebSocket socket) { + // 1. 接受 WebSocket 连接 + String clientId = IdUtil.simpleUUID(); + log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); + // TODO @AI:这个方法已经废弃,看看有没其他替换的 + socket.accept(); + + // 2.1 设置异常和关闭处理器 + socket.exceptionHandler(ex -> { + log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); + cleanupConnection(socket); + }); + socket.closeHandler(v -> { + log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); + cleanupConnection(socket); + }); + + // 2.2 设置文本消息处理器 + socket.textMessageHandler(message -> { + try { + processMessage(clientId, message, socket); + } catch (Exception e) { + log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, socket.remoteAddress(), e.getMessage()); + cleanupConnection(socket); + socket.close(); + } + }); + } + + /** + * 处理消息 + * + * @param clientId 客户端 ID + * @param message 消息(JSON 字符串) + * @param socket WebSocket 连接 + * @throws Exception 消息解码失败时抛出异常 + */ + private void processMessage(String clientId, String message, ServerWebSocket socket) throws Exception { + // 1.1 基础检查 + if (StrUtil.isBlank(message)) { + return; + } + // 1.2 解码消息 + // TODO @AI:应该只有初始使用 CODEC_TYPE 解析,后续基于 + IotDeviceMessage deviceMessage; + try { + deviceMessage = deviceMessageService.decodeDeviceMessage( + message.getBytes(StandardCharsets.UTF_8), CODEC_TYPE); + if (deviceMessage == null) { + throw new Exception("解码后消息为空"); + } + } catch (Exception e) { + throw new Exception("消息解码失败: " + e.getMessage(), e); + } + + // 2. 根据消息类型路由处理 + try { + if (AUTH_METHOD.equals(deviceMessage.getMethod())) { + // 认证请求 + handleAuthenticationRequest(clientId, deviceMessage, socket); + } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(deviceMessage.getMethod())) { + // 设备动态注册请求 + handleRegisterRequest(clientId, deviceMessage, socket); + } else { + // 业务消息 + handleBusinessRequest(clientId, deviceMessage, socket); + } + } catch (Exception e) { + log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", + clientId, deviceMessage.getMethod(), e); + // 发送错误响应,避免客户端一直等待 + try { + sendErrorResponse(socket, deviceMessage.getRequestId(), "消息处理失败"); + } catch (Exception responseEx) { + log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx); + } + } + } + + /** + * 处理认证请求 + * + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket WebSocket 连接 + */ + private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) { + try { + // 1.1 解析认证参数 + IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); + if (authParams == null) { + log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId); + sendErrorResponse(socket, message.getRequestId(), "认证参数不完整"); + return; + } + // 1.2 执行认证 + if (!validateDeviceAuth(authParams)) { + log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", + clientId, authParams.getUsername()); + sendErrorResponse(socket, message.getRequestId(), "认证失败"); + return; + } + + // 2.1 解析设备信息 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); + if (deviceInfo == null) { + sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败"); + return; + } + // 2.2 获取设备信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), + deviceInfo.getDeviceName()); + if (device == null) { + sendErrorResponse(socket, message.getRequestId(), "设备不存在"); + return; + } + + // 3.1 注册连接 + registerConnection(socket, device, clientId); + // 3.2 发送上线消息 + sendOnlineMessage(device); + // 3.3 发送成功响应 + sendSuccessResponse(socket, message.getRequestId(), "认证成功"); + log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]", + device.getId(), device.getDeviceName()); + } catch (Exception e) { + log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e); + sendErrorResponse(socket, message.getRequestId(), "认证处理异常"); + } + } + + /** + * 处理设备动态注册请求(一型一密,不需要认证) + * + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket WebSocket 连接 + * @see 阿里云 - 一型一密 + */ + private void handleRegisterRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) { + try { + // 1. 解析注册参数 + IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams()); + if (registerParams == null) { + log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); + sendErrorResponse(socket, message.getRequestId(), "注册参数不完整"); + return; + } + + // 2. 调用动态注册 + CommonResult result = deviceApi.registerDevice(registerParams); + if (result.isError()) { + log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); + sendErrorResponse(socket, message.getRequestId(), result.getMsg()); + return; + } + + // 3. 发送成功响应(包含 deviceSecret) + sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData()); + log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", + clientId, registerParams.getDeviceName()); + } catch (Exception e) { + log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); + sendErrorResponse(socket, message.getRequestId(), "注册处理异常"); + } + } + + /** + * 处理业务请求 + * + * @param clientId 客户端 ID + * @param message 消息信息 + * @param socket WebSocket 连接 + */ + private void handleBusinessRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) { + try { + // 1. 获取认证信息并处理业务消息 + IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); + if (connectionInfo == null) { + log.warn("[handleBusinessRequest][连接未认证,拒绝处理业务消息,客户端 ID: {}]", clientId); + sendErrorResponse(socket, message.getRequestId(), "连接未认证"); + return; + } + + // 2. 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), serverId); + log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}", + clientId, message.toString()); + } catch (Exception e) { + log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e); + } + } + + /** + * 注册连接信息 + * + * @param socket WebSocket 连接 + * @param device 设备 + * @param clientId 客户端 ID + */ + private void registerConnection(ServerWebSocket socket, IotDeviceRespDTO device, String clientId) { + IotWebSocketConnectionManager.ConnectionInfo connectionInfo = new IotWebSocketConnectionManager.ConnectionInfo() + .setDeviceId(device.getId()) + .setProductKey(device.getProductKey()) + .setDeviceName(device.getDeviceName()) + .setClientId(clientId); + // 注册连接 + connectionManager.registerConnection(socket, device.getId(), connectionInfo); + } + + /** + * 发送设备上线消息 + * + * @param device 设备信息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + } catch (Exception e) { + log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e); + } + } + + /** + * 清理连接 + * + * @param socket WebSocket 连接 + */ + private void cleanupConnection(ServerWebSocket socket) { + try { + // 1. 发送离线消息(如果已认证) + IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); + if (connectionInfo != null) { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), serverId); + } + + // 2. 注销连接 + connectionManager.unregisterConnection(socket); + } catch (Exception e) { + log.error("[cleanupConnection][清理连接失败]", e); + } + } + + /** + * 发送响应消息 + * + * @param socket WebSocket 连接 + * @param success 是否成功 + * @param message 消息 + * @param requestId 请求 ID + */ + private void sendResponse(ServerWebSocket socket, boolean success, String message, String requestId) { + try { + Object responseData = MapUtil.builder() + .put("success", success) + .put("message", message) + .build(); + + int code = success ? 0 : 401; + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, + code, message); + + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); + socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8)); + } catch (Exception e) { + log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e); + } + } + + /** + * 验证设备认证信息 + * + * @param authParams 认证参数 + * @return 是否认证成功 + */ + private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) { + try { + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(authParams.getClientId()).setUsername(authParams.getUsername()) + .setPassword(authParams.getPassword())); + result.checkError(); + return BooleanUtil.isTrue(result.getData()); + } catch (Exception e) { + log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e); + return false; + } + } + + /** + * 发送错误响应 + * + * @param socket WebSocket 连接 + * @param requestId 请求 ID + * @param errorMessage 错误消息 + */ + private void sendErrorResponse(ServerWebSocket socket, String requestId, String errorMessage) { + sendResponse(socket, false, errorMessage, requestId); + } + + /** + * 发送成功响应 + * + * @param socket WebSocket 连接 + * @param requestId 请求 ID + * @param message 消息 + */ + @SuppressWarnings("SameParameterValue") + private void sendSuccessResponse(ServerWebSocket socket, String requestId, String message) { + sendResponse(socket, true, message, requestId); + } + + /** + * 解析认证参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 认证参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceAuthReqDTO parseAuthParams(Object params) { + if (params == null) { + return null; + } + + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof java.util.Map) { + java.util.Map paramMap = (java.util.Map) params; + return new IotDeviceAuthReqDTO() + .setClientId(MapUtil.getStr(paramMap, "clientId")) + .setUsername(MapUtil.getStr(paramMap, "username")) + .setPassword(MapUtil.getStr(paramMap, "password")); + } + + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceAuthReqDTO) { + return (IotDeviceAuthReqDTO) params; + } + + // 其他情况尝试 JSON 转换 + // TODO @芋艿:要不要优化下; + String jsonStr = JsonUtils.toJsonString(params); + return JsonUtils.convertObject(jsonStr, IotDeviceAuthReqDTO.class); + } catch (Exception e) { + log.error("[parseAuthParams][解析认证参数({})失败]", params, e); + return null; + } + } + + /** + * 解析注册参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 注册参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceRegisterReqDTO parseRegisterParams(Object params) { + if (params == null) { + return null; + } + + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof java.util.Map) { + java.util.Map paramMap = (java.util.Map) params; + String productKey = MapUtil.getStr(paramMap, "productKey"); + String deviceName = MapUtil.getStr(paramMap, "deviceName"); + String productSecret = MapUtil.getStr(paramMap, "productSecret"); + if (StrUtil.hasBlank(productKey, deviceName, productSecret)) { + return null; + } + return new IotDeviceRegisterReqDTO() + .setProductKey(productKey) + .setDeviceName(deviceName) + .setProductSecret(productSecret); + } + + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceRegisterReqDTO) { + return (IotDeviceRegisterReqDTO) params; + } + + // 其他情况尝试 JSON 转换 + String jsonStr = JsonUtils.toJsonString(params); + return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class); + } catch (Exception e) { + log.error("[parseRegisterParams][解析注册参数({})失败]", params, e); + return null; + } + } + + /** + * 发送注册成功响应(包含 deviceSecret) + * + * @param socket WebSocket 连接 + * @param requestId 请求 ID + * @param registerResp 注册响应 + */ + private void sendRegisterSuccessResponse(ServerWebSocket socket, String requestId, + IotDeviceRegisterRespDTO registerResp) { + try { + // 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO) + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null); + // 2. 发送响应 + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); + socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8)); + } catch (Exception e) { + log.error("[sendRegisterSuccessResponse][发送注册成功响应失败,requestId: {}]", requestId, e); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index cce0449663..691e5cf56c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -123,6 +123,17 @@ yudao: max-message-size: 1024 # 最大消息大小(字节) ack-timeout: 2000 # ACK 超时时间(毫秒) max-retransmit: 4 # 最大重传次数 + # ==================================== + # 针对引入的 WebSocket 组件的配置 + # ==================================== + websocket: + enabled: false # 是否启用 WebSocket 协议 + port: 8094 # WebSocket 服务端口(默认 8094) + path: /ws # WebSocket 路径(默认 /ws) + max-message-size: 65536 # 最大消息大小(字节,默认 64KB) + max-frame-size: 65536 # 最大帧大小(字节,默认 64KB) + idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60) + ssl-enabled: false # 是否启用 SSL(wss://) --- #################### 日志相关配置 #################### @@ -144,6 +155,7 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.websocket: DEBUG # 根日志级别 root: INFO diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java new file mode 100644 index 0000000000..9121207bdc --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java @@ -0,0 +1,365 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketConnectOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * IoT 直连设备 WebSocket 协议集成测试(手动测试) + * + *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 WebSocket 协议直接连接平台 + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(WebSocket 端口 8094)
  2. + *
  3. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 设备认证
    • + *
    • {@link #testDeviceRegister()} - 设备动态注册(一型一密)
    • + *
    • {@link #testPropertyPost()} - 设备属性上报
    • + *
    • {@link #testEventPost()} - 设备事件上报
    • + *
    + *
  4. + *
+ * + *

注意:WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotDirectDeviceWebSocketProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8094; + private static final String WS_PATH = "/ws"; + private static final int TIMEOUT_SECONDS = 5; + + // 编解码器 + private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec(); + + // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== + private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; + private static final String DEVICE_NAME = "small"; + private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; + + // Vert.x 实例 + private static Vertx vertx; + + @BeforeAll + public static void setUp() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void tearDown() { + if (vertx != null) { + vertx.close(); + } + } + + // ===================== 认证测试 ===================== + + /** + * 认证测试:获取设备 Token + */ + @Test + public void testAuth() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + + // 1. 创建 WebSocket 连接 + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testAuth][WebSocket 连接成功]"); + + // 设置消息处理器 + ws.textMessageHandler(message -> { + log.info("[testAuth][收到响应: {}]", message); + responseRef.set(message); + ws.close(); + latch.countDown(); + }); + + // 2. 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + // 3. 编码并发送 + byte[] payload = CODEC.encode(request); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testAuth][发送认证请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testAuth][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + } + }); + + // 4. 等待响应 + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (completed && responseRef.get() != null) { + IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testAuth][解码响应: {}]", response); + } else { + log.warn("[testAuth][测试超时或未收到响应]"); + } + } + + // ===================== 动态注册测试 ===================== + + /** + * 直连设备动态注册测试(一型一密) + */ + @Test + public void testDeviceRegister() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testDeviceRegister][WebSocket 连接成功]"); + + ws.textMessageHandler(message -> { + log.info("[testDeviceRegister][收到响应: {}]", message); + responseRef.set(message); + ws.close(); + latch.countDown(); + }); + + // 构建注册消息 + IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO(); + registerReqDTO.setProductKey(PRODUCT_KEY); + registerReqDTO.setDeviceName("test-ws-" + System.currentTimeMillis()); + registerReqDTO.setProductSecret("test-product-secret"); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testDeviceRegister][发送注册请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testDeviceRegister][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (completed && responseRef.get() != null) { + IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testDeviceRegister][解码响应: {}]", response); + } else { + log.warn("[testDeviceRegister][测试超时或未收到响应]"); + } + } + + // ===================== 直连设备属性上报测试 ===================== + + /** + * 属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + CountDownLatch latch = new CountDownLatch(2); // 认证 + 属性上报 + AtomicReference authResponseRef = new AtomicReference<>(); + AtomicReference propertyResponseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testPropertyPost][WebSocket 连接成功]"); + + final boolean[] authenticated = {false}; + + ws.textMessageHandler(message -> { + log.info("[testPropertyPost][收到响应: {}]", message); + if (!authenticated[0]) { + authResponseRef.set(message); + authenticated[0] = true; + latch.countDown(); + + // 认证成功后发送属性上报 + IotDeviceMessage propertyRequest = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("width", 1) + .put("height", "2") + .build()), + null, null, null); + byte[] payload = CODEC.encode(propertyRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testPropertyPost][发送属性上报请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + propertyResponseRef.set(message); + ws.close(); + latch.countDown(); + } + }); + + // 先发送认证请求 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(authRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testPropertyPost][发送认证请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testPropertyPost][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + if (completed) { + if (authResponseRef.get() != null) { + IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testPropertyPost][认证响应: {}]", authResponse); + } + if (propertyResponseRef.get() != null) { + IotDeviceMessage propertyResponse = CODEC.decode(propertyResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testPropertyPost][属性上报响应: {}]", propertyResponse); + } + } else { + log.warn("[testPropertyPost][测试超时]"); + } + } + + // ===================== 直连设备事件上报测试 ===================== + + /** + * 事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + CountDownLatch latch = new CountDownLatch(2); // 认证 + 事件上报 + AtomicReference authResponseRef = new AtomicReference<>(); + AtomicReference eventResponseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testEventPost][WebSocket 连接成功]"); + + final boolean[] authenticated = {false}; + + ws.textMessageHandler(message -> { + log.info("[testEventPost][收到响应: {}]", message); + if (!authenticated[0]) { + authResponseRef.set(message); + authenticated[0] = true; + latch.countDown(); + + // 认证成功后发送事件上报 + IotDeviceMessage eventRequest = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "eat", + MapUtil.builder().put("rice", 3).build(), + System.currentTimeMillis()), + null, null, null); + byte[] payload = CODEC.encode(eventRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testEventPost][发送事件上报请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + eventResponseRef.set(message); + ws.close(); + latch.countDown(); + } + }); + + // 先发送认证请求 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(authRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testEventPost][发送认证请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testEventPost][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + if (completed) { + if (authResponseRef.get() != null) { + IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testEventPost][认证响应: {}]", authResponse); + } + if (eventResponseRef.get() != null) { + IotDeviceMessage eventResponse = CODEC.decode(eventResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testEventPost][事件上报响应: {}]", eventResponse); + } + } else { + log.warn("[testEventPost][测试超时]"); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java new file mode 100644 index 0000000000..464efb0f44 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewayDeviceWebSocketProtocolIntegrationTest.java @@ -0,0 +1,356 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPackPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketConnectOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * IoT 网关设备 WebSocket 协议集成测试(手动测试) + * + *

测试场景:网关设备(IotProductDeviceTypeEnum 的 GATEWAY 类型)通过 WebSocket 协议管理子设备拓扑关系 + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(WebSocket 端口 8094)
  2. + *
  3. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 网关设备认证
    • + *
    • {@link #testTopoAdd()} - 添加子设备拓扑关系
    • + *
    • {@link #testTopoDelete()} - 删除子设备拓扑关系
    • + *
    • {@link #testTopoGet()} - 获取子设备拓扑关系
    • + *
    • {@link #testSubDeviceRegister()} - 子设备动态注册
    • + *
    • {@link #testPropertyPackPost()} - 批量上报属性(网关 + 子设备)
    • + *
    + *
  4. + *
+ * + *

注意:WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewayDeviceWebSocketProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8094; + private static final String WS_PATH = "/ws"; + private static final int TIMEOUT_SECONDS = 5; + + // 编解码器 + private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec(); + + // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== + private static final String GATEWAY_PRODUCT_KEY = "m6XcS1ZJ3TW8eC0v"; + private static final String GATEWAY_DEVICE_NAME = "sub-ddd"; + private static final String GATEWAY_DEVICE_SECRET = "b3d62c70f8a4495487ed1d35d61ac2b3"; + + // ===================== 子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String SUB_DEVICE_PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String SUB_DEVICE_NAME = "chazuo-it"; + private static final String SUB_DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // Vert.x 实例 + private static Vertx vertx; + + @BeforeAll + public static void setUp() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void tearDown() { + if (vertx != null) { + vertx.close(); + } + } + + // ===================== 认证测试 ===================== + + /** + * 网关设备认证测试 + */ + @Test + public void testAuth() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testAuth][WebSocket 连接成功]"); + + ws.textMessageHandler(message -> { + log.info("[testAuth][收到响应: {}]", message); + responseRef.set(message); + ws.close(); + latch.countDown(); + }); + + // 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testAuth][发送认证请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testAuth][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (completed && responseRef.get() != null) { + IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testAuth][解码响应: {}]", response); + } else { + log.warn("[testAuth][测试超时或未收到响应]"); + } + } + + // ===================== 拓扑管理测试 ===================== + + /** + * 添加子设备拓扑关系测试 + */ + @Test + public void testTopoAdd() throws Exception { + executeAuthenticatedRequest("testTopoAdd", ws -> { + // 构建子设备认证信息 + IotDeviceAuthReqDTO subAuthInfo = IotDeviceAuthUtils.getAuthInfo( + SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME, SUB_DEVICE_SECRET); + IotDeviceAuthReqDTO subDeviceAuth = new IotDeviceAuthReqDTO() + .setClientId(subAuthInfo.getClientId()) + .setUsername(subAuthInfo.getUsername()) + .setPassword(subAuthInfo.getPassword()); + // 构建请求参数 + IotDeviceTopoAddReqDTO params = new IotDeviceTopoAddReqDTO(); + params.setSubDevices(Collections.singletonList(subDeviceAuth)); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_ADD.getMethod(), + params, + null, null, null); + }); + } + + /** + * 删除子设备拓扑关系测试 + */ + @Test + public void testTopoDelete() throws Exception { + executeAuthenticatedRequest("testTopoDelete", ws -> { + IotDeviceTopoDeleteReqDTO params = new IotDeviceTopoDeleteReqDTO(); + params.setSubDevices(Collections.singletonList( + new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME))); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_DELETE.getMethod(), + params, + null, null, null); + }); + } + + /** + * 获取子设备拓扑关系测试 + */ + @Test + public void testTopoGet() throws Exception { + executeAuthenticatedRequest("testTopoGet", ws -> { + IotDeviceTopoGetReqDTO params = new IotDeviceTopoGetReqDTO(); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.TOPO_GET.getMethod(), + params, + null, null, null); + }); + } + + // ===================== 子设备注册测试 ===================== + + /** + * 子设备动态注册测试 + */ + @Test + public void testSubDeviceRegister() throws Exception { + executeAuthenticatedRequest("testSubDeviceRegister", ws -> { + IotSubDeviceRegisterReqDTO subDevice = new IotSubDeviceRegisterReqDTO(); + subDevice.setProductKey(SUB_DEVICE_PRODUCT_KEY); + subDevice.setDeviceName("mougezishebei-ws"); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.SUB_DEVICE_REGISTER.getMethod(), + Collections.singletonList(subDevice), + null, null, null); + }); + } + + // ===================== 批量上报测试 ===================== + + /** + * 批量上报属性测试(网关 + 子设备) + */ + @Test + public void testPropertyPackPost() throws Exception { + executeAuthenticatedRequest("testPropertyPackPost", ws -> { + // 构建【网关设备】自身属性 + Map gatewayProperties = MapUtil.builder() + .put("temperature", 25.5) + .build(); + // 构建【网关设备】自身事件 + IotDevicePropertyPackPostReqDTO.EventValue gatewayEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + gatewayEvent.setValue(MapUtil.builder().put("message", "gateway started").build()); + gatewayEvent.setTime(System.currentTimeMillis()); + Map gatewayEvents = MapUtil.builder() + .put("statusReport", gatewayEvent) + .build(); + // 构建【网关子设备】属性 + Map subDeviceProperties = MapUtil.builder() + .put("power", 100) + .build(); + // 构建【网关子设备】事件 + IotDevicePropertyPackPostReqDTO.EventValue subDeviceEvent = new IotDevicePropertyPackPostReqDTO.EventValue(); + subDeviceEvent.setValue(MapUtil.builder().put("errorCode", 0).build()); + subDeviceEvent.setTime(System.currentTimeMillis()); + Map subDeviceEvents = MapUtil.builder() + .put("healthCheck", subDeviceEvent) + .build(); + // 构建子设备数据 + IotDevicePropertyPackPostReqDTO.SubDeviceData subDeviceData = new IotDevicePropertyPackPostReqDTO.SubDeviceData(); + subDeviceData.setIdentity(new IotDeviceIdentity(SUB_DEVICE_PRODUCT_KEY, SUB_DEVICE_NAME)); + subDeviceData.setProperties(subDeviceProperties); + subDeviceData.setEvents(subDeviceEvents); + // 构建请求参数 + IotDevicePropertyPackPostReqDTO params = new IotDevicePropertyPackPostReqDTO(); + params.setProperties(gatewayProperties); + params.setEvents(gatewayEvents); + params.setSubDevices(List.of(subDeviceData)); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_PACK_POST.getMethod(), + params, + null, null, null); + }); + } + + // ===================== 辅助方法 ===================== + + /** + * 执行需要认证的请求 + * + * @param testName 测试名称 + * @param requestSupplier 请求消息提供者 + */ + private void executeAuthenticatedRequest(String testName, java.util.function.Function requestSupplier) throws Exception { + CountDownLatch latch = new CountDownLatch(2); + AtomicReference authResponseRef = new AtomicReference<>(); + AtomicReference businessResponseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[{}][WebSocket 连接成功]", testName); + + final boolean[] authenticated = {false}; + + ws.textMessageHandler(message -> { + log.info("[{}][收到响应: {}]", testName, message); + if (!authenticated[0]) { + authResponseRef.set(message); + authenticated[0] = true; + latch.countDown(); + + // 认证成功后发送业务请求 + IotDeviceMessage businessRequest = requestSupplier.apply(ws); + byte[] payload = CODEC.encode(businessRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[{}][发送业务请求: {}]", testName, jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + businessResponseRef.set(message); + ws.close(); + latch.countDown(); + } + }); + + // 先发送认证请求 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo( + GATEWAY_PRODUCT_KEY, GATEWAY_DEVICE_NAME, GATEWAY_DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(authRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[{}][发送认证请求: {}]", testName, jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[{}][WebSocket 连接失败]", testName, ar.cause()); + latch.countDown(); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + if (completed) { + if (authResponseRef.get() != null) { + IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[{}][认证响应: {}]", testName, authResponse); + } + if (businessResponseRef.get() != null) { + IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[{}][业务响应: {}]", testName, businessResponse); + } + } else { + log.warn("[{}][测试超时]", testName); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java new file mode 100644 index 0000000000..4903d13cfe --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotGatewaySubDeviceWebSocketProtocolIntegrationTest.java @@ -0,0 +1,263 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.websocket; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.websocket.IotWebSocketJsonDeviceMessageCodec; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.WebSocket; +import io.vertx.core.http.WebSocketConnectOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * IoT 网关子设备 WebSocket 协议集成测试(手动测试) + * + *

测试场景:子设备(IotProductDeviceTypeEnum 的 SUB 类型)通过网关设备代理上报数据 + * + *

重要说明:子设备无法直接连接平台,所有请求均由网关设备(Gateway)代为转发。 + * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(WebSocket 端口 8094)
  2. + *
  3. 确保子设备已通过 {@link IotGatewayDeviceWebSocketProtocolIntegrationTest#testTopoAdd()} 绑定到网关
  4. + *
  5. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 子设备认证
    • + *
    • {@link #testPropertyPost()} - 子设备属性上报(由网关代理转发)
    • + *
    • {@link #testEventPost()} - 子设备事件上报(由网关代理转发)
    • + *
    + *
  6. + *
+ * + *

注意:WebSocket 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotGatewaySubDeviceWebSocketProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8094; + private static final String WS_PATH = "/ws"; + private static final int TIMEOUT_SECONDS = 5; + + // 编解码器 + private static final IotDeviceMessageCodec CODEC = new IotWebSocketJsonDeviceMessageCodec(); + + // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== + private static final String PRODUCT_KEY = "jAufEMTF1W6wnPhn"; + private static final String DEVICE_NAME = "chazuo-it"; + private static final String DEVICE_SECRET = "d46ef9b28ab14238b9c00a3a668032af"; + + // Vert.x 实例 + private static Vertx vertx; + + @BeforeAll + public static void setUp() { + vertx = Vertx.vertx(); + } + + @AfterAll + public static void tearDown() { + if (vertx != null) { + vertx.close(); + } + } + + // ===================== 认证测试 ===================== + + /** + * 子设备认证测试 + */ + @Test + public void testAuth() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[testAuth][WebSocket 连接成功]"); + + ws.textMessageHandler(message -> { + log.info("[testAuth][收到响应: {}]", message); + responseRef.set(message); + ws.close(); + latch.countDown(); + }); + + // 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(request); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[testAuth][发送认证请求: {}]", jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[testAuth][WebSocket 连接失败]", ar.cause()); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (completed && responseRef.get() != null) { + IotDeviceMessage response = CODEC.decode(responseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[testAuth][解码响应: {}]", response); + } else { + log.warn("[testAuth][测试超时或未收到响应]"); + } + } + + // ===================== 子设备属性上报测试 ===================== + + /** + * 子设备属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + executeAuthenticatedRequest("testPropertyPost", ws -> { + log.info("[testPropertyPost][子设备属性上报 - 请求实际由 Gateway 代为转发]"); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("power", 100) + .put("status", "online") + .put("temperature", 36.5) + .build()), + null, null, null); + }); + } + + // ===================== 子设备事件上报测试 ===================== + + /** + * 子设备事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + executeAuthenticatedRequest("testEventPost", ws -> { + log.info("[testEventPost][子设备事件上报 - 请求实际由 Gateway 代为转发]"); + return IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "alarm", + MapUtil.builder() + .put("level", "warning") + .put("message", "temperature too high") + .put("threshold", 40) + .put("current", 42) + .build(), + System.currentTimeMillis()), + null, null, null); + }); + } + + // ===================== 辅助方法 ===================== + + /** + * 执行需要认证的请求 + * + * @param testName 测试名称 + * @param requestSupplier 请求消息提供者 + */ + private void executeAuthenticatedRequest(String testName, java.util.function.Function requestSupplier) throws Exception { + CountDownLatch latch = new CountDownLatch(2); + AtomicReference authResponseRef = new AtomicReference<>(); + AtomicReference businessResponseRef = new AtomicReference<>(); + + HttpClient client = vertx.createHttpClient(); + WebSocketConnectOptions options = new WebSocketConnectOptions() + .setHost(SERVER_HOST) + .setPort(SERVER_PORT) + .setURI(WS_PATH); + + client.webSocket(options).onComplete(ar -> { + if (ar.succeeded()) { + WebSocket ws = ar.result(); + log.info("[{}][WebSocket 连接成功]", testName); + + final boolean[] authenticated = {false}; + + ws.textMessageHandler(message -> { + log.info("[{}][收到响应: {}]", testName, message); + if (!authenticated[0]) { + authResponseRef.set(message); + authenticated[0] = true; + latch.countDown(); + + // 认证成功后发送业务请求 + IotDeviceMessage businessRequest = requestSupplier.apply(ws); + byte[] payload = CODEC.encode(businessRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[{}][发送业务请求: {}]", testName, jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + businessResponseRef.set(message); + ws.close(); + latch.countDown(); + } + }); + + // 先发送认证请求 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage authRequest = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + + byte[] payload = CODEC.encode(authRequest); + String jsonMessage = new String(payload, StandardCharsets.UTF_8); + log.info("[{}][发送认证请求: {}]", testName, jsonMessage); + ws.writeTextMessage(jsonMessage); + } else { + log.error("[{}][WebSocket 连接失败]", testName, ar.cause()); + latch.countDown(); + latch.countDown(); + } + }); + + boolean completed = latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS); + if (completed) { + if (authResponseRef.get() != null) { + IotDeviceMessage authResponse = CODEC.decode(authResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[{}][认证响应: {}]", testName, authResponse); + } + if (businessResponseRef.get() != null) { + IotDeviceMessage businessResponse = CODEC.decode(businessResponseRef.get().getBytes(StandardCharsets.UTF_8)); + log.info("[{}][业务响应: {}]", testName, businessResponse); + } + } else { + log.warn("[{}][测试超时]", testName); + } + } + +}