feat(iot):【协议改造】emqx 初步改造(60%):支持 device register 设备注册

This commit is contained in:
YunaiV
2026-02-03 22:29:06 +08:00
parent 85c1b05bca
commit cc0d786d0f
4 changed files with 115 additions and 12 deletions

View File

@@ -199,7 +199,7 @@ public class IotEmqxProtocol implements IotProtocol {
router.route().handler(BodyHandler.create().setBodyLimit(1024 * 1024)); // 限制 body 大小为 1MB防止大包攻击
// 2. 创建处理器
IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId);
IotEmqxAuthEventHandler handler = new IotEmqxAuthEventHandler(serverId, this);
router.post(IotMqttTopicUtils.MQTT_AUTH_PATH).handler(handler::handleAuth);
router.post(IotMqttTopicUtils.MQTT_ACL_PATH).handler(handler::handleAcl);
router.post(IotMqttTopicUtils.MQTT_EVENT_PATH).handler(handler::handleEvent);
@@ -517,4 +517,15 @@ public class IotEmqxProtocol implements IotProtocol {
.onFailure(e -> log.error("[publishMessage][IoT EMQX 协议 {} 发布失败, topic: {}]", getId(), topic, e));
}
/**
* 延迟发布消息到 MQTT Broker
*
* @param topic 主题
* @param payload 消息内容
* @param delayMs 延迟时间(毫秒)
*/
public void publishDelayMessage(String topic, byte[] payload, long delayMs) {
vertx.setTimer(delayMs, id -> publishMessage(topic, payload));
}
}

View File

@@ -1,14 +1,19 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.vertx.core.json.JsonObject;
@@ -24,6 +29,7 @@ import java.util.Locale;
* 1. 设备认证接口 - 对应 EMQX HTTP 认证插件 {@link #handleAuth(RoutingContext)}
* 2. 设备事件处理接口 - 对应 EMQX Webhook 事件通知 {@link #handleEvent(RoutingContext)}
* 3. 设备 ACL 权限接口 - 对应 EMQX HTTP ACL 插件 {@link #handleAcl(RoutingContext)}
* 4. 设备注册接口 - 集成一型一密设备注册 {@link #handleDeviceRegister(RoutingContext, String, String)}
*
* @author 芋道源码
*/
@@ -57,13 +63,21 @@ public class IotEmqxAuthEventHandler {
*/
private static final String EVENT_CLIENT_DISCONNECTED = "client.disconnected";
/**
* 认证类型标识 - 设备注册
*/
private static final String AUTH_TYPE_REGISTER = "|authType=register|";
private final String serverId;
private final IotEmqxProtocol protocol;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceCommonApi deviceApi;
public IotEmqxAuthEventHandler(String serverId) {
public IotEmqxAuthEventHandler(String serverId, IotEmqxProtocol protocol) {
this.serverId = serverId;
this.protocol = protocol;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
}
@@ -91,7 +105,13 @@ public class IotEmqxAuthEventHandler {
return;
}
// 2. 执行认证
// 2.1 情况一:判断是否为注册请求
if (StrUtil.endWith(clientId, AUTH_TYPE_REGISTER)) {
handleDeviceRegister(context, username, password);
return;
}
// 2.2 情况二:执行认证
boolean authResult = handleDeviceAuth(clientId, username, password);
log.info("[handleAuth][设备认证结果: {} -> {}]", username, authResult);
if (authResult) {
@@ -380,4 +400,73 @@ public class IotEmqxAuthEventHandler {
}
}
// ========= 注册处理 =========
/**
* 处理设备注册请求(一型一密)
*
* @param context 路由上下文
* @param username 用户名
* @param password 密码(签名)
*/
private void handleDeviceRegister(RoutingContext context, String username, String password) {
try {
// 1. 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[handleDeviceRegister][设备注册失败: 无法解析 username={}]", username);
sendAuthResponse(context, RESULT_DENY);
return;
}
// 2. 调用注册 API
IotDeviceRegisterReqDTO params = new IotDeviceRegisterReqDTO()
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName())
.setSign(password);
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
result.checkError();
// 3. 允许连接
log.info("[handleDeviceRegister][设备注册成功: {}]", username);
sendAuthResponse(context, RESULT_ALLOW);
// 4. 延迟 5 秒发送注册结果(等待设备连接成功并完成订阅)
sendRegisterResultMessage(username, result.getData());
} catch (Exception e) {
log.warn("[handleDeviceRegister][设备注册失败: {}, 错误: {}]", username, e.getMessage());
sendAuthResponse(context, RESULT_DENY);
}
}
/**
* 发送注册结果消息给设备
* <p>
* 注意:延迟 5 秒发送,等待设备连接成功并完成订阅。
*
* @param username 用户名
* @param result 注册结果
*/
@SuppressWarnings("DataFlowIssue")
private void sendRegisterResultMessage(String username, IotDeviceRegisterRespDTO result) {
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
Assert.notNull(deviceInfo, "设备信息不能为空");
try {
// 1. 构建响应消息
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(null, method, result, 0, null);
// 2. 编码消息
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, "Alink");
// 3. 构建响应主题并延迟发布(等待设备连接成功并完成订阅)
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), true);
protocol.publishDelayMessage(replyTopic, encodedData, 5000);
log.info("[sendRegisterResultMessage][发送注册结果: topic={}]", replyTopic);
} catch (Exception e) {
log.error("[sendRegisterResultMessage][发送注册结果失败: {}]", username, e);
}
}
}

View File

@@ -117,9 +117,9 @@ yudao:
# 针对引入的 MQTT 组件的配置
# ====================================
- id: mqtt-json
enabled: false
enabled: true
protocol: mqtt
port: 1884
port: 1883
serialize: json
mqtt:
max-message-size: 8192 # 最大消息大小(字节)
@@ -129,7 +129,7 @@ yudao:
# 针对引入的 EMQX 组件的配置
# ====================================
- id: emqx-1
enabled: true
enabled: false
protocol: emqx
port: 8090 # EMQX HTTP Hook 端口(/mqtt/auth、/mqtt/event
emqx:

View File

@@ -201,7 +201,12 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
MqttClient client = MqttClient.create(vertx, options);
try {
// 2. 设置消息处理器,接收注册响应
// 2. 连接服务器(连接成功后服务端会自动处理注册并发送响应
client.connect(SERVER_PORT, SERVER_HOST)
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[testDeviceRegister][连接成功,等待注册响应...]");
// 3.1 设置消息处理器,接收注册响应
CompletableFuture<IotDeviceMessage> responseFuture = new CompletableFuture<>();
client.publishHandler(message -> {
log.info("[testDeviceRegister][收到响应: topic={}, payload={}]",
@@ -209,11 +214,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest {
IotDeviceMessage response = CODEC.decode(message.payload().getBytes());
responseFuture.complete(response);
});
// 3. 连接服务器(连接成功后服务端会自动处理注册并发送响应)
client.connect(SERVER_PORT, SERVER_HOST)
.toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.info("[testDeviceRegister][连接成功,等待注册响应...]");
// 3.2 订阅 _reply 主题
String replyTopic = String.format("/sys/%s/%s/thing/auth/register_reply", PRODUCT_KEY, deviceName);
subscribe(client, replyTopic);
// 4. 等待注册响应
IotDeviceMessage response = responseFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);