From cc0d786d0fd62e075da186d45c4e504a22f9a769 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Tue, 3 Feb 2026 22:29:06 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E3=80=90?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=B9=E9=80=A0=E3=80=91emqx=20=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E6=94=B9=E9=80=A0=EF=BC=8860%=EF=BC=89=EF=BC=9A?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20device=20register=20=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/emqx/IotEmqxProtocol.java | 13 ++- .../upstream/IotEmqxAuthEventHandler.java | 93 ++++++++++++++++++- .../src/main/resources/application.yaml | 6 +- ...rectDeviceMqttProtocolIntegrationTest.java | 15 +-- 4 files changed, 115 insertions(+), 12 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java index 5072159256..0d64186aa5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxProtocol.java @@ -199,7 +199,7 @@ public class IotEmqxProtocol implements IotProtocol { router.route().handler(BodyHandler.create().setBodyLimit(1024 * 1024)); // 限制 body 大小为 1MB,防止大包攻击 // 2. 创建处理器 - IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId); + IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId, this); router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth); router.post(IotMqttTopicUtils.MQTT_ACL_PATH).handler(handler::handleAcl); router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent); @@ -517,4 +517,15 @@ public class IotEmqxProtocol implements IotProtocol { .onFailure(e -> log.error("[publishMessage][IoT EMQX 协议 {} 发布失败, topic: {}]", getId(), topic, e)); } + /** + * 延迟发布消息到 MQTT Broker + * + * @param topic 主题 + * @param payload 消息内容 + * @param delayMs 延迟时间(毫秒) + */ + public void publishDelayMessage(String topic, byte[] payload, long delayMs) { + vertx.setTimer(delayMs, id -> publishMessage(topic, payload)); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java index 0ba250cb1a..53705aa64e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java @@ -1,14 +1,19 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream; +import cn.hutool.core.lang.Assert; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.vertx.core.json.JsonObject; @@ -24,6 +29,7 @@ import java.util.Locale; * 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 {@link #handleAuth(RoutingContext)} * 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 {@link #handleEvent(RoutingContext)} * 3. 设备 ACL 权限接口 - 对应 EMQX HTTP ACL 插件 {@link #handleAcl(RoutingContext)} + * 4. 设备注册接口 - 集成一型一密设备注册 {@link #handleDeviceRegister(RoutingContext, String, String)} * * @author 芋道源码 */ @@ -57,13 +63,21 @@ public class IotEmqxAuthEventHandler { */ private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected"; + /** + * 认证类型标识 - 设备注册 + */ + private static final String AUTH_TYPE_REGISTER = "|authType=register|"; + private final String serverId; + private final IotEmqxProtocol protocol; + private final IotDeviceMessageService deviceMessageService; private final IotDeviceCommonApi deviceApi; - public IotEmqxAuthEventHandler(String serverId) { + public IotEmqxAuthEventHandler(String serverId, IotEmqxProtocol protocol) { this.serverId = serverId; + this.protocol = protocol; this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); } @@ -91,7 +105,13 @@ public class IotEmqxAuthEventHandler { return; } - // 2. 执行认证 + // 2.1 情况一:判断是否为注册请求 + if (StrUtil.endWith(clientId, AUTH_TYPE_REGISTER)) { + handleDeviceRegister(context, username, password); + return; + } + + // 2.2 情况二:执行认证 boolean authResult = handleDeviceAuth(clientId, username, password); log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult); if (authResult) { @@ -380,4 +400,73 @@ public class IotEmqxAuthEventHandler { } } + // ========= 注册处理 ========= + + /** + * 处理设备注册请求(一型一密) + * + * @param context 路由上下文 + * @param username 用户名 + * @param password 密码(签名) + */ + private void handleDeviceRegister(RoutingContext context, String username, String password) { + try { + // 1. 解析设备信息 + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + if (deviceInfo == null) { + log.warn("[handleDeviceRegister][设备注册失败: 无法解析 username={}]", username); + sendAuthResponse(context, RESULT_DENY); + return; + } + + // 2. 调用注册 API + IotDeviceRegisterReqDTO params = new IotDeviceRegisterReqDTO() + .setProductKey(deviceInfo.getProductKey()) + .setDeviceName(deviceInfo.getDeviceName()) + .setSign(password); + CommonResult result = deviceApi.registerDevice(params); + result.checkError(); + + // 3. 允许连接 + log.info("[handleDeviceRegister][设备注册成功: {}]", username); + sendAuthResponse(context, RESULT_ALLOW); + + // 4. 延迟 5 秒发送注册结果(等待设备连接成功并完成订阅) + sendRegisterResultMessage(username, result.getData()); + } catch (Exception e) { + log.warn("[handleDeviceRegister][设备注册失败: {}, 错误: {}]", username, e.getMessage()); + sendAuthResponse(context, RESULT_DENY); + } + } + + /** + * 发送注册结果消息给设备 + *

+ * 注意:延迟 5 秒发送,等待设备连接成功并完成订阅。 + * + * @param username 用户名 + * @param result 注册结果 + */ + @SuppressWarnings("DataFlowIssue") + private void sendRegisterResultMessage(String username, IotDeviceRegisterRespDTO result) { + IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); + Assert.notNull(deviceInfo, "设备信息不能为空"); + try { + // 1. 构建响应消息 + String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(); + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(null, method, result, 0, null); + + // 2. 编码消息 + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, "Alink"); + + // 3. 构建响应主题并延迟发布(等待设备连接成功并完成订阅) + String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), true); + protocol.publishDelayMessage(replyTopic, encodedData, 5000); + log.info("[sendRegisterResultMessage][发送注册结果: topic={}]", replyTopic); + } catch (Exception e) { + log.error("[sendRegisterResultMessage][发送注册结果失败: {}]", username, e); + } + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index aaf19dc1c7..9c6b500d5b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -117,9 +117,9 @@ yudao: # 针对引入的 MQTT 组件的配置 # ==================================== - id: mqtt-json - enabled: false + enabled: true protocol: mqtt - port: 1884 + port: 1883 serialize: json mqtt: max-message-size: 8192 # 最大消息大小(字节) @@ -129,7 +129,7 @@ yudao: # 针对引入的 EMQX 组件的配置 # ==================================== - id: emqx-1 - enabled: true + enabled: false protocol: emqx port: 8090 # EMQX HTTP Hook 端口(/mqtt/auth、/mqtt/event) emqx: diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java index 415d15a4de..22ac321817 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java @@ -201,7 +201,12 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { MqttClient client = MqttClient.create(vertx, options); try { - // 2. 设置消息处理器,接收注册响应 + // 2. 连接服务器(连接成功后服务端会自动处理注册并发送响应) + client.connect(SERVER_PORT, SERVER_HOST) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[testDeviceRegister][连接成功,等待注册响应...]"); + + // 3.1 设置消息处理器,接收注册响应 CompletableFuture responseFuture = new CompletableFuture<>(); client.publishHandler(message -> { log.info("[testDeviceRegister][收到响应: topic={}, payload={}]", @@ -209,11 +214,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); responseFuture.complete(response); }); - - // 3. 连接服务器(连接成功后服务端会自动处理注册并发送响应) - client.connect(SERVER_PORT, SERVER_HOST) - .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); - log.info("[testDeviceRegister][连接成功,等待注册响应...]"); + // 3.2 订阅 _reply 主题 + String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply", PRODUCT_KEY, deviceName); + subscribe(client, replyTopic); // 4. 等待注册响应 IotDeviceMessage response = responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);