diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index 9fb42c5849..082a2ad797 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -88,9 +88,9 @@ public class IotMqttConnectionManager { connectionMap.remove(oldEndpoint); } + // 注册新连接 connectionMap.put(endpoint, connectionInfo); deviceEndpointMap.put(deviceId, endpoint); - log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]", deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName()); } @@ -102,13 +102,12 @@ public class IotMqttConnectionManager { */ public void unregisterConnection(MqttEndpoint endpoint) { ConnectionInfo connectionInfo = connectionMap.remove(endpoint); - if (connectionInfo != null) { - Long deviceId = connectionInfo.getDeviceId(); - deviceEndpointMap.remove(deviceId); - - log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, - getEndpointAddress(endpoint)); + if (connectionInfo == null) { + return; } + Long deviceId = connectionInfo.getDeviceId(); + deviceEndpointMap.remove(deviceId); + log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, getEndpointAddress(endpoint)); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java index 6347b5bd1a..9dcbdd649e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java @@ -471,9 +471,7 @@ public class IotMqttUpstreamHandler { /** * 注册连接 */ - private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, - String clientId) { - + private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() .setDeviceId(device.getId()) .setProductKey(device.getProductKey()) @@ -481,7 +479,6 @@ public class IotMqttUpstreamHandler { .setClientId(clientId) .setAuthenticated(true) .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); - connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); } @@ -510,15 +507,13 @@ public class IotMqttUpstreamHandler { IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), connectionInfo.getDeviceName(), serverId); - log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", - connectionInfo.getDeviceId(), connectionInfo.getDeviceName()); + log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", connectionInfo.getDeviceId(), connectionInfo.getDeviceName()); } // 注销连接 connectionManager.unregisterConnection(endpoint); } catch (Exception e) { - log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", - endpoint.clientIdentifier(), e.getMessage()); + log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", endpoint.clientIdentifier(), e.getMessage()); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java index aec671811c..3f0cc02bcf 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java @@ -34,7 +34,7 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber { // 验证路径 if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) { - log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", - socket.path(), wsProperties.getPath()); + log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", socket.path(), wsProperties.getPath()); // TODO @AI:已经被废弃,看看换什么其他方法; socket.reject(); return; @@ -89,8 +88,7 @@ public class IotWebSocketUpstreamProtocol { // 3. 启动服务器 try { httpServer.listen().result(); - log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", - wsProperties.getPort(), wsProperties.getPath()); + log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", wsProperties.getPort(), wsProperties.getPath()); } catch (Exception e) { log.error("[start][IoT 网关 WebSocket 协议启动失败]", e); throw e; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java index cfbcb1fe51..406aa1443e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/manager/IotWebSocketConnectionManager.java @@ -51,9 +51,9 @@ public class IotWebSocketConnectionManager { connectionMap.remove(oldSocket); } + // 注册新连接 connectionMap.put(socket, connectionInfo); deviceSocketMap.put(deviceId, socket); - log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]", deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName()); } @@ -65,12 +65,13 @@ public class IotWebSocketConnectionManager { */ public void unregisterConnection(ServerWebSocket socket) { ConnectionInfo connectionInfo = connectionMap.remove(socket); - if (connectionInfo != null) { - Long deviceId = connectionInfo.getDeviceId(); - deviceSocketMap.remove(deviceId); - log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", - deviceId, socket.remoteAddress()); + if (connectionInfo == null) { + return; } + Long deviceId = connectionInfo.getDeviceId(); + deviceSocketMap.remove(deviceId); + log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", + deviceId, socket.remoteAddress()); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java index 14d9d142d0..e7deda3546 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/router/IotWebSocketUpstreamHandler.java @@ -327,8 +327,7 @@ public class IotWebSocketUpstreamHandler implements Handler { .build(); int code = success ? 0 : 401; - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, - code, message); + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message); byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE); socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8)); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java index af4ba31e65..1bb6935fb5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -84,9 +84,6 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { @@ -123,9 +120,6 @@ public class IotDirectDeviceTcpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java index adbbff76cf..2dd05fa520 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewayDeviceTcpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; @@ -99,9 +98,6 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java index 31473d14b7..3379dbbe40 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotGatewaySubDeviceTcpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; @@ -85,9 +84,6 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java index 42e28b0dee..689d3eda05 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotDirectDeviceUdpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; @@ -90,9 +89,6 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { @@ -130,9 +126,6 @@ public class IotDirectDeviceUdpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java index aa0db66205..607b1a1277 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewayDeviceUdpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; @@ -102,9 +101,6 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java index 948024e25f..b22a27c9bc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/IotGatewaySubDeviceUdpProtocolIntegrationTest.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.HexUtil; import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; @@ -91,9 +90,6 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest { // 1.2 编码 byte[] payload = CODEC.encode(request); log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); - if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { - log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); - } // 2.1 发送请求 try (DatagramSocket socket = new DatagramSocket()) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java index 2a830b755a..1cca286c83 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/IotDirectDeviceWebSocketProtocolIntegrationTest.java @@ -81,6 +81,8 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { // ===================== 认证测试 ===================== + // TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java 或 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java 类,优化代码结构 + /** * 认证测试:获取设备 Token */ @@ -96,6 +98,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest { .setPort(SERVER_PORT) .setURI(WS_PATH); + // TODO @AI:这里有告警;Deprecate /instead use WebSocketClient.connect(WebSocketConnectOptions) client.webSocket(options).onComplete(ar -> { if (ar.succeeded()) { WebSocket ws = ar.result();