feat:【iot】优化 gateway 整体代码风格(空行)等

This commit is contained in:
YunaiV
2026-01-27 20:21:10 +08:00
parent 432e1ed230
commit 610ae6d532
16 changed files with 31 additions and 65 deletions

View File

@@ -88,9 +88,9 @@ public class IotMqttConnectionManager {
connectionMap.remove(oldEndpoint);
}
// 注册新连接
connectionMap.put(endpoint, connectionInfo);
deviceEndpointMap.put(deviceId, endpoint);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {}product key: {}device name: {}]",
deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
@@ -102,13 +102,12 @@ public class IotMqttConnectionManager {
*/
public void unregisterConnection(MqttEndpoint endpoint) {
ConnectionInfo connectionInfo = connectionMap.remove(endpoint);
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceEndpointMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId,
getEndpointAddress(endpoint));
if (connectionInfo == null) {
return;
}
Long deviceId = connectionInfo.getDeviceId();
deviceEndpointMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, getEndpointAddress(endpoint));
}
/**

View File

@@ -471,9 +471,7 @@ public class IotMqttUpstreamHandler {
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device,
String clientId) {
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
@@ -481,7 +479,6 @@ public class IotMqttUpstreamHandler {
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
@@ -510,15 +507,13 @@ public class IotMqttUpstreamHandler {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]",
connectionInfo.getDeviceId(), connectionInfo.getDeviceName());
log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", connectionInfo.getDeviceId(), connectionInfo.getDeviceName());
}
// 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]",
endpoint.clientIdentifier(), e.getMessage());
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", endpoint.clientIdentifier(), e.getMessage());
}
}

View File

@@ -34,7 +34,7 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotTcpDownstreamHandler(messageService, connectionManager);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][TCP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());

View File

@@ -51,9 +51,9 @@ public class IotTcpConnectionManager {
connectionMap.remove(oldSocket);
}
// 注册新连接
connectionMap.put(socket, connectionInfo);
deviceSocketMap.put(deviceId, socket);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {}product key: {}device name: {}]",
deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
@@ -65,12 +65,12 @@ public class IotTcpConnectionManager {
*/
public void unregisterConnection(NetSocket socket) {
ConnectionInfo connectionInfo = connectionMap.remove(socket);
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
if (connectionInfo == null) {
return;
}
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]", deviceId, socket.remoteAddress());
}
/**

View File

@@ -34,7 +34,7 @@ public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDevic
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, sessionManager, protocol);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());

View File

@@ -34,7 +34,7 @@ public class IotWebSocketDownstreamSubscriber implements IotMessageSubscriber<Io
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotWebSocketDownstreamHandler(messageService, connectionManager);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][WebSocket 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());

View File

@@ -73,8 +73,7 @@ public class IotWebSocketUpstreamProtocol {
httpServer.webSocketHandler(socket -> {
// 验证路径
if (ObjUtil.notEqual(wsProperties.getPath(), socket.path())) {
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]",
socket.path(), wsProperties.getPath());
log.warn("[webSocketHandler][WebSocket 路径不匹配,拒绝连接,路径: {},期望: {}]", socket.path(), wsProperties.getPath());
// TODO @AI已经被废弃看看换什么其他方法
socket.reject();
return;
@@ -89,8 +88,7 @@ public class IotWebSocketUpstreamProtocol {
// 3. 启动服务器
try {
httpServer.listen().result();
log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]",
wsProperties.getPort(), wsProperties.getPath());
log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", wsProperties.getPort(), wsProperties.getPath());
} catch (Exception e) {
log.error("[start][IoT 网关 WebSocket 协议启动失败]", e);
throw e;

View File

@@ -51,9 +51,9 @@ public class IotWebSocketConnectionManager {
connectionMap.remove(oldSocket);
}
// 注册新连接
connectionMap.put(socket, connectionInfo);
deviceSocketMap.put(deviceId, socket);
log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {}product key: {}device name: {}]",
deviceId, socket.remoteAddress(), connectionInfo.getProductKey(), connectionInfo.getDeviceName());
}
@@ -65,12 +65,13 @@ public class IotWebSocketConnectionManager {
*/
public void unregisterConnection(ServerWebSocket socket) {
ConnectionInfo connectionInfo = connectionMap.remove(socket);
if (connectionInfo != null) {
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
if (connectionInfo == null) {
return;
}
Long deviceId = connectionInfo.getDeviceId();
deviceSocketMap.remove(deviceId);
log.info("[unregisterConnection][注销设备连接,设备 ID: {},连接: {}]",
deviceId, socket.remoteAddress());
}
/**

View File

@@ -327,8 +327,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData,
code, message);
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message);
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, CODEC_TYPE);
socket.writeTextMessage(new String(encodedData, StandardCharsets.UTF_8));

View File

@@ -84,9 +84,6 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {
@@ -123,9 +120,6 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
@@ -99,9 +98,6 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
@@ -85,9 +84,6 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) {

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
@@ -90,9 +89,6 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {
@@ -130,9 +126,6 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testDeviceRegister][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testDeviceRegister][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
@@ -102,9 +101,6 @@ public class IotGatewayDeviceUdpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
@@ -91,9 +90,6 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
// 1.2 编码
byte[] payload = CODEC.encode(request);
log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length);
if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) {
log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload));
}
// 2.1 发送请求
try (DatagramSocket socket = new DatagramSocket()) {

View File

@@ -81,6 +81,8 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
// ===================== 认证测试 =====================
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java 或 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java 类,优化代码结构
/**
* 认证测试:获取设备 Token
*/
@@ -96,6 +98,7 @@ public class IotDirectDeviceWebSocketProtocolIntegrationTest {
.setPort(SERVER_PORT)
.setURI(WS_PATH);
// TODO @AI这里有告警Deprecate /instead use WebSocketClient.connect(WebSocketConnectOptions)
client.webSocket(options).onComplete(ar -> {
if (ar.succeeded()) {
WebSocket ws = ar.result();