diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 2115f76c02..aa57281e04 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import io.vertx.core.Vertx; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java index ce10cf76d9..5d3b5e3c00 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxAuthEventProtocol.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxAuthEventHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxAuthEventHandler; import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java index 47b2f1646e..5ebaa1f01e 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxUpstreamProtocol.java @@ -6,7 +6,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream.IotEmqxUpstreamHandler; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java similarity index 97% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java index 06632b3e8f..a05fd1120a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java similarity index 61% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java index 4b5bad2d59..bcce471987 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java @@ -1,10 +1,11 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; /** @@ -22,6 +23,17 @@ public class IotEmqxDownstreamSubscriber extends IotProtocolDownstreamSubscriber this.downstreamHandler = new IotEmqxDownstreamHandler(protocol); } + @PostConstruct + public void startSubscriber() { + // EMQX 模式下,由 Spring 管理 Bean 生命周期;需要显式启动订阅者,才能从消息总线消费下行消息并发布到 Broker + start(); + } + + @PreDestroy + public void stopSubscriber() { + stop(); + } + @Override protected void handleMessage(IotDeviceMessage message) { downstreamHandler.handle(message); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java similarity index 99% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java index 6b6694fd90..ae548cc4b6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxAuthEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java similarity index 96% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java index 81d8cbb13a..5ff8d120dc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/router/IotEmqxUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index d24a631d5f..4916c0d238 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -92,7 +92,7 @@ yudao: # 针对引入的 WebSocket 组件的配置 # ==================================== - id: websocket-json - enabled: true + enabled: false protocol: websocket port: 8094 serialize: json @@ -117,9 +117,9 @@ yudao: # 针对引入的 MQTT 组件的配置 # ==================================== - id: mqtt-json - enabled: true + enabled: false protocol: mqtt - port: 1883 + port: 1884 serialize: json mqtt: max-message-size: 8192 # 最大消息大小(字节) @@ -132,7 +132,7 @@ yudao: # 针对引入的 EMQX 组件的配置 # ==================================== emqx: - enabled: false + enabled: true http-port: 8090 # MQTT HTTP 服务端口 mqtt-host: 127.0.0.1 # MQTT Broker 地址 mqtt-port: 1883 # MQTT Broker 端口 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java new file mode 100644 index 0000000000..a2e85919a5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotDirectDeviceEmqxProtocolIntegrationTest.java @@ -0,0 +1,437 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; + +/** + * IoT 直连设备 EMQX 协议集成测试(手动测试) + * + *
测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 EMQX Broker 连接平台 + * + *
EMQX 协议架构: + *
+ * +--------+ MQTT +-------------+ HTTP Hook +---------+ + * | 设备 | ----------------> | EMQX Broker | --------------------> | 网关 | + * +--------+ +-------------+ +---------+ + * | | | + * | 1. 连接认证 | 2. 调用 /mqtt/auth | + * | 3. 发布消息 | 4. 调用 /mqtt/event (上线/下线) | + * | | 5. 网关订阅 EMQX 消息 | + * | | | + *+ * + *
测试分类: + *
使用步骤: + *
+ * 当设备连接 EMQX 时,EMQX 会自动调用网关的 /mqtt/auth 接口进行认证 + */ + @Test + public void testDeviceConnect() throws Exception { + // 1. 构建认证信息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + log.info("[testDeviceConnect][认证信息: clientId={}, username={}, password={}]", + authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); + + // 2. 创建客户端并连接 EMQX Broker + MqttClient client = createClient(authInfo); + try { + client.connect(MQTT_PORT, SERVER_HOST) + .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + log.info("[testDeviceConnect][连接 EMQX Broker 成功,客户端 ID: {}]", client.clientId()); + log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/auth 接口进行认证]"); + log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/event 接口通知设备上线]"); + } finally { + disconnect(client); + log.info("[testDeviceConnect][EMQX 会自动调用网关的 /mqtt/event 接口通知设备下线]"); + } + } + + /** + * 属性上报测试:设备通过 EMQX Broker 发布属性消息 + *
+ * 消息流程:设备 -> EMQX Broker -> 网关(订阅 EMQX 消息)
+ */
+ @Test
+ public void testPropertyPost() throws Exception {
+ // 1. 连接 EMQX Broker
+ MqttClient client = connectToEmqx();
+ log.info("[testPropertyPost][连接 EMQX Broker 成功]");
+
+ try {
+ // 2.1 构建属性上报消息
+ IotDeviceMessage request = IotDeviceMessage.requestOf(
+ IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
+ IotDevicePropertyPostReqDTO.of(MapUtil.
+ * 消息流程:设备 -> EMQX Broker -> 网关(订阅 EMQX 消息)
+ */
+ @Test
+ public void testEventPost() throws Exception {
+ // 1. 连接 EMQX Broker
+ MqttClient client = connectToEmqx();
+ log.info("[testEventPost][连接 EMQX Broker 成功]");
+
+ try {
+ // 2.1 构建事件上报消息
+ IotDeviceMessage request = IotDeviceMessage.requestOf(
+ IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
+ IotDeviceEventPostReqDTO.of(
+ "eat",
+ MapUtil.
+ * 消息流程:网关 -> EMQX Broker -> 设备
+ */
+ @Test
+ public void testSubscribe() throws Exception {
+ // 1. 连接 EMQX Broker
+ MqttClient client = connectToEmqx();
+ log.info("[testSubscribe][连接 EMQX Broker 成功]");
+
+ try {
+ // 2. 设置消息处理器
+ client.publishHandler(message -> log.info("[testSubscribe][收到下行消息: topic={}, payload={}]",
+ message.topicName(), message.payload().toString()));
+
+ // 3. 订阅下行主题
+ String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME);
+ log.info("[testSubscribe][订阅主题: {}]", topic);
+ subscribe(client, topic);
+ log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]");
+ log.info("[testSubscribe][网关下发的消息会通过 EMQX Broker 转发给设备]");
+
+ // 4. 保持连接 30 秒等待消息
+ Thread.sleep(30000);
+ } finally {
+ disconnect(client);
+ }
+ }
+
+ // ==================================================================================
+ // 第二部分:模拟 EMQX Server 调用网关 HTTP Hook 接口
+ // 说明:这些接口是 EMQX Server 自动调用的,这里只是用于单独测试接口功能
+ // ==================================================================================
+
+ /**
+ * 认证接口测试:模拟 EMQX Server 调用 /mqtt/auth 接口
+ *
+ * 注意:正常情况下此接口由 EMQX HTTP 认证插件自动调用,这里只是测试接口本身
+ */
+ @Test
+ public void testEmqxAuthHook() {
+ // 1.1 构建请求
+ String url = String.format("http://%s:%d/mqtt/auth", SERVER_HOST, HTTP_PORT);
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+ // 1.2 EMQX 认证请求格式
+ String payload = JsonUtils.toJsonString(MapUtil.builder()
+ .put("clientid", authInfo.getClientId())
+ .put("username", authInfo.getUsername())
+ .put("password", authInfo.getPassword())
+ .build());
+ // 1.3 输出请求
+ log.info("[testEmqxAuthHook][模拟 EMQX Server 调用认证接口]");
+ log.info("[testEmqxAuthHook][请求 URL: {}]", url);
+ log.info("[testEmqxAuthHook][请求体: {}]", payload);
+
+ // 2.1 发送请求
+ try (HttpResponse httpResponse = HttpUtil.createPost(url)
+ .header("Content-Type", "application/json")
+ .body(payload)
+ .execute()) {
+ // 2.2 输出结果
+ log.info("[testEmqxAuthHook][响应状态码: {}]", httpResponse.getStatus());
+ log.info("[testEmqxAuthHook][响应体: {}]", httpResponse.body());
+ log.info("[testEmqxAuthHook][认证结果: result=allow 表示认证成功, result=deny 表示认证失败]");
+ }
+ }
+
+ /**
+ * 认证失败测试:模拟 EMQX Server 调用 /mqtt/auth 接口(错误密码)
+ */
+ @Test
+ public void testEmqxAuthHookFailed() {
+ // 1.1 构建请求
+ String url = String.format("http://%s:%d/mqtt/auth", SERVER_HOST, HTTP_PORT);
+ // 1.2 使用错误的密码
+ String payload = JsonUtils.toJsonString(MapUtil.builder()
+ .put("clientid", PRODUCT_KEY + "." + DEVICE_NAME)
+ .put("username", DEVICE_NAME + "&" + PRODUCT_KEY)
+ .put("password", "wrong_password")
+ .build());
+ // 1.3 输出请求
+ log.info("[testEmqxAuthHookFailed][模拟 EMQX Server 调用认证接口(错误密码)]");
+ log.info("[testEmqxAuthHookFailed][请求 URL: {}]", url);
+ log.info("[testEmqxAuthHookFailed][请求体: {}]", payload);
+
+ // 2.1 发送请求
+ try (HttpResponse httpResponse = HttpUtil.createPost(url)
+ .header("Content-Type", "application/json")
+ .body(payload)
+ .execute()) {
+ // 2.2 输出结果
+ log.info("[testEmqxAuthHookFailed][响应状态码: {}]", httpResponse.getStatus());
+ log.info("[testEmqxAuthHookFailed][响应体: {}]", httpResponse.body());
+ log.info("[testEmqxAuthHookFailed][预期结果: result=deny]");
+ }
+ }
+
+ /**
+ * 设备上线事件测试:模拟 EMQX Server Webhook 调用 /mqtt/event 接口
+ *
+ * 注意:正常情况下此接口由 EMQX Webhook 插件自动调用,这里只是测试接口本身
+ */
+ @Test
+ public void testEmqxClientConnectedHook() {
+ // 1.1 构建请求
+ String url = String.format("http://%s:%d/mqtt/event", SERVER_HOST, HTTP_PORT);
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+ // 1.2 EMQX Webhook client.connected 事件格式
+ String payload = JsonUtils.toJsonString(MapUtil.builder()
+ .put("event", "client.connected")
+ .put("clientid", authInfo.getClientId())
+ .put("username", authInfo.getUsername())
+ .put("peername", "127.0.0.1:12345")
+ .put("connected_at", System.currentTimeMillis())
+ .build());
+ // 1.3 输出请求
+ log.info("[testEmqxClientConnectedHook][模拟 EMQX Server Webhook 调用设备上线事件]");
+ log.info("[testEmqxClientConnectedHook][请求 URL: {}]", url);
+ log.info("[testEmqxClientConnectedHook][请求体: {}]", payload);
+
+ // 2.1 发送请求
+ try (HttpResponse httpResponse = HttpUtil.createPost(url)
+ .header("Content-Type", "application/json")
+ .body(payload)
+ .execute()) {
+ // 2.2 输出结果
+ log.info("[testEmqxClientConnectedHook][响应状态码: {}]", httpResponse.getStatus());
+ log.info("[testEmqxClientConnectedHook][响应体: {}]", httpResponse.body());
+ log.info("[testEmqxClientConnectedHook][预期结果: 状态码 200,设备状态更新为在线]");
+ }
+ }
+
+ /**
+ * 设备下线事件测试:模拟 EMQX Server Webhook 调用 /mqtt/event 接口
+ *
+ * 注意:正常情况下此接口由 EMQX Webhook 插件自动调用,这里只是测试接口本身
+ */
+ @Test
+ public void testEmqxClientDisconnectedHook() {
+ // 1.1 构建请求
+ String url = String.format("http://%s:%d/mqtt/event", SERVER_HOST, HTTP_PORT);
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+ // 1.2 EMQX Webhook client.disconnected 事件格式
+ String payload = JsonUtils.toJsonString(MapUtil.builder()
+ .put("event", "client.disconnected")
+ .put("clientid", authInfo.getClientId())
+ .put("username", authInfo.getUsername())
+ .put("reason", "normal")
+ .put("disconnected_at", System.currentTimeMillis())
+ .build());
+ // 1.3 输出请求
+ log.info("[testEmqxClientDisconnectedHook][模拟 EMQX Server Webhook 调用设备下线事件]");
+ log.info("[testEmqxClientDisconnectedHook][请求 URL: {}]", url);
+ log.info("[testEmqxClientDisconnectedHook][请求体: {}]", payload);
+
+ // 2.1 发送请求
+ try (HttpResponse httpResponse = HttpUtil.createPost(url)
+ .header("Content-Type", "application/json")
+ .body(payload)
+ .execute()) {
+ // 2.2 输出结果
+ log.info("[testEmqxClientDisconnectedHook][响应状态码: {}]", httpResponse.getStatus());
+ log.info("[testEmqxClientDisconnectedHook][响应体: {}]", httpResponse.body());
+ log.info("[testEmqxClientDisconnectedHook][预期结果: 状态码 200,设备状态更新为离线]");
+ }
+ }
+
+ // ===================== 辅助方法 =====================
+
+ /**
+ * 创建 MQTT 客户端
+ *
+ * @param authInfo 认证信息
+ * @return MQTT 客户端
+ */
+ private MqttClient createClient(IotDeviceAuthReqDTO authInfo) {
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId(authInfo.getClientId())
+ .setUsername(authInfo.getUsername())
+ .setPassword(authInfo.getPassword())
+ .setCleanSession(true)
+ .setKeepAliveInterval(60);
+ return MqttClient.create(vertx, options);
+ }
+
+ /**
+ * 连接 EMQX Broker 并认证设备
+ *
+ * @return 已认证的 MQTT 客户端
+ */
+ private MqttClient connectToEmqx() throws Exception {
+ IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
+ MqttClient client = createClient(authInfo);
+ client.connect(MQTT_PORT, SERVER_HOST)
+ .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ return client;
+ }
+
+ /**
+ * 订阅主题
+ *
+ * @param client MQTT 客户端
+ * @param topic 主题
+ */
+ private void subscribe(MqttClient client, String topic) throws Exception {
+ client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
+ .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("[subscribe][订阅主题成功: {}]", topic);
+ }
+
+ /**
+ * 发布消息
+ *
+ * @param client MQTT 客户端
+ * @param topic 发布主题
+ * @param request 请求消息
+ */
+ private void publish(MqttClient client, String topic, IotDeviceMessage request) throws Exception {
+ byte[] payload = CODEC.encode(request);
+ log.info("[publish][发送消息: topic={}, payload={}]", topic, new String(payload));
+ client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
+ .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("[publish][消息发布成功]");
+ }
+
+ /**
+ * 断开连接
+ *
+ * @param client MQTT 客户端
+ */
+ private void disconnect(MqttClient client) throws Exception {
+ client.disconnect()
+ .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ log.info("[disconnect][断开连接成功]");
+ }
+
+}