From 1e2dc281e3ad2f7064eb0a857e2a0bc9ece9733a Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 1 Feb 2026 20:56:56 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E3=80=90?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=B9=E9=80=A0=E3=80=91mqtt=20=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E6=94=B9=E9=80=A0=EF=BC=8820%=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 42 -- .../gateway/config/IotGatewayProperties.java | 90 +-- .../gateway/protocol/IotProtocolManager.java | 13 + .../gateway/protocol/mqtt/IotMqttConfig.java | 44 ++ .../protocol/mqtt/IotMqttProtocol.java | 317 +++++++++++ .../mqtt/IotMqttUpstreamProtocol.java | 117 ---- .../downstream}/IotMqttDownstreamHandler.java | 23 +- .../IotMqttDownstreamSubscriber.java | 7 +- .../upstream/IotMqttAbstractHandler.java | 105 ++++ .../upstream/IotMqttConnectionHandler.java | 178 ++++++ .../upstream/IotMqttRegisterHandler.java | 182 +++++++ .../upstream/IotMqttUpstreamHandler.java | 81 +++ .../manager/IotMqttConnectionManager.java | 4 +- .../mqtt/router/IotMqttUpstreamHandler.java | 511 ------------------ .../downstream/IotTcpDownstreamHandler.java | 4 +- .../src/main/resources/application.yaml | 30 +- 16 files changed, 964 insertions(+), 784 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/{router => handler/downstream}/IotMqttDownstreamHandler.java (89%) rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/{ => handler/downstream}/IotMqttDownstreamSubscriber.java (82%) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index e9800c34e4..2115f76c02 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -5,12 +5,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; @@ -67,41 +62,4 @@ public class IotGatewayConfiguration { } } - /** - * IoT 网关 MQTT 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt", name = "enabled", havingValue = "true") - @Slf4j - public static class MqttProtocolConfiguration { - - @Bean(name = "mqttVertx", destroyMethod = "close") - public Vertx mqttVertx() { - return Vertx.vertx(); - } - - @Bean - public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceMessageService messageService, - IotMqttConnectionManager connectionManager, - @Qualifier("mqttVertx") Vertx mqttVertx) { - return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService, - connectionManager, mqttVertx); - } - - @Bean - public IotMqttDownstreamHandler iotMqttDownstreamHandler(IotDeviceMessageService messageService, - IotMqttConnectionManager connectionManager) { - return new IotMqttDownstreamHandler(messageService, connectionManager); - } - - @Bean - public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol, - IotMqttDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, downstreamHandler, messageBus); - } - - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index ce1616132b..8cb6595b55 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -1,13 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.config; -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketConfig; -import io.vertx.core.net.KeyCertOptions; -import io.vertx.core.net.TrustOptions; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -87,11 +85,6 @@ public class IotGatewayProperties { */ private EmqxProperties emqx; - /** - * MQTT 组件配置 - */ - private MqttProperties mqtt; - } @Data @@ -263,79 +256,6 @@ public class IotGatewayProperties { } - @Data - public static class MqttProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务器端口 - */ - private Integer port = 1883; - - /** - * 最大消息大小(字节) - */ - private Integer maxMessageSize = 8192; - - /** - * 连接超时时间(秒) - */ - private Integer connectTimeoutSeconds = 60; - /** - * 保持连接超时时间(秒) - */ - private Integer keepAliveTimeoutSeconds = 300; - - // NOTE:SSL 相关参数后续统一到 protocol 层级(优先级低) - /** - * 是否启用 SSL - */ - private Boolean sslEnabled = false; - /** - * SSL 配置 - */ - private SslOptions sslOptions = new SslOptions(); - - /** - * SSL 配置选项 - */ - @Data - public static class SslOptions { - - /** - * 密钥证书选项 - */ - private KeyCertOptions keyCertOptions; - /** - * 信任选项 - */ - private TrustOptions trustOptions; - /** - * SSL 证书路径 - */ - private String certPath; - /** - * SSL 私钥路径 - */ - private String keyPath; - /** - * 信任存储路径 - */ - private String trustStorePath; - /** - * 信任存储密码 - */ - private String trustStorePassword; - - } - - } - // NOTE:暂未统一为 ProtocolProperties,待协议改造完成再调整 /** * 协议实例配置 @@ -376,6 +296,8 @@ public class IotGatewayProperties { */ private String serialize; + // ========== 各协议配置 ========== + /** * HTTP 协议配置 */ @@ -406,6 +328,12 @@ public class IotGatewayProperties { @Valid private IotWebSocketConfig websocket; + /** + * MQTT 协议配置 + */ + @Valid + private IotMqttConfig mqtt; + } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index 45b6789041..47b41a3e28 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol; @@ -106,6 +107,8 @@ public class IotProtocolManager implements SmartLifecycle { return createCoapProtocol(config); case WEBSOCKET: return createWebSocketProtocol(config); + case MQTT: + return createMqttProtocol(config); default: throw new IllegalArgumentException(String.format( "[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType)); @@ -162,4 +165,14 @@ public class IotProtocolManager implements SmartLifecycle { return new IotWebSocketProtocol(config); } + /** + * 创建 MQTT 协议实例 + * + * @param config 协议实例配置 + * @return MQTT 协议实例 + */ + private IotMqttProtocol createMqttProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotMqttProtocol(config); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java new file mode 100644 index 0000000000..5fb7f779fe --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java @@ -0,0 +1,44 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import lombok.Data; + +// TODO @AI:validator 参数校验。也看看其他几个配置类有没有类似问题 +/** + * IoT 网关 MQTT 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotMqttConfig { + + /** + * 是否启用 SSL + */ + private Boolean sslEnabled = false; + + /** + * SSL 证书路径 + */ + private String sslCertPath; + + /** + * SSL 私钥路径 + */ + private String sslKeyPath; + + /** + * 最大消息大小(字节) + */ + private Integer maxMessageSize = 8192; + + /** + * 连接超时时间(秒) + */ + private Integer connectTimeoutSeconds = 60; + + /** + * 保持连接超时时间(秒) + */ + private Integer keepAliveTimeoutSeconds = 300; + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java new file mode 100644 index 0000000000..a8d8cb28d9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java @@ -0,0 +1,317 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; + +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttConnectionHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttRegisterHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.net.PemKeyCertOptions; +import io.vertx.mqtt.MqttEndpoint; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.MqttServerOptions; +import io.vertx.mqtt.MqttTopicSubscription; +import io.vertx.mqtt.messages.MqttPublishMessage; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * IoT 网关 MQTT 协议:接收设备上行消息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolInstanceProperties properties; + /** + * 服务器 ID(用于消息追踪,全局唯一) + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + @Getter + private volatile boolean running = false; + + /** + * Vert.x 实例 + */ + private Vertx vertx; + /** + * MQTT 服务器 + */ + private MqttServer mqttServer; + /** + * 连接管理器 + */ + private IotMqttConnectionManager connectionManager; + + /** + * 下行消息订阅者 + */ + private IotMqttDownstreamSubscriber downstreamSubscriber; + + // TODO @AI:这个是不是提前创建下?因为是无状态的。 + private IotMqttConnectionHandler connectionHandler; + private IotMqttRegisterHandler registerHandler; + private IotMqttUpstreamHandler upstreamHandler; + + public IotMqttProtocol(ProtocolInstanceProperties properties) { + this.properties = properties; + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + + // TODO @AI:初始化连接器,参考 IotTcpProtocol + + // TODO @AI:初始化下行消息订阅者,参考 IotTcpProtocol + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.MQTT; + } + + // TODO @AI:这个方法的整体注释风格,参考 IotTcpProtocol 的 start 方法。 + @Override + public void start() { + if (running) { + log.warn("[start][IoT MQTT 协议 {} 已经在运行中]", getId()); + return; + } + + // 1.1 创建 Vertx 实例(每个 Protocol 独立管理) + this.vertx = Vertx.vertx(); + + // 1.2 创建连接管理器 + this.connectionManager = new IotMqttConnectionManager(); + + // 1.3 初始化 Handler + initHandlers(); + + // 2. 创建服务器选项 + IotMqttConfig mqttConfig = properties.getMqtt(); + // TODO @AI:default 值,在 IotMqttConfig 处理; + MqttServerOptions options = new MqttServerOptions() + .setPort(properties.getPort()) + .setMaxMessageSize(mqttConfig != null ? mqttConfig.getMaxMessageSize() : 8192) + .setTimeoutOnConnect(mqttConfig != null ? mqttConfig.getConnectTimeoutSeconds() : 60); + + // 3. 配置 SSL(如果启用) + if (mqttConfig != null && Boolean.TRUE.equals(mqttConfig.getSslEnabled())) { + PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions() + .setKeyPath(mqttConfig.getSslKeyPath()) + .setCertPath(mqttConfig.getSslCertPath()); + options.setSsl(true).setKeyCertOptions(pemKeyCertOptions); + } + + // 4. 创建服务器并设置连接处理器 + mqttServer = MqttServer.create(vertx, options); + mqttServer.endpointHandler(this::handleEndpoint); + + // 5. 启动服务器 + try { + mqttServer.listen().result(); + running = true; + log.info("[start][IoT MQTT 协议 {} 启动成功,端口:{},serverId:{}]", + getId(), properties.getPort(), serverId); + + // 6. 启动下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler( + SpringUtil.getBean(IotDeviceMessageService.class), connectionManager); + this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus); + this.downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e); + // 启动失败时关闭 Vertx + if (vertx != null) { + vertx.close(); + vertx = null; + } + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + // 1. 停止下行消息订阅者 + if (downstreamSubscriber != null) { + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + downstreamSubscriber = null; + } + + // 2.1 关闭 MQTT 服务器 + if (mqttServer != null) { + try { + mqttServer.close().result(); + log.info("[stop][IoT MQTT 协议 {} 服务器已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT MQTT 协议 {} 服务器停止失败]", getId(), e); + } + mqttServer = null; + } + // 2.2 关闭 Vertx 实例 + if (vertx != null) { + try { + vertx.close().result(); + log.info("[stop][IoT MQTT 协议 {} Vertx 已关闭]", getId()); + } catch (Exception e) { + log.error("[stop][IoT MQTT 协议 {} Vertx 关闭失败]", getId(), e); + } + vertx = null; + } + running = false; + log.info("[stop][IoT MQTT 协议 {} 已停止]", getId()); + } + + /** + * 初始化 Handler + */ + private void initHandlers() { + IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); + IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.connectionHandler = new IotMqttConnectionHandler(connectionManager, messageService, deviceApi, serverId); + this.registerHandler = new IotMqttRegisterHandler(connectionManager, messageService, deviceApi); + this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, messageService, serverId); + } + + /** + * 处理 MQTT 连接端点 + * + * @param endpoint MQTT 连接端点 + */ + private void handleEndpoint(MqttEndpoint endpoint) { + String clientId = endpoint.clientIdentifier(); + + // 1. 委托 connectionHandler 处理连接认证 + // TODO @AI:register topic 不需要注册,需要判断下; + if (!connectionHandler.handleConnect(endpoint)) { + endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); + return; + } + + // 2.1 设置异常和关闭处理器 + endpoint.exceptionHandler(ex -> { + log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {}]", + clientId, connectionManager.getEndpointAddress(endpoint)); + // TODO @AI:是不是改成 endpoint close 更合适? + connectionHandler.cleanupConnection(endpoint); + }); + endpoint.closeHandler(v -> connectionHandler.cleanupConnection(endpoint)); + endpoint.disconnectHandler(v -> { + log.debug("[handleEndpoint][设备断开连接,客户端 ID: {}]", clientId); + connectionHandler.cleanupConnection(endpoint); + }); + // 2.2 设置心跳处理器 + endpoint.pingHandler(v -> log.debug("[handleEndpoint][收到客户端心跳,客户端 ID: {}]", clientId)); + + // 3.1 设置消息处理器 + endpoint.publishHandler(message -> processMessage(endpoint, message)); + // 3.2 设置 QoS 2 消息的 PUBREL 处理器 + endpoint.publishReleaseHandler(endpoint::publishComplete); + + // 4.1 设置订阅处理器 + endpoint.subscribeHandler(subscribe -> { + // TODO @AI:convertList 简化; + List topicNames = subscribe.topicSubscriptions().stream() + .map(MqttTopicSubscription::topicName) + .collect(Collectors.toList()); + log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames); + + // TODO @AI:convertList 简化; + List grantedQoSLevels = subscribe.topicSubscriptions().stream() + .map(MqttTopicSubscription::qualityOfService) + .collect(Collectors.toList()); + endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); + }); + // 4.2 设置取消订阅处理器 + endpoint.unsubscribeHandler(unsubscribe -> { + log.debug("[handleEndpoint][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics()); + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }); + + // 5. 接受连接 + endpoint.accept(false); + } + + /** + * 处理消息(发布) + * + * @param endpoint MQTT 连接端点 + * @param message 发布消息 + */ + // TODO @AI:看看要不要一定程度,参考 IotTcpUpstreamHandler 的 processMessage 方法; + private void processMessage(MqttEndpoint endpoint, MqttPublishMessage message) { + String clientId = endpoint.clientIdentifier(); + try { + String topic = message.topicName(); + byte[] payload = message.payload().getBytes(); + + // 根据 topic 分发到不同 handler + if (registerHandler.isRegisterMessage(topic)) { + registerHandler.handleRegister(endpoint, topic, payload); + } else { + upstreamHandler.handleMessage(endpoint, topic, payload); + } + + // 根据 QoS 级别发送相应的确认消息 + handleQoSAck(endpoint, message); + } catch (Exception e) { + // TODO @AI:异常的时候,直接断开; + log.error("[handlePublish][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage()); + connectionHandler.cleanupConnection(endpoint); + endpoint.close(); + } + } + + /** + * 处理 QoS 确认 + * + * @param endpoint MQTT 连接端点 + * @param message 发布消息 + */ + private void handleQoSAck(MqttEndpoint endpoint, MqttPublishMessage message) { + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + // QoS 1: 发送 PUBACK 确认 + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + // QoS 2: 发送 PUBREC 确认 + endpoint.publishReceived(message.messageId()); + } + // QoS 0 无需确认 + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java deleted file mode 100644 index 46fbc7c3fa..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttUpstreamProtocol.java +++ /dev/null @@ -1,117 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; - -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import io.vertx.core.Vertx; -import io.vertx.mqtt.MqttServer; -import io.vertx.mqtt.MqttServerOptions; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * IoT 网关 MQTT 协议:接收设备上行消息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttUpstreamProtocol implements IotProtocol { - - private static final String ID = "mqtt"; - - private volatile boolean running = false; - - private final IotGatewayProperties.MqttProperties mqttProperties; - - private final IotDeviceMessageService messageService; - - private final IotMqttConnectionManager connectionManager; - - private final Vertx vertx; - - @Getter - private final String serverId; - - private MqttServer mqttServer; - - public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties, - IotDeviceMessageService messageService, - IotMqttConnectionManager connectionManager, - Vertx vertx) { - this.mqttProperties = mqttProperties; - this.messageService = messageService; - this.connectionManager = connectionManager; - this.vertx = vertx; - this.serverId = IotDeviceMessageUtils.generateServerId(mqttProperties.getPort()); - } - - @Override - public String getId() { - return ID; - } - - @Override - public IotProtocolTypeEnum getType() { - return IotProtocolTypeEnum.MQTT; - } - - @Override - public boolean isRunning() { - return running; - } - - // TODO @haohao:这里的编写,是不是和 tcp 对应的,风格保持一致哈; - @Override - @PostConstruct - public void start() { - // 创建服务器选项 - MqttServerOptions options = new MqttServerOptions() - .setPort(mqttProperties.getPort()) - .setMaxMessageSize(mqttProperties.getMaxMessageSize()) - .setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds()); - - // 配置 SSL(如果启用) - if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) { - options.setSsl(true) - .setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions()) - .setTrustOptions(mqttProperties.getSslOptions().getTrustOptions()); - } - - // 创建服务器并设置连接处理器 - mqttServer = MqttServer.create(vertx, options); - mqttServer.endpointHandler(endpoint -> { - IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, connectionManager); - handler.handle(endpoint); - }); - - // 启动服务器 - try { - mqttServer.listen().result(); - running = true; - log.info("[start][IoT 网关 MQTT 协议启动成功,端口:{}]", mqttProperties.getPort()); - } catch (Exception e) { - log.error("[start][IoT 网关 MQTT 协议启动失败]", e); - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (mqttServer != null) { - try { - mqttServer.close().result(); - running = false; - log.info("[stop][IoT 网关 MQTT 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 MQTT 协议停止失败]", e); - } - } - } -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java similarity index 89% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java index c848833f66..69b363e5d0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -7,31 +7,22 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; /** * IoT 网关 MQTT 协议:下行消息处理器 - *

- * 专门处理下行消息的业务逻辑,包括: - * 1. 消息编码 - * 2. 主题构建 - * 3. 消息发送 * * @author 芋道源码 */ @Slf4j +@RequiredArgsConstructor public class IotMqttDownstreamHandler { private final IotDeviceMessageService deviceMessageService; private final IotMqttConnectionManager connectionManager; - public IotMqttDownstreamHandler(IotDeviceMessageService deviceMessageService, - IotMqttConnectionManager connectionManager) { - this.deviceMessageService = deviceMessageService; - this.connectionManager = connectionManager; - } - /** * 处理下行消息 * @@ -40,6 +31,7 @@ public class IotMqttDownstreamHandler { */ public boolean handleDownstreamMessage(IotDeviceMessage message) { try { + // TODO @AI:参考 IotTcpDownstreamHandler 逻辑; // 1. 基础校验 if (message == null || message.getDeviceId() == null) { log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]"); @@ -47,6 +39,7 @@ public class IotMqttDownstreamHandler { } // 2. 检查设备是否在线 + // TODO @AI:这块逻辑,是不是冗余?直接使用 3. 获取连接信息判断不就行了? if (connectionManager.isDeviceOffline(message.getDeviceId())) { log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID:{}]", message.getDeviceId()); return false; @@ -59,7 +52,7 @@ public class IotMqttDownstreamHandler { return false; } - // 4. 编码消息 + // 4. 序列化 byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(), connectionInfo.getDeviceName()); if (payload == null || payload.length == 0) { @@ -68,6 +61,7 @@ public class IotMqttDownstreamHandler { } // 5. 发送消息到设备 + // TODO @AI:参考 IotTcpDownstreamHandler 的逻辑; return sendMessageToDevice(message, connectionInfo, payload); } catch (Exception e) { if (message != null) { @@ -78,6 +72,7 @@ public class IotMqttDownstreamHandler { } } + // TODO @AI 是不是合并到 handleDownstreamMessage 里; /** * 发送消息到设备 * @@ -91,6 +86,7 @@ public class IotMqttDownstreamHandler { byte[] payload) { // 1. 构建主题 String topic = buildDownstreamTopic(message, connectionInfo); + // TODO @AI:直接断言,非空! if (StrUtil.isBlank(topic)) { log.warn("[sendMessageToDevice][主题构建失败,设备 ID:{},方法:{}]", message.getDeviceId(), message.getMethod()); @@ -118,6 +114,7 @@ public class IotMqttDownstreamHandler { */ private String buildDownstreamTopic(IotDeviceMessage message, IotMqttConnectionManager.ConnectionInfo connectionInfo) { + // TODO @AI:直接断言,非空! String method = message.getMethod(); if (StrUtil.isBlank(method)) { return null; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java similarity index 82% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java index fe9b600b99..f40cac1929 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java @@ -1,9 +1,9 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import lombok.extern.slf4j.Slf4j; /** @@ -18,7 +18,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber private final IotMqttDownstreamHandler downstreamHandler; - public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol, + public IotMqttDownstreamSubscriber(IotMqttProtocol protocol, IotMqttDownstreamHandler downstreamHandler, IotMessageBus messageBus) { super(protocol, messageBus); @@ -27,6 +27,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber @Override protected void handleMessage(IotDeviceMessage message) { + // TODO @AI:参考 IotTcpDownstreamHandler 不处理返回值,甚至不用返回值; boolean success = downstreamHandler.handleDownstreamMessage(message); if (success) { log.debug("[handleMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]", diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java new file mode 100644 index 0000000000..b91a151a02 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java @@ -0,0 +1,105 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; + +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttEndpoint; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 协议的处理器抽象基类 + *

+ * 提供通用的连接校验、响应发送等功能 + * + * @author 芋道源码 + */ +@Slf4j +@RequiredArgsConstructor +public abstract class IotMqttAbstractHandler { + + // TODO @AI:不使用 codec,使用 serializer 来进行编解码; + /** + * 默认编解码类型(MQTT 使用 Alink 协议) + */ + protected static final String DEFAULT_CODEC_TYPE = "Alink"; + + protected final IotMqttConnectionManager connectionManager; + protected final IotDeviceMessageService deviceMessageService; + + // TODO @AI:这个是否需要???!!! + /** + * 是否需要连接已认证(默认 true) + *

+ * 仅 IotMqttConnectionHandler 覆盖为 false + * + * @return 是否需要连接已认证 + */ + protected boolean requiresAuthenticated() { + return true; + } + + // TODO @AI:不确定,是不是基于 method 就可以算出来 reply topic ???!!! + // TODO @AI:需要传递 seriabler 序列对象,不是通过 deviceMessageService.encodeDeviceMessage 获取到合适的; + /** + * 发送成功响应到设备 + * + * @param endpoint MQTT 连接端点 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param requestId 请求 ID + * @param method 方法名 + * @param data 响应数据 + */ + protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName, + String requestId, String method, Object data) { + try { + // 1. 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null); + + // 2. 编码消息(使用默认编解码器) + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); + + // 3. 构建响应主题并发送 + String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true); + endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false); + log.debug("[sendSuccessResponse][发送成功响应,主题: {}]", replyTopic); + } catch (Exception e) { + log.error("[sendSuccessResponse][发送成功响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e); + } + } + + /** + * 发送错误响应到设备 + * + * @param endpoint MQTT 连接端点 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param requestId 请求 ID + * @param method 方法名 + * @param errorMessage 错误消息 + */ + protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName, + String requestId, String method, String errorMessage) { + try { + // 1. 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, 400, errorMessage); + + // 2. 编码消息(使用默认编解码器) + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); + + // 3. 构建响应主题并发送 + String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true); + endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false); + log.debug("[sendErrorResponse][发送错误响应,主题: {}]", replyTopic); + } catch (Exception e) { + log.error("[sendErrorResponse][发送错误响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e); + } + } + + // TODO @AI:搞个基础的 writeResponse 会不会更好?参考 IotTcpUpstreamHandler 里; + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java new file mode 100644 index 0000000000..2cd7f23ddf --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttConnectionHandler.java @@ -0,0 +1,178 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; + +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.mqtt.MqttEndpoint; +import lombok.extern.slf4j.Slf4j; + +// TODO @AI:改成 auth 这样的命名,更好理解; +/** + * IoT 网关 MQTT 连接处理器 + *

+ * 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知 + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttConnectionHandler extends IotMqttAbstractHandler { + + // TODO @AI:通过 springutil 去获取! + private final IotDeviceCommonApi deviceApi; + private final String serverId; + + public IotMqttConnectionHandler(IotMqttConnectionManager connectionManager, + IotDeviceMessageService deviceMessageService, + IotDeviceCommonApi deviceApi, + String serverId) { + super(connectionManager, deviceMessageService); + this.deviceApi = deviceApi; + this.serverId = serverId; + } + + @Override + protected boolean requiresAuthenticated() { + return false; // 连接阶段不需要已认证 + } + + /** + * 处理 MQTT 连接请求 + * + * @param endpoint MQTT 连接端点 + * @return 认证是否成功 + */ + public boolean handleConnect(MqttEndpoint endpoint) { + // TODO @AI:整个 try catch 下; + // TODO @AI:是不是参考 IotTcpUpstreamHandler 的代码结构 + String clientId = endpoint.clientIdentifier(); + String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null; + String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; + log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", + clientId, username, connectionManager.getEndpointAddress(endpoint)); + + // 进行认证 + if (!authenticateDevice(clientId, username, password, endpoint)) { + log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); + return true; + } + + /** + * 在 MQTT 连接时进行设备认证 + * + * @param clientId 客户端 ID + * @param username 用户名 + * @param password 密码 + * @param endpoint MQTT 连接端点 + * @return 认证是否成功 + */ + private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) { + try { + // 1.1 解析认证参数 + // TODO @AI:断言,统一交给上层打印日志; + if (StrUtil.hasEmpty(clientId, username, password)) { + log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + // 1.2 构建认证参数 + IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() + .setClientId(clientId) + .setUsername(username) + .setPassword(password); + + // 2.1 执行认证 + CommonResult authResult = deviceApi.authDevice(authParams); + // TODO @AI:断言,统一交给上层打印日志; + if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { + log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]", + clientId, username, authResult.getMsg()); + return false; + } + // 2.2 获取设备信息 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + // 2.3 获取设备信息 + // TODO @AI:报错需要处理下; + IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + if (device == null) { + log.warn("[authenticateDevice][设备不存在,客户端 ID: {},用户名: {}]", clientId, username); + return false; + } + + // 3.1 注册连接 + registerConnection(endpoint, device, clientId); + // 3.2 发送设备上线消息 + sendOnlineMessage(device); + return true; + } catch (Exception e) { + log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e); + return false; + } + } + + /** + * 注册连接 + */ + private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { + IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() + .setDeviceId(device.getId()) + .setProductKey(device.getProductKey()) + .setDeviceName(device.getDeviceName()) + .setClientId(clientId) + .setAuthenticated(true) + .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); + connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); + } + + /** + * 发送设备上线消息 + */ + private void sendOnlineMessage(IotDeviceRespDTO device) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), + device.getDeviceName(), serverId); + log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName()); + } catch (Exception e) { + log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage()); + } + } + + /** + * 清理连接 + * + * @param endpoint MQTT 连接端点 + */ + public void cleanupConnection(MqttEndpoint endpoint) { + try { + // 1. 发送设备离线消息 + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); + if (connectionInfo != null) { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), serverId); + } + + // 2. 注销连接 + connectionManager.unregisterConnection(endpoint); + } catch (Exception e) { + log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", + endpoint.clientIdentifier(), e.getMessage()); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java new file mode 100644 index 0000000000..4a1ddc17a4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java @@ -0,0 +1,182 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.mqtt.MqttEndpoint; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * IoT 网关 MQTT 设备注册处理器 + *

+ * 处理设备动态注册消息(一型一密) + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttRegisterHandler extends IotMqttAbstractHandler { + + // TODO IotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来?IotMqttTopicUtils? + /** + * register 请求的 topic 后缀 + */ + public static final String REGISTER_TOPIC_SUFFIX = "/thing/auth/register"; + + private final IotDeviceCommonApi deviceApi; + + public IotMqttRegisterHandler(IotMqttConnectionManager connectionManager, + IotDeviceMessageService deviceMessageService, + IotDeviceCommonApi deviceApi) { + super(connectionManager, deviceMessageService); + // TODO @AI:通过 springutil 处理; + this.deviceApi = deviceApi; + } + + /** + * 判断是否为注册消息 + * + * @param topic 主题 + * @return 是否为注册消息 + */ + public boolean isRegisterMessage(String topic) { + // TODO @AI:是不是搞到 IotMqttTopicUtils 里? + return topic != null && topic.endsWith(REGISTER_TOPIC_SUFFIX); + } + + /** + * 处理注册消息 + * + * @param endpoint MQTT 连接端点 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleRegister(MqttEndpoint endpoint, String topic, byte[] payload) { + String clientId = endpoint.clientIdentifier(); + // 1.1 基础检查 + if (ArrayUtil.isEmpty(payload)) { + return; + } + // 1.2 解析主题,获取 productKey 和 deviceName + // TODO @AI:直接断言报错; + String[] topicParts = topic.split("/"); + if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + log.warn("[handleRegister][topic({}) 格式不正确]", topic); + return; + } + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + // TODO @AI:直接断言报错; + + // 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType) + // TODO @AI:使用默认的 json; + IotDeviceMessage message; + try { + message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE); + if (message == null) { + log.warn("[handleRegister][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + return; + } + } catch (Exception e) { + log.error("[handleRegister][消息解码异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage(), e); + return; + } + + // 3. 处理设备动态注册请求 + log.info("[handleRegister][收到设备注册消息,设备: {}.{}, 方法: {}]", + productKey, deviceName, message.getMethod()); + try { + processRegisterRequest(message, productKey, deviceName, endpoint); + } catch (Exception e) { + // TODO @AI:各种情况下的翻译; + log.error("[handleRegister][消息处理异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage(), e); + } + } + + /** + * 处理设备动态注册请求(一型一密,不需要 deviceSecret) + * + * @param message 消息信息 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param endpoint MQTT 连接端点 + * @see 阿里云 - 一型一密 + */ + private void processRegisterRequest(IotDeviceMessage message, String productKey, String deviceName, + MqttEndpoint endpoint) { + String clientId = endpoint.clientIdentifier(); + String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(); + try { + // 1. 解析注册参数 + IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams()); + if (params == null) { + log.warn("[processRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); + sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册参数不完整"); + return; + } + + // 2. 调用动态注册 API + CommonResult result = deviceApi.registerDevice(params); + if (result.isError()) { + log.warn("[processRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); + sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getMsg()); + return; + } + + // 3. 发送成功响应(包含 deviceSecret) + sendSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getData()); + log.info("[processRegisterRequest][注册成功,设备名: {},客户端 ID: {}]", + params.getDeviceName(), clientId); + } catch (Exception e) { + log.error("[processRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); + sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册处理异常"); + } + } + + // TODO @AI:解析可以简化,参考别的 tcp 对应的 + /** + * 解析注册参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 注册参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceRegisterReqDTO parseRegisterParams(Object params) { + if (params == null) { + return null; + } + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof Map) { + Map paramMap = (Map) params; + return new IotDeviceRegisterReqDTO() + .setProductKey(MapUtil.getStr(paramMap, "productKey")) + .setDeviceName(MapUtil.getStr(paramMap, "deviceName")) + .setProductSecret(MapUtil.getStr(paramMap, "productSecret")); + } + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceRegisterReqDTO) { + return (IotDeviceRegisterReqDTO) params; + } + + // 其他情况尝试 JSON 转换 + return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class); + } catch (Exception e) { + log.error("[parseRegisterParams][解析注册参数({})失败]", params, e); + return null; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java new file mode 100644 index 0000000000..cb78d0af87 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java @@ -0,0 +1,81 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.mqtt.MqttEndpoint; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 MQTT 上行消息处理器 + *

+ * 处理业务消息(属性上报、事件上报等) + * + * @author 芋道源码 + */ +@Slf4j +public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { + + private final String serverId; + + public IotMqttUpstreamHandler(IotMqttConnectionManager connectionManager, + IotDeviceMessageService deviceMessageService, + String serverId) { + super(connectionManager, deviceMessageService); + this.serverId = serverId; + } + + /** + * 处理业务消息 + * + * @param endpoint MQTT 连接端点 + * @param topic 主题 + * @param payload 消息内容 + */ + public void handleMessage(MqttEndpoint endpoint, String topic, byte[] payload) { + String clientId = endpoint.clientIdentifier(); + + // 1. 基础检查 + if (payload == null || payload.length == 0) { + return; + } + + // 2. 解析主题,获取 productKey 和 deviceName + String[] topicParts = topic.split("/"); + if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + log.warn("[handleMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); + return; + } + + // 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName) + String productKey = topicParts[2]; + String deviceName = topicParts[3]; + try { + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + if (message == null) { + log.warn("[handleMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); + return; + } + + // 4. 处理业务消息(认证已在连接时完成) + log.info("[handleMessage][收到设备消息,设备: {}.{}, 方法: {}]", + productKey, deviceName, message.getMethod()); + handleBusinessRequest(message, productKey, deviceName); + } catch (Exception e) { + // TODO @AI:各种情况下的翻译; + log.error("[handleMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]", + clientId, topic, e.getMessage(), e); + } + } + + /** + * 处理业务请求 + */ + private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) { + // 发送消息到消息总线 + message.setServerId(serverId); + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index 082a2ad797..4f432fed47 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -40,6 +40,7 @@ public class IotMqttConnectionManager { */ private final Map deviceEndpointMap = new ConcurrentHashMap<>(); + // TODO @AI:这里会存在返回 "unknown" 的情况么?是不是必须返回,否则还是异常更合理点? /** * 安全获取 endpoint 地址 *

@@ -66,7 +67,6 @@ public class IotMqttConnectionManager { } catch (Exception ignored) { // 连接已关闭,忽略异常 } - return realTimeAddress; } @@ -129,7 +129,6 @@ public class IotMqttConnectionManager { if (endpoint == null) { return null; } - // 获取连接信息 return getConnectionInfo(endpoint); } @@ -208,6 +207,7 @@ public class IotMqttConnectionManager { */ private String clientId; + // TODO @AI:是不是要去掉!感觉没用啊; /** * 是否已认证 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java deleted file mode 100644 index d40dba447c..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamHandler.java +++ /dev/null @@ -1,511 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; -import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.mqtt.MqttEndpoint; -import io.vertx.mqtt.MqttTopicSubscription; -import lombok.extern.slf4j.Slf4j; - -import java.util.List; -import java.util.Map; - -/** - * MQTT 上行消息处理器 - * - * @author 芋道源码 - */ -@Slf4j -public class IotMqttUpstreamHandler { - - /** - * 默认编解码类型(MQTT 使用 Alink 协议) - */ - private static final String DEFAULT_CODEC_TYPE = "Alink"; - - /** - * register 请求的 topic 后缀 - */ - private static final String REGISTER_TOPIC_SUFFIX = "/thing/auth/register"; - - private final IotDeviceMessageService deviceMessageService; - - private final IotMqttConnectionManager connectionManager; - - private final IotDeviceCommonApi deviceApi; - - private final String serverId; - - public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol, - IotDeviceMessageService deviceMessageService, - IotMqttConnectionManager connectionManager) { - this.deviceMessageService = deviceMessageService; - this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.connectionManager = connectionManager; - this.serverId = protocol.getServerId(); - } - - /** - * 处理 MQTT 连接 - * - * @param endpoint MQTT 连接端点 - */ - public void handle(MqttEndpoint endpoint) { - String clientId = endpoint.clientIdentifier(); - String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null; - String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null; - - log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]", - clientId, username, connectionManager.getEndpointAddress(endpoint)); - - // 1. 先进行认证 - if (!authenticateDevice(clientId, username, password, endpoint)) { - log.warn("[handle][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username); - endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); - return; - } - - log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username); - - // 2. 设置心跳处理器(监听客户端的 PINGREQ 消息) - endpoint.pingHandler(v -> { - log.debug("[handle][收到客户端心跳,客户端 ID: {}]", clientId); - // Vert.x 会自动发送 PINGRESP 响应,无需手动处理 - }); - - // 3. 设置异常和关闭处理器 - endpoint.exceptionHandler(ex -> { - log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, connectionManager.getEndpointAddress(endpoint)); - cleanupConnection(endpoint); - }); - endpoint.closeHandler(v -> { - cleanupConnection(endpoint); - }); - - // 4. 设置消息处理器 - endpoint.publishHandler(mqttMessage -> { - try { - // 4.1 根据 topic 判断是否为 register 请求 - String topic = mqttMessage.topicName(); - byte[] payload = mqttMessage.payload().getBytes(); - if (topic.endsWith(REGISTER_TOPIC_SUFFIX)) { - // register 请求:使用默认编解码器处理(设备可能未注册) - processRegisterMessage(clientId, topic, payload, endpoint); - } else { - // 业务请求:正常处理 - processMessage(clientId, topic, payload); - } - - // 4.2 根据 QoS 级别发送相应的确认消息 - if (mqttMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE) { - // QoS 1: 发送 PUBACK 确认 - endpoint.publishAcknowledge(mqttMessage.messageId()); - } else if (mqttMessage.qosLevel() == MqttQoS.EXACTLY_ONCE) { - // QoS 2: 发送 PUBREC 确认 - endpoint.publishReceived(mqttMessage.messageId()); - } - // QoS 0 无需确认 - } catch (Exception e) { - log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", - clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage()); - cleanupConnection(endpoint); - endpoint.close(); - } - }); - - // 5. 设置订阅处理器 - endpoint.subscribeHandler(subscribe -> { - // 提取主题名称列表用于日志显示 - List topicNames = subscribe.topicSubscriptions().stream() - .map(MqttTopicSubscription::topicName) - .collect(java.util.stream.Collectors.toList()); - log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames); - - // 提取 QoS 列表 - List grantedQoSLevels = subscribe.topicSubscriptions().stream() - .map(MqttTopicSubscription::qualityOfService) - .collect(java.util.stream.Collectors.toList()); - endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); - }); - - // 6. 设置取消订阅处理器 - endpoint.unsubscribeHandler(unsubscribe -> { - log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics()); - endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); - }); - - // 7. 设置 QoS 2消息的 PUBREL 处理器 - endpoint.publishReleaseHandler(endpoint::publishComplete); - - // 8. 设置断开连接处理器 - endpoint.disconnectHandler(v -> { - log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId); - cleanupConnection(endpoint); - }); - - // 9. 接受连接 - endpoint.accept(false); - } - - /** - * 处理消息 - * - * @param clientId 客户端 ID - * @param topic 主题 - * @param payload 消息内容 - */ - private void processMessage(String clientId, String topic, byte[] payload) { - // 1. 基础检查 - if (payload == null || payload.length == 0) { - return; - } - - // 2. 解析主题,获取 productKey 和 deviceName - String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { - log.warn("[processMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); - return; - } - - // 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName) - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - try { - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); - if (message == null) { - log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); - return; - } - - // 4. 处理业务消息(认证已在连接时完成) - log.info("[processMessage][收到设备消息,设备: {}.{}, 方法: {}]", - productKey, deviceName, message.getMethod()); - handleBusinessRequest(message, productKey, deviceName); - } catch (Exception e) { - log.error("[processMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]", - clientId, topic, e.getMessage(), e); - } - } - - /** - * 在 MQTT 连接时进行设备认证 - * - * @param clientId 客户端 ID - * @param username 用户名 - * @param password 密码 - * @param endpoint MQTT 连接端点 - * @return 认证是否成功 - */ - private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) { - try { - // 1. 参数校验 - if (StrUtil.hasEmpty(clientId, username, password)) { - log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - - // 2. 构建认证参数 - IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO() - .setClientId(clientId) - .setUsername(username) - .setPassword(password); - - // 3. 调用设备认证 API - CommonResult authResult = deviceApi.authDevice(authParams); - if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) { - log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]", - clientId, username, authResult.getMsg()); - return false; - } - - // 4. 获取设备信息 - IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); - if (deviceInfo == null) { - log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username); - return false; - } - - IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO() - .setProductKey(deviceInfo.getProductKey()) - .setDeviceName(deviceInfo.getDeviceName()); - - CommonResult deviceResult = deviceApi.getDevice(getReqDTO); - if (!deviceResult.isSuccess() || deviceResult.getData() == null) { - log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]", - clientId, username, deviceResult.getMsg()); - return false; - } - - // 5. 注册连接 - IotDeviceRespDTO device = deviceResult.getData(); - registerConnection(endpoint, device, clientId); - - // 6. 发送设备上线消息 - sendOnlineMessage(device); - - return true; - } catch (Exception e) { - log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e); - return false; - } - } - - /** - * 处理 register 消息(设备动态注册,使用默认编解码器) - * - * @param clientId 客户端 ID - * @param topic 主题 - * @param payload 消息内容 - * @param endpoint MQTT 连接端点 - */ - private void processRegisterMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) { - // 1.1 基础检查 - if (ArrayUtil.isEmpty(payload)) { - return; - } - // 1.2 解析主题,获取 productKey 和 deviceName - String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { - log.warn("[processRegisterMessage][topic({}) 格式不正确]", topic); - return; - } - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - - // 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType) - IotDeviceMessage message; - try { - message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE); - if (message == null) { - log.warn("[processRegisterMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); - return; - } - } catch (Exception e) { - log.error("[processRegisterMessage][消息解码异常,客户端 ID: {},主题: {},错误: {}]", - clientId, topic, e.getMessage(), e); - return; - } - - // 3. 处理设备动态注册请求 - log.info("[processRegisterMessage][收到设备注册消息,设备: {}.{}, 方法: {}]", - productKey, deviceName, message.getMethod()); - try { - handleRegisterRequest(message, productKey, deviceName, endpoint); - } catch (Exception e) { - log.error("[processRegisterMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]", - clientId, topic, e.getMessage(), e); - } - } - - /** - * 处理设备动态注册请求(一型一密,不需要 deviceSecret) - * - * @param message 消息信息 - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param endpoint MQTT 连接端点 - * @see 阿里云 - 一型一密 - */ - private void handleRegisterRequest(IotDeviceMessage message, String productKey, String deviceName, MqttEndpoint endpoint) { - String clientId = endpoint.clientIdentifier(); - try { - // 1. 解析注册参数 - IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams()); - if (params == null) { - log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId); - sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册参数不完整"); - return; - } - - // 2. 调用动态注册 API - CommonResult result = deviceApi.registerDevice(params); - if (result.isError()) { - log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); - sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getMsg()); - return; - } - - // 3. 发送成功响应(包含 deviceSecret) - sendRegisterSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getData()); - log.info("[handleRegisterRequest][注册成功,设备名: {},客户端 ID: {}]", - params.getDeviceName(), clientId); - } catch (Exception e) { - log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e); - sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册处理异常"); - } - } - - /** - * 解析注册参数 - * - * @param params 参数对象(通常为 Map 类型) - * @return 注册参数 DTO,解析失败时返回 null - */ - @SuppressWarnings({"unchecked", "DuplicatedCode"}) - private IotDeviceRegisterReqDTO parseRegisterParams(Object params) { - if (params == null) { - return null; - } - try { - // 参数默认为 Map 类型,直接转换 - if (params instanceof Map) { - Map paramMap = (Map) params; - return new IotDeviceRegisterReqDTO() - .setProductKey(MapUtil.getStr(paramMap, "productKey")) - .setDeviceName(MapUtil.getStr(paramMap, "deviceName")) - .setProductSecret(MapUtil.getStr(paramMap, "productSecret")); - } - // 如果已经是目标类型,直接返回 - if (params instanceof IotDeviceRegisterReqDTO) { - return (IotDeviceRegisterReqDTO) params; - } - - // 其他情况尝试 JSON 转换 - return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class); - } catch (Exception e) { - log.error("[parseRegisterParams][解析注册参数({})失败]", params, e); - return null; - } - } - - /** - * 发送注册成功响应(包含 deviceSecret) - * - * @param endpoint MQTT 连接端点 - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param requestId 请求 ID - * @param registerResp 注册响应 - */ - private void sendRegisterSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName, - String requestId, IotDeviceRegisterRespDTO registerResp) { - try { - // 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO) - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null); - - // 2. 编码消息(使用默认编解码器,因为设备可能还未注册) - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); - - // 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply) - String replyTopic = IotMqttTopicUtils.buildTopicByMethod( - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true); - endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData), - MqttQoS.AT_LEAST_ONCE, false, false); - log.debug("[sendRegisterSuccessResponse][发送注册成功响应,主题: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendRegisterSuccessResponse][发送注册成功响应异常,客户端 ID: {}]", - endpoint.clientIdentifier(), e); - } - } - - /** - * 发送注册错误响应 - * - * @param endpoint MQTT 连接端点 - * @param productKey 产品 Key - * @param deviceName 设备名称 - * @param requestId 请求 ID - * @param errorMessage 错误消息 - */ - private void sendRegisterErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName, - String requestId, String errorMessage) { - try { - // 1. 构建响应消息 - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), null, 400, errorMessage); - - // 2. 编码消息(使用默认编解码器,因为设备可能还未注册) - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); - - // 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply) - String replyTopic = IotMqttTopicUtils.buildTopicByMethod( - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true); - endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData), - MqttQoS.AT_LEAST_ONCE, false, false); - log.debug("[sendRegisterErrorResponse][发送注册错误响应,主题: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendRegisterErrorResponse][发送注册错误响应异常,客户端 ID: {}]", - endpoint.clientIdentifier(), e); - } - } - - /** - * 处理业务请求 - */ - private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) { - // 发送消息到消息总线 - message.setServerId(serverId); - deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); - } - - /** - * 注册连接 - */ - private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) { - IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo() - .setDeviceId(device.getId()) - .setProductKey(device.getProductKey()) - .setDeviceName(device.getDeviceName()) - .setClientId(clientId) - .setAuthenticated(true) - .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); - connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); - } - - /** - * 发送设备上线消息 - */ - private void sendOnlineMessage(IotDeviceRespDTO device) { - try { - IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); - deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), - device.getDeviceName(), serverId); - log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName()); - } catch (Exception e) { - log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage()); - } - } - - /** - * 清理连接 - */ - private void cleanupConnection(MqttEndpoint endpoint) { - try { - IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); - if (connectionInfo != null) { - // 发送设备离线消息 - IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); - deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), - connectionInfo.getDeviceName(), serverId); - log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", connectionInfo.getDeviceId(), connectionInfo.getDeviceName()); - } - - // 注销连接 - connectionManager.unregisterConnection(endpoint); - } catch (Exception e) { - log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", endpoint.clientIdentifier(), e.getMessage()); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java index 986bfbe60d..b3ae6a0ca4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamHandler.java @@ -45,8 +45,8 @@ public class IotTcpDownstreamHandler { } // 2. 序列化 + 帧编码 - byte[] serializedData = serializer.serialize(message); - Buffer frameData = codec.encode(serializedData); + byte[] payload = serializer.serialize(message); + Buffer frameData = codec.encode(payload); // 3. 发送到设备 boolean success = connectionManager.sendToDevice(message.getDeviceId(), frameData.getBytes()); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index add4dce6a8..671a103fc8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -57,9 +57,9 @@ yudao: # 针对引入的 TCP 组件的配置 # ==================================== - id: tcp-json + enabled: false type: tcp port: 8091 - enabled: false serialize: json tcp: max-connections: 1000 @@ -79,9 +79,9 @@ yudao: # 针对引入的 UDP 组件的配置 # ==================================== - id: udp-json + enabled: false type: udp port: 8093 - enabled: false serialize: json udp: max-sessions: 1000 # 最大会话数 @@ -92,9 +92,9 @@ yudao: # 针对引入的 WebSocket 组件的配置 # ==================================== - id: websocket-json + enabled: true type: websocket port: 8094 - enabled: true serialize: json websocket: path: /ws @@ -106,13 +106,26 @@ yudao: # 针对引入的 CoAP 组件的配置 # ==================================== - id: coap-json + enabled: false type: coap port: 5683 - enabled: true coap: max-message-size: 1024 # 最大消息大小(字节) ack-timeout-ms: 2000 # ACK 超时时间(毫秒) max-retransmit: 4 # 最大重传次数 + # ==================================== + # 针对引入的 MQTT 组件的配置 + # ==================================== + - id: mqtt-json + enabled: true + type: mqtt + port: 1883 + serialize: json + mqtt: + max-message-size: 8192 # 最大消息大小(字节) + connect-timeout-seconds: 60 # 连接超时时间(秒) + keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒) + ssl-enabled: false # 是否启用 SSL # 协议配置(旧版,保持兼容) protocol: @@ -150,15 +163,6 @@ yudao: key-store-password: "your-keystore-password" # 客户端证书库密码 trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径 trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码 - # ==================================== - # 针对引入的 MQTT 组件的配置 - # ==================================== - mqtt: - enabled: false - port: 1883 - max-message-size: 8192 - connect-timeout-seconds: 60 - ssl-enabled: false --- #################### 日志相关配置 ####################