feat:【iot】MQTT 协议:1)增加 register 接口

feat:【iot】TCP/UDP 协议:统一 register 返回数据的格式
This commit is contained in:
YunaiV
2026-01-26 23:51:28 +08:00
parent 70135174e5
commit 99bcd252a3
4 changed files with 472 additions and 234 deletions

View File

@@ -1,19 +1,26 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.mqtt.MqttEndpoint;
@@ -21,6 +28,7 @@ import io.vertx.mqtt.MqttTopicSubscription;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* MQTT 上行消息处理器
@@ -30,6 +38,16 @@ import java.util.List;
@Slf4j
public class IotMqttUpstreamHandler {
/**
* 默认编解码类型MQTT 使用 Alink 协议)
*/
private static final String DEFAULT_CODEC_TYPE = "Alink";
/**
* register 请求的 topic 后缀
*/
private static final String REGISTER_TOPIC_SUFFIX = "/thing/auth/register";
private final IotDeviceMessageService deviceMessageService;
private final IotMqttConnectionManager connectionManager;
@@ -85,20 +103,28 @@ public class IotMqttUpstreamHandler {
});
// 4. 设置消息处理器
endpoint.publishHandler(message -> {
endpoint.publishHandler(mqttMessage -> {
try {
processMessage(clientId, message.topicName(), message.payload().getBytes());
// 4.1 根据 topic 判断是否为 register 请求
String topic = mqttMessage.topicName();
byte[] payload = mqttMessage.payload().getBytes();
if (topic.endsWith(REGISTER_TOPIC_SUFFIX)) {
// register 请求:使用默认编解码器处理(设备可能未注册)
processRegisterMessage(clientId, topic, payload, endpoint);
} else {
// 业务请求:正常处理
processMessage(clientId, topic, payload);
}
// 根据 QoS 级别发送相应的确认消息
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
// 4.2 根据 QoS 级别发送相应的确认消息
if (mqttMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
// QoS 1: 发送 PUBACK 确认
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishAcknowledge(mqttMessage.messageId());
} else if (mqttMessage.qosLevel() == MqttQoS.EXACTLY_ONCE) {
// QoS 2: 发送 PUBREC 确认
endpoint.publishReceived(message.messageId());
endpoint.publishReceived(mqttMessage.messageId());
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage());
@@ -161,10 +187,9 @@ public class IotMqttUpstreamHandler {
return;
}
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
String productKey = topicParts[2];
String deviceName = topicParts[3];
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
try {
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
@@ -172,10 +197,9 @@ public class IotMqttUpstreamHandler {
return;
}
// 4. 处理业务消息(认证已在连接时完成)
log.info("[processMessage][收到设备消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
// 4. 处理业务消息(认证已在连接时完成)
handleBusinessRequest(message, productKey, deviceName);
} catch (Exception e) {
log.error("[processMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
@@ -246,6 +270,195 @@ public class IotMqttUpstreamHandler {
}
}
/**
* 处理 register 消息(设备动态注册,使用默认编解码器)
*
* @param clientId 客户端 ID
* @param topic 主题
* @param payload 消息内容
* @param endpoint MQTT 连接端点
*/
private void processRegisterMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[processRegisterMessage][topic({}) 格式不正确]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE);
if (message == null) {
log.warn("[processRegisterMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
} catch (Exception e) {
log.error("[processRegisterMessage][消息解码异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
return;
}
// 3. 处理设备动态注册请求
log.info("[processRegisterMessage][收到设备注册消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
try {
handleRegisterRequest(message, productKey, deviceName, endpoint);
} catch (Exception e) {
log.error("[processRegisterMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 处理设备动态注册请求(一型一密,不需要 deviceSecret
*
* @param message 消息信息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param endpoint MQTT 连接端点
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(IotDeviceMessage message, String productKey, String deviceName, MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO registerParams = parseRegisterParams(message.getParams());
if (registerParams == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册参数不完整");
return;
}
// 2. 调用动态注册 API
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(registerParams);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getMsg());
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getData());
log.info("[handleRegisterRequest][注册成功,设备名: {},客户端 ID: {}]",
registerParams.getDeviceName(), clientId);
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册处理异常");
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
String productKey = MapUtil.getStr(paramMap, "productKey");
String deviceName = MapUtil.getStr(paramMap, "deviceName");
String productSecret = MapUtil.getStr(paramMap, "productSecret");
if (StrUtil.hasBlank(productKey, deviceName, productSecret)) {
return null;
}
return new IotDeviceRegisterReqDTO()
.setProductKey(productKey)
.setDeviceName(deviceName)
.setProductSecret(productSecret);
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
String jsonStr = JsonUtils.toJsonString(params);
return JsonUtils.parseObject(jsonStr, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param registerResp 注册响应
*/
private void sendRegisterSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, IotDeviceRegisterRespDTO registerResp) {
try {
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 编码消息(使用默认编解码器,因为设备可能还未注册)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true);
endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData),
MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendRegisterSuccessResponse][发送注册成功响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应异常,客户端 ID: {}]",
endpoint.clientIdentifier(), e);
}
}
/**
* 发送注册错误响应
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param errorMessage 错误消息
*/
private void sendRegisterErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String errorMessage) {
try {
// 1. 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), null, 400, errorMessage);
// 2. 编码消息(使用默认编解码器,因为设备可能还未注册)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true);
endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData),
MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendRegisterErrorResponse][发送注册错误响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendRegisterErrorResponse][发送注册错误响应异常,客户端 ID: {}]",
endpoint.clientIdentifier(), e);
}
}
/**
* 处理业务请求
*/

View File

@@ -502,15 +502,10 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
private void sendRegisterSuccessResponse(NetSocket socket, String requestId,
IotDeviceRegisterRespDTO registerResp, String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("deviceSecret", registerResp.getDeviceSecret())
.put("message", "注册成功")
.build();
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), responseData, 0, "注册成功");
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.write(Buffer.buffer(encodedData));
} catch (Exception e) {

View File

@@ -405,15 +405,10 @@ public class IotUdpUpstreamHandler {
String requestId, IotDeviceRegisterRespDTO registerResp,
String codecType) {
try {
// 构建响应数据
Object responseData = MapUtil.builder()
.put("success", true)
.put("deviceSecret", registerResp.getDeviceSecret())
.put("message", "注册成功")
.build();
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), responseData, 0, "注册成功");
// 发送响应
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType);
socket.send(Buffer.buffer(encodedData), address.getPort(), address.getHostString(), result -> {
if (result.failed()) {

View File

@@ -2,12 +2,15 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
@@ -18,6 +21,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -31,9 +35,10 @@ import java.util.concurrent.TimeUnit;
* <li>启动 yudao-module-iot-gateway 服务MQTT 端口 1883</li>
* <li>运行以下测试方法:
* <ul>
* <li>{@link #testConnect()} - 设备连接认证</li>
* <li>{@link #testAuth()} - 设备连接认证</li>
* <li>{@link #testPropertyPost()} - 设备属性上报</li>
* <li>{@link #testEventPost()} - 设备事件上报</li>
* <li>{@link #testSubscribe()} - 订阅下行消息</li>
* </ul>
* </li>
* </ol>
@@ -50,6 +55,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
private static final int SERVER_PORT = 1883;
private static final int TIMEOUT_SECONDS = 10;
// ===================== 编解码器MQTT 使用 Alink 协议) =====================
private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec();
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
private static final String DEVICE_NAME = "small";
@@ -73,10 +81,10 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
// ===================== 连接认证测试 =====================
/**
* 连接认证测试:设备通过 MQTT 协议连接平台
* 认证测试:获取设备 Token
*/
@Test
public void testConnect() throws Exception {
public void testAuth() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 构建认证信息
@@ -84,16 +92,8 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
log.info("[testConnect][认证信息: clientId={}, username={}, password={}]",
authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword());
// 2. 创建 MQTT 客户端配置
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword())
.setCleanSession(true)
.setKeepAliveInterval(60);
// 3. 创建 MQTT 客户端并连接
MqttClient client = MqttClient.create(vertx, options);
// 2. 创建客户端并连接
MqttClient client = connect(authInfo);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
@@ -114,7 +114,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
}
});
// 4. 等待测试完成
// 3. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testConnect][测试超时]");
@@ -128,86 +128,31 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 连接并认证
MqttClient client = connectAndAuth();
log.info("[testPropertyPost][连接认证成功]");
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2. 创建 MQTT 客户端配置
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword())
.setCleanSession(true)
.setKeepAliveInterval(60);
// 3. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()),
null, null, null);
// 3. 创建 MQTT 客户端并连接
MqttClient client = MqttClient.create(vertx, options);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(connectAr -> {
if (connectAr.succeeded()) {
log.info("[testPropertyPost][连接成功]");
// 4. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testPropertyPost][响应消息: {}]", response);
// 4.1 设置消息处理器,接收 _reply 响应
client.publishHandler(message -> {
log.info("[testPropertyPost][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
});
// 4.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/property/post_reply", PRODUCT_KEY, DEVICE_NAME);
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(subscribeAr -> {
if (subscribeAr.succeeded()) {
log.info("[testPropertyPost][订阅响应主题成功: {}]", replyTopic);
// 5. 构建属性上报消息Alink 协议格式)
String topic = String.format("/sys/%s/%s/thing/property/post", PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("version", "1.0")
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
.put("params", IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
.put("width", 1)
.put("height", "2")
.build()))
.build());
log.info("[testPropertyPost][发送消息: topic={}, payload={}]", topic, payload);
// 6. 发布消息
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(publishAr -> {
if (publishAr.succeeded()) {
log.info("[testPropertyPost][消息发布成功messageId={}]", publishAr.result());
} else {
log.error("[testPropertyPost][消息发布失败]", publishAr.cause());
}
// 等待一会儿接收响应
vertx.setTimer(2000, id -> {
client.disconnect()
.onComplete(disconnectAr -> {
log.info("[testPropertyPost][断开连接]");
latch.countDown();
});
});
});
} else {
log.error("[testPropertyPost][订阅响应主题失败]", subscribeAr.cause());
latch.countDown();
}
});
} else {
log.error("[testPropertyPost][连接失败]", connectAr.cause());
latch.countDown();
}
});
// 7. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testPropertyPost][测试超时]");
}
// 5. 断开连接
disconnect(client);
}
// ===================== 直连设备事件上报测试 =====================
@@ -217,87 +162,69 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 连接并认证
MqttClient client = connectAndAuth();
log.info("[testEventPost][连接认证成功]");
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
// 2. 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
subscribeReply(client, replyTopic);
// 2. 创建 MQTT 客户端配置
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword())
.setCleanSession(true)
.setKeepAliveInterval(60);
// 3. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.of(
IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()),
null, null, null);
// 3. 创建 MQTT 客户端并连接
MqttClient client = MqttClient.create(vertx, options);
// TODO @AI可以像 tcp 里面一样有个复用么auth 流程;
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(connectAr -> {
if (connectAr.succeeded()) {
log.info("[testEventPost][连接成功]");
// 4. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME);
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testEventPost][响应消息: {}]", response);
// 4.1 设置消息处理器,接收 _reply 响应
client.publishHandler(message -> {
log.info("[testEventPost][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
});
// 5. 断开连接
disconnect(client);
}
// 4.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/event/post_reply", PRODUCT_KEY, DEVICE_NAME);
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(subscribeAr -> {
if (subscribeAr.succeeded()) {
log.info("[testEventPost][订阅响应主题成功: {}]", replyTopic);
// ===================== 设备动态注册测试(一型一密) =====================
// 5. 构建事件上报消息Alink 协议格式)
String topic = String.format("/sys/%s/%s/thing/event/post", PRODUCT_KEY, DEVICE_NAME);
String payload = JsonUtils.toJsonString(MapUtil.builder()
.put("id", IdUtil.fastSimpleUUID())
.put("version", "1.0")
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
.put("params", IotDeviceEventPostReqDTO.of(
"eat",
MapUtil.<String, Object>builder().put("rice", 3).build(),
System.currentTimeMillis()))
.build());
log.info("[testEventPost][发送消息: topic={}, payload={}]", topic, payload);
/**
* 直连设备动态注册测试(一型一密)
* <p>
* 使用产品密钥productSecret验证身份成功后返回设备密钥deviceSecret
* <p>
* 注意:此接口不需要认证
*/
@Test
public void testDeviceRegister() throws Exception {
// 1. 连接并认证(使用已有设备连接)
MqttClient client = connectAndAuth();
log.info("[testDeviceRegister][连接认证成功]");
// 6. 发布消息
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(publishAr -> {
if (publishAr.succeeded()) {
log.info("[testEventPost][消息发布成功messageId={}]", publishAr.result());
} else {
log.error("[testEventPost][消息发布失败]", publishAr.cause());
}
// 2.1 构建注册消息
IotDeviceRegisterReqDTO registerReqDTO = new IotDeviceRegisterReqDTO();
registerReqDTO.setProductKey(PRODUCT_KEY);
registerReqDTO.setDeviceName("test-mqtt-" + System.currentTimeMillis());
registerReqDTO.setProductSecret("test-product-secret");
IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerReqDTO, null, null, null);
// 2.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply",
registerReqDTO.getProductKey(), registerReqDTO.getDeviceName());
subscribeReply(client, replyTopic);
// 等待一会儿接收响应
vertx.setTimer(2000, id -> {
client.disconnect()
.onComplete(disconnectAr -> {
log.info("[testEventPost][断开连接]");
latch.countDown();
});
});
});
} else {
log.error("[testEventPost][订阅响应主题失败]", subscribeAr.cause());
latch.countDown();
}
});
} else {
log.error("[testEventPost][连接失败]", connectAr.cause());
latch.countDown();
}
});
// 3. 发布消息并等待响应
String topic = String.format("/sys/%s/%s/thing/auth/register",
registerReqDTO.getProductKey(), registerReqDTO.getDeviceName());
IotDeviceMessage response = publishAndWaitReply(client, topic, request);
log.info("[testDeviceRegister][响应消息: {}]", response);
log.info("[testDeviceRegister][成功后可使用返回的 deviceSecret 进行一机一密认证]");
// 7. 等待测试完成
boolean completed = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testEventPost][测试超时]");
}
// 4. 断开连接
disconnect(client);
}
// ===================== 订阅下行消息测试 =====================
@@ -309,62 +236,170 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
public void testSubscribe() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
// 1. 构建认证信息
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
// 1. 连接并认证
MqttClient client = connectAndAuth();
log.info("[testSubscribe][连接认证成功]");
// 2. 创建 MQTT 客户端配置
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword())
.setCleanSession(true)
.setKeepAliveInterval(60);
// 2. 设置消息处理器
client.publishHandler(message -> {
log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString());
});
// 3. 创建 MQTT 客户端并连接
MqttClient client = MqttClient.create(vertx, options);
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(connectAr -> {
if (connectAr.succeeded()) {
log.info("[testSubscribe][连接成功]");
// 3. 订阅下行主题
String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME);
log.info("[testSubscribe][订阅主题: {}]", topic);
// 4. 设置消息处理器
client.publishHandler(message -> {
log.info("[testSubscribe][收到消息: topic={}, payload={}]",
message.topicName(), message.payload().toString());
});
// 5. 订阅下行主题
String topic = String.format("/sys/%s/%s/thing/service/#", PRODUCT_KEY, DEVICE_NAME);
log.info("[testSubscribe][订阅主题: {}]", topic);
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(subscribeAr -> {
if (subscribeAr.succeeded()) {
log.info("[testSubscribe][订阅成功,等待下行消息... (按 Ctrl+C 结束)]");
// 保持连接 30 秒等待消息
vertx.setTimer(30000, id -> {
client.disconnect()
.onComplete(disconnectAr -> {
log.info("[testSubscribe][断开连接]");
latch.countDown();
});
});
} else {
log.error("[testSubscribe][订阅失败]", subscribeAr.cause());
client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(subscribeAr -> {
if (subscribeAr.succeeded()) {
log.info("[testSubscribe][订阅成功,等待下行消息... (30秒后自动断开)]");
// 保持连接 30 秒等待消息
vertx.setTimer(30000, id -> {
client.disconnect()
.onComplete(disconnectAr -> {
log.info("[testSubscribe][断开连接]");
latch.countDown();
}
});
});
});
} else {
log.error("[testSubscribe][连接失败]", connectAr.cause());
log.error("[testSubscribe][订阅失败]", subscribeAr.cause());
latch.countDown();
}
});
// 6. 等待测试完成
// 4. 等待测试完成
boolean completed = latch.await(60, TimeUnit.SECONDS);
if (!completed) {
log.warn("[testSubscribe][测试超时]");
}
}
// ===================== 辅助方法 =====================
/**
* 创建 MQTT 客户端
*
* @param authInfo 认证信息
* @return MQTT 客户端
*/
private MqttClient connect(IotDeviceAuthReqDTO authInfo) {
MqttClientOptions options = new MqttClientOptions()
.setClientId(authInfo.getClientId())
.setUsername(authInfo.getUsername())
.setPassword(authInfo.getPassword())
.setCleanSession(true)
.setKeepAliveInterval(60);
return MqttClient.create(vertx, options);
}
/**
* 连接并认证设备
*
* @return 已认证的 MQTT 客户端
*/
private MqttClient connectAndAuth() throws Exception {
// 1. 创建客户端并连接
IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET);
MqttClient client = connect(authInfo);
// 2.1 连接
CompletableFuture<MqttClient> future = new CompletableFuture<>();
client.connect(SERVER_PORT, SERVER_HOST)
.onComplete(ar -> {
if (ar.succeeded()) {
future.complete(client);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2.2 等待连接结果
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
/**
* 订阅响应主题
*
* @param client MQTT 客户端
* @param replyTopic 响应主题
*/
private void subscribeReply(MqttClient client, String replyTopic) throws Exception {
// 1. 订阅响应主题
CompletableFuture<Void> future = new CompletableFuture<>();
client.subscribe(replyTopic, MqttQoS.AT_LEAST_ONCE.value())
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[subscribeReply][订阅响应主题成功: {}]", replyTopic);
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待订阅结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
/**
* 发布消息并等待响应
*
* @param client MQTT 客户端
* @param topic 发布主题
* @param request 请求消息
* @return 响应消息
*/
private IotDeviceMessage publishAndWaitReply(MqttClient client, String topic, IotDeviceMessage request) {
// 1. 设置消息处理器,接收响应
CompletableFuture<IotDeviceMessage> future = new CompletableFuture<>();
client.publishHandler(message -> {
log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]",
message.topicName(), message.payload().toString());
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
future.complete(response);
});
// 2. 编码并发布消息
byte[] payload = CODEC.encode(request);
log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]",
CODEC.type(), topic, new String(payload));
client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false)
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[publishAndWaitReply][消息发布成功messageId={}]", ar.result());
} else {
log.error("[publishAndWaitReply][消息发布失败]", ar.cause());
future.completeExceptionally(ar.cause());
}
});
// 3. 等待响应(超时返回 null
try {
return future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("[publishAndWaitReply][等待响应超时或失败]");
return null;
}
}
/**
* 断开连接
*
* @param client MQTT 客户端
*/
private void disconnect(MqttClient client) throws Exception {
// 1. 断开连接
CompletableFuture<Void> future = new CompletableFuture<>();
client.disconnect()
.onComplete(ar -> {
if (ar.succeeded()) {
log.info("[disconnect][断开连接成功]");
future.complete(null);
} else {
future.completeExceptionally(ar.cause());
}
});
// 2. 等待断开结果
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}