feat(iot):【协议改造】mqtt 初步改造(20%)

This commit is contained in:
YunaiV
2026-02-01 20:56:56 +08:00
parent c105ecea96
commit 1e2dc281e3
16 changed files with 964 additions and 784 deletions

View File

@@ -5,12 +5,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -67,41 +62,4 @@ public class IotGatewayConfiguration {
}
}
/**
* IoT 网关 MQTT 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.mqtt", name = "enabled", havingValue = "true")
@Slf4j
public static class MqttProtocolConfiguration {
@Bean(name = "mqttVertx", destroyMethod = "close")
public Vertx mqttVertx() {
return Vertx.vertx();
}
@Bean
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
@Qualifier("mqttVertx") Vertx mqttVertx) {
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService,
connectionManager, mqttVertx);
}
@Bean
public IotMqttDownstreamHandler iotMqttDownstreamHandler(IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager) {
return new IotMqttDownstreamHandler(messageService, connectionManager);
}
@Bean
public IotMqttDownstreamSubscriber iotMqttDownstreamSubscriber(IotMqttUpstreamProtocol mqttUpstreamProtocol,
IotMqttDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotMqttDownstreamSubscriber(mqttUpstreamProtocol, downstreamHandler, messageBus);
}
}
}

View File

@@ -1,13 +1,11 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketConfig;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.TrustOptions;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
@@ -87,11 +85,6 @@ public class IotGatewayProperties {
*/
private EmqxProperties emqx;
/**
* MQTT 组件配置
*/
private MqttProperties mqtt;
}
@Data
@@ -263,79 +256,6 @@ public class IotGatewayProperties {
}
@Data
public static class MqttProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务器端口
*/
private Integer port = 1883;
/**
* 最大消息大小(字节)
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
private Integer keepAliveTimeoutSeconds = 300;
// NOTESSL 相关参数后续统一到 protocol 层级(优先级低)
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 配置
*/
private SslOptions sslOptions = new SslOptions();
/**
* SSL 配置选项
*/
@Data
public static class SslOptions {
/**
* 密钥证书选项
*/
private KeyCertOptions keyCertOptions;
/**
* 信任选项
*/
private TrustOptions trustOptions;
/**
* SSL 证书路径
*/
private String certPath;
/**
* SSL 私钥路径
*/
private String keyPath;
/**
* 信任存储路径
*/
private String trustStorePath;
/**
* 信任存储密码
*/
private String trustStorePassword;
}
}
// NOTE暂未统一为 ProtocolProperties待协议改造完成再调整
/**
* 协议实例配置
@@ -376,6 +296,8 @@ public class IotGatewayProperties {
*/
private String serialize;
// ========== 各协议配置 ==========
/**
* HTTP 协议配置
*/
@@ -406,6 +328,12 @@ public class IotGatewayProperties {
@Valid
private IotWebSocketConfig websocket;
/**
* MQTT 协议配置
*/
@Valid
private IotMqttConfig mqtt;
}
}

View File

@@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
@@ -106,6 +107,8 @@ public class IotProtocolManager implements SmartLifecycle {
return createCoapProtocol(config);
case WEBSOCKET:
return createWebSocketProtocol(config);
case MQTT:
return createMqttProtocol(config);
default:
throw new IllegalArgumentException(String.format(
"[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
@@ -162,4 +165,14 @@ public class IotProtocolManager implements SmartLifecycle {
return new IotWebSocketProtocol(config);
}
/**
* 创建 MQTT 协议实例
*
* @param config 协议实例配置
* @return MQTT 协议实例
*/
private IotMqttProtocol createMqttProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotMqttProtocol(config);
}
}

View File

@@ -0,0 +1,44 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import lombok.Data;
// TODO @AIvalidator 参数校验。也看看其他几个配置类有没有类似问题
/**
* IoT 网关 MQTT 协议配置
*
* @author 芋道源码
*/
@Data
public class IotMqttConfig {
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
/**
* 最大消息大小(字节)
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
private Integer keepAliveTimeoutSeconds = 300;
}

View File

@@ -0,0 +1,317 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttConnectionHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttRegisterHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.stream.Collectors;
/**
* IoT 网关 MQTT 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* 运行状态
*/
@Getter
private volatile boolean running = false;
/**
* Vert.x 实例
*/
private Vertx vertx;
/**
* MQTT 服务器
*/
private MqttServer mqttServer;
/**
* 连接管理器
*/
private IotMqttConnectionManager connectionManager;
/**
* 下行消息订阅者
*/
private IotMqttDownstreamSubscriber downstreamSubscriber;
// TODO @AI这个是不是提前创建下因为是无状态的。
private IotMqttConnectionHandler connectionHandler;
private IotMqttRegisterHandler registerHandler;
private IotMqttUpstreamHandler upstreamHandler;
public IotMqttProtocol(ProtocolInstanceProperties properties) {
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// TODO @AI初始化连接器参考 IotTcpProtocol
// TODO @AI初始化下行消息订阅者参考 IotTcpProtocol
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.MQTT;
}
// TODO @AI这个方法的整体注释风格参考 IotTcpProtocol 的 start 方法。
@Override
public void start() {
if (running) {
log.warn("[start][IoT MQTT 协议 {} 已经在运行中]", getId());
return;
}
// 1.1 创建 Vertx 实例(每个 Protocol 独立管理)
this.vertx = Vertx.vertx();
// 1.2 创建连接管理器
this.connectionManager = new IotMqttConnectionManager();
// 1.3 初始化 Handler
initHandlers();
// 2. 创建服务器选项
IotMqttConfig mqttConfig = properties.getMqtt();
// TODO @AIdefault 值,在 IotMqttConfig 处理;
MqttServerOptions options = new MqttServerOptions()
.setPort(properties.getPort())
.setMaxMessageSize(mqttConfig != null ? mqttConfig.getMaxMessageSize() : 8192)
.setTimeoutOnConnect(mqttConfig != null ? mqttConfig.getConnectTimeoutSeconds() : 60);
// 3. 配置 SSL如果启用
if (mqttConfig != null && Boolean.TRUE.equals(mqttConfig.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(mqttConfig.getSslKeyPath())
.setCertPath(mqttConfig.getSslCertPath());
options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
// 4. 创建服务器并设置连接处理器
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(this::handleEndpoint);
// 5. 启动服务器
try {
mqttServer.listen().result();
running = true;
log.info("[start][IoT MQTT 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 6. 启动下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
IotMqttDownstreamHandler downstreamHandler = new IotMqttDownstreamHandler(
SpringUtil.getBean(IotDeviceMessageService.class), connectionManager);
this.downstreamSubscriber = new IotMqttDownstreamSubscriber(this, downstreamHandler, messageBus);
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT MQTT 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭 Vertx
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e;
}
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
if (downstreamSubscriber != null) {
try {
downstreamSubscriber.stop();
log.info("[stop][IoT MQTT 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT MQTT 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
}
// 2.1 关闭 MQTT 服务器
if (mqttServer != null) {
try {
mqttServer.close().result();
log.info("[stop][IoT MQTT 协议 {} 服务器已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT MQTT 协议 {} 服务器停止失败]", getId(), e);
}
mqttServer = null;
}
// 2.2 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT MQTT 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT MQTT 协议 {} Vertx 关闭失败]", getId(), e);
}
vertx = null;
}
running = false;
log.info("[stop][IoT MQTT 协议 {} 已停止]", getId());
}
/**
* 初始化 Handler
*/
private void initHandlers() {
IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class);
IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionHandler = new IotMqttConnectionHandler(connectionManager, messageService, deviceApi, serverId);
this.registerHandler = new IotMqttRegisterHandler(connectionManager, messageService, deviceApi);
this.upstreamHandler = new IotMqttUpstreamHandler(connectionManager, messageService, serverId);
}
/**
* 处理 MQTT 连接端点
*
* @param endpoint MQTT 连接端点
*/
private void handleEndpoint(MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
// 1. 委托 connectionHandler 处理连接认证
// TODO @AIregister topic 不需要注册,需要判断下;
if (!connectionHandler.handleConnect(endpoint)) {
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
// 2.1 设置异常和关闭处理器
endpoint.exceptionHandler(ex -> {
log.warn("[handleEndpoint][连接异常,客户端 ID: {},地址: {}]",
clientId, connectionManager.getEndpointAddress(endpoint));
// TODO @AI是不是改成 endpoint close 更合适?
connectionHandler.cleanupConnection(endpoint);
});
endpoint.closeHandler(v -> connectionHandler.cleanupConnection(endpoint));
endpoint.disconnectHandler(v -> {
log.debug("[handleEndpoint][设备断开连接,客户端 ID: {}]", clientId);
connectionHandler.cleanupConnection(endpoint);
});
// 2.2 设置心跳处理器
endpoint.pingHandler(v -> log.debug("[handleEndpoint][收到客户端心跳,客户端 ID: {}]", clientId));
// 3.1 设置消息处理器
endpoint.publishHandler(message -> processMessage(endpoint, message));
// 3.2 设置 QoS 2 消息的 PUBREL 处理器
endpoint.publishReleaseHandler(endpoint::publishComplete);
// 4.1 设置订阅处理器
endpoint.subscribeHandler(subscribe -> {
// TODO @AIconvertList 简化;
List<String> topicNames = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::topicName)
.collect(Collectors.toList());
log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames);
// TODO @AIconvertList 简化;
List<MqttQoS> grantedQoSLevels = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::qualityOfService)
.collect(Collectors.toList());
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
});
// 4.2 设置取消订阅处理器
endpoint.unsubscribeHandler(unsubscribe -> {
log.debug("[handleEndpoint][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics());
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
// 5. 接受连接
endpoint.accept(false);
}
/**
* 处理消息(发布)
*
* @param endpoint MQTT 连接端点
* @param message 发布消息
*/
// TODO @AI看看要不要一定程度参考 IotTcpUpstreamHandler 的 processMessage 方法;
private void processMessage(MqttEndpoint endpoint, MqttPublishMessage message) {
String clientId = endpoint.clientIdentifier();
try {
String topic = message.topicName();
byte[] payload = message.payload().getBytes();
// 根据 topic 分发到不同 handler
if (registerHandler.isRegisterMessage(topic)) {
registerHandler.handleRegister(endpoint, topic, payload);
} else {
upstreamHandler.handleMessage(endpoint, topic, payload);
}
// 根据 QoS 级别发送相应的确认消息
handleQoSAck(endpoint, message);
} catch (Exception e) {
// TODO @AI异常的时候直接断开
log.error("[handlePublish][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage());
connectionHandler.cleanupConnection(endpoint);
endpoint.close();
}
}
/**
* 处理 QoS 确认
*
* @param endpoint MQTT 连接端点
* @param message 发布消息
*/
private void handleQoSAck(MqttEndpoint endpoint, MqttPublishMessage message) {
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
// QoS 1: 发送 PUBACK 确认
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
// QoS 2: 发送 PUBREC 确认
endpoint.publishReceived(message.messageId());
}
// QoS 0 无需确认
}
}

View File

@@ -1,117 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamProtocol implements IotProtocol {
private static final String ID = "mqtt";
private volatile boolean running = false;
private final IotGatewayProperties.MqttProperties mqttProperties;
private final IotDeviceMessageService messageService;
private final IotMqttConnectionManager connectionManager;
private final Vertx vertx;
@Getter
private final String serverId;
private MqttServer mqttServer;
public IotMqttUpstreamProtocol(IotGatewayProperties.MqttProperties mqttProperties,
IotDeviceMessageService messageService,
IotMqttConnectionManager connectionManager,
Vertx vertx) {
this.mqttProperties = mqttProperties;
this.messageService = messageService;
this.connectionManager = connectionManager;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(mqttProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.MQTT;
}
@Override
public boolean isRunning() {
return running;
}
// TODO @haohao这里的编写是不是和 tcp 对应的,风格保持一致哈;
@Override
@PostConstruct
public void start() {
// 创建服务器选项
MqttServerOptions options = new MqttServerOptions()
.setPort(mqttProperties.getPort())
.setMaxMessageSize(mqttProperties.getMaxMessageSize())
.setTimeoutOnConnect(mqttProperties.getConnectTimeoutSeconds());
// 配置 SSL如果启用
if (Boolean.TRUE.equals(mqttProperties.getSslEnabled())) {
options.setSsl(true)
.setKeyCertOptions(mqttProperties.getSslOptions().getKeyCertOptions())
.setTrustOptions(mqttProperties.getSslOptions().getTrustOptions());
}
// 创建服务器并设置连接处理器
mqttServer = MqttServer.create(vertx, options);
mqttServer.endpointHandler(endpoint -> {
IotMqttUpstreamHandler handler = new IotMqttUpstreamHandler(this, messageService, connectionManager);
handler.handle(endpoint);
});
// 启动服务器
try {
mqttServer.listen().result();
running = true;
log.info("[start][IoT 网关 MQTT 协议启动成功,端口:{}]", mqttProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 MQTT 协议启动失败]", e);
throw e;
}
}
@Override
@PreDestroy
public void stop() {
if (mqttServer != null) {
try {
mqttServer.close().result();
running = false;
log.info("[stop][IoT 网关 MQTT 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 MQTT 协议停止失败]", e);
}
}
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@@ -7,31 +7,22 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议下行消息处理器
* <p>
* 专门处理下行消息的业务逻辑包括
* 1. 消息编码
* 2. 主题构建
* 3. 消息发送
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotMqttDownstreamHandler {
private final IotDeviceMessageService deviceMessageService;
private final IotMqttConnectionManager connectionManager;
public IotMqttDownstreamHandler(IotDeviceMessageService deviceMessageService,
IotMqttConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.connectionManager = connectionManager;
}
/**
* 处理下行消息
*
@@ -40,6 +31,7 @@ public class IotMqttDownstreamHandler {
*/
public boolean handleDownstreamMessage(IotDeviceMessage message) {
try {
// TODO @AI参考 IotTcpDownstreamHandler 逻辑
// 1. 基础校验
if (message == null || message.getDeviceId() == null) {
log.warn("[handleDownstreamMessage][消息或设备 ID 为空,忽略处理]");
@@ -47,6 +39,7 @@ public class IotMqttDownstreamHandler {
}
// 2. 检查设备是否在线
// TODO @AI这块逻辑是不是冗余直接使用 3. 获取连接信息判断不就行了
if (connectionManager.isDeviceOffline(message.getDeviceId())) {
log.warn("[handleDownstreamMessage][设备离线,无法发送消息,设备 ID{}]", message.getDeviceId());
return false;
@@ -59,7 +52,7 @@ public class IotMqttDownstreamHandler {
return false;
}
// 4. 编码消息
// 4. 序列化
byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName());
if (payload == null || payload.length == 0) {
@@ -68,6 +61,7 @@ public class IotMqttDownstreamHandler {
}
// 5. 发送消息到设备
// TODO @AI参考 IotTcpDownstreamHandler 的逻辑
return sendMessageToDevice(message, connectionInfo, payload);
} catch (Exception e) {
if (message != null) {
@@ -78,6 +72,7 @@ public class IotMqttDownstreamHandler {
}
}
// TODO @AI 是不是合并到 handleDownstreamMessage
/**
* 发送消息到设备
*
@@ -91,6 +86,7 @@ public class IotMqttDownstreamHandler {
byte[] payload) {
// 1. 构建主题
String topic = buildDownstreamTopic(message, connectionInfo);
// TODO @AI直接断言非空
if (StrUtil.isBlank(topic)) {
log.warn("[sendMessageToDevice][主题构建失败,设备 ID{},方法:{}]",
message.getDeviceId(), message.getMethod());
@@ -118,6 +114,7 @@ public class IotMqttDownstreamHandler {
*/
private String buildDownstreamTopic(IotDeviceMessage message,
IotMqttConnectionManager.ConnectionInfo connectionInfo) {
// TODO @AI直接断言非空
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
return null;

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol;
import lombok.extern.slf4j.Slf4j;
/**
@@ -18,7 +18,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber
private final IotMqttDownstreamHandler downstreamHandler;
public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol,
public IotMqttDownstreamSubscriber(IotMqttProtocol protocol,
IotMqttDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
@@ -27,6 +27,7 @@ public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber
@Override
protected void handleMessage(IotDeviceMessage message) {
// TODO @AI参考 IotTcpDownstreamHandler 不处理返回值甚至不用返回值
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[handleMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]",

View File

@@ -0,0 +1,105 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 协议的处理器抽象基类
* <p>
* 提供通用的连接校验、响应发送等功能
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public abstract class IotMqttAbstractHandler {
// TODO @AI不使用 codec使用 serializer 来进行编解码;
/**
* 默认编解码类型MQTT 使用 Alink 协议)
*/
protected static final String DEFAULT_CODEC_TYPE = "Alink";
protected final IotMqttConnectionManager connectionManager;
protected final IotDeviceMessageService deviceMessageService;
// TODO @AI这个是否需要
/**
* 是否需要连接已认证(默认 true
* <p>
* 仅 IotMqttConnectionHandler 覆盖为 false
*
* @return 是否需要连接已认证
*/
protected boolean requiresAuthenticated() {
return true;
}
// TODO @AI不确定是不是基于 method 就可以算出来 reply topic
// TODO @AI需要传递 seriabler 序列对象,不是通过 deviceMessageService.encodeDeviceMessage 获取到合适的;
/**
* 发送成功响应到设备
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param method 方法名
* @param data 响应数据
*/
protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String method, Object data) {
try {
// 1. 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null);
// 2. 编码消息(使用默认编解码器)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true);
endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendSuccessResponse][发送成功响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendSuccessResponse][发送成功响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e);
}
}
/**
* 发送错误响应到设备
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param method 方法名
* @param errorMessage 错误消息
*/
protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String method, String errorMessage) {
try {
// 1. 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, 400, errorMessage);
// 2. 编码消息(使用默认编解码器)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true);
endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendErrorResponse][发送错误响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendErrorResponse][发送错误响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e);
}
}
// TODO @AI搞个基础的 writeResponse 会不会更好?参考 IotTcpUpstreamHandler 里;
}

View File

@@ -0,0 +1,178 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
// TODO @AI改成 auth 这样的命名,更好理解;
/**
* IoT 网关 MQTT 连接处理器
* <p>
* 处理 MQTT CONNECT 事件,完成设备认证、连接注册、上线通知
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttConnectionHandler extends IotMqttAbstractHandler {
// TODO @AI通过 springutil 去获取!
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotMqttConnectionHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
IotDeviceCommonApi deviceApi,
String serverId) {
super(connectionManager, deviceMessageService);
this.deviceApi = deviceApi;
this.serverId = serverId;
}
@Override
protected boolean requiresAuthenticated() {
return false; // 连接阶段不需要已认证
}
/**
* 处理 MQTT 连接请求
*
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
public boolean handleConnect(MqttEndpoint endpoint) {
// TODO @AI整个 try catch 下;
// TODO @AI是不是参考 IotTcpUpstreamHandler 的代码结构
String clientId = endpoint.clientIdentifier();
String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null;
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handleConnect][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, connectionManager.getEndpointAddress(endpoint));
// 进行认证
if (!authenticateDevice(clientId, username, password, endpoint)) {
log.warn("[handleConnect][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
log.info("[handleConnect][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
return true;
}
/**
* 在 MQTT 连接时进行设备认证
*
* @param clientId 客户端 ID
* @param username 用户名
* @param password 密码
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) {
try {
// 1.1 解析认证参数
// TODO @AI断言统一交给上层打印日志
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 1.2 构建认证参数
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 2.1 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
// TODO @AI断言统一交给上层打印日志
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, authResult.getMsg());
return false;
}
// 2.2 获取设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 2.3 获取设备信息
// TODO @AI报错需要处理下
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
if (device == null) {
log.warn("[authenticateDevice][设备不存在,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 3.1 注册连接
registerConnection(endpoint, device, clientId);
// 3.2 发送设备上线消息
sendOnlineMessage(device);
return true;
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e);
return false;
}
}
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage());
}
}
/**
* 清理连接
*
* @param endpoint MQTT 连接端点
*/
public void cleanupConnection(MqttEndpoint endpoint) {
try {
// 1. 发送设备离线消息
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
if (connectionInfo != null) {
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
}
// 2. 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]",
endpoint.clientIdentifier(), e.getMessage());
}
}
}

View File

@@ -0,0 +1,182 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* IoT 网关 MQTT 设备注册处理器
* <p>
* 处理设备动态注册消息(一型一密)
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttRegisterHandler extends IotMqttAbstractHandler {
// TODO IotDeviceMessageMethodEnum.DEVICE_REGISTER 计算出来IotMqttTopicUtils
/**
* register 请求的 topic 后缀
*/
public static final String REGISTER_TOPIC_SUFFIX = "/thing/auth/register";
private final IotDeviceCommonApi deviceApi;
public IotMqttRegisterHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
IotDeviceCommonApi deviceApi) {
super(connectionManager, deviceMessageService);
// TODO @AI通过 springutil 处理;
this.deviceApi = deviceApi;
}
/**
* 判断是否为注册消息
*
* @param topic 主题
* @return 是否为注册消息
*/
public boolean isRegisterMessage(String topic) {
// TODO @AI是不是搞到 IotMqttTopicUtils 里?
return topic != null && topic.endsWith(REGISTER_TOPIC_SUFFIX);
}
/**
* 处理注册消息
*
* @param endpoint MQTT 连接端点
* @param topic 主题
* @param payload 消息内容
*/
public void handleRegister(MqttEndpoint endpoint, String topic, byte[] payload) {
String clientId = endpoint.clientIdentifier();
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解析主题,获取 productKey 和 deviceName
// TODO @AI直接断言报错
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[handleRegister][topic({}) 格式不正确]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// TODO @AI直接断言报错
// 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType
// TODO @AI使用默认的 json
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE);
if (message == null) {
log.warn("[handleRegister][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
} catch (Exception e) {
log.error("[handleRegister][消息解码异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
return;
}
// 3. 处理设备动态注册请求
log.info("[handleRegister][收到设备注册消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
try {
processRegisterRequest(message, productKey, deviceName, endpoint);
} catch (Exception e) {
// TODO @AI各种情况下的翻译
log.error("[handleRegister][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 处理设备动态注册请求(一型一密,不需要 deviceSecret
*
* @param message 消息信息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param endpoint MQTT 连接端点
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void processRegisterRequest(IotDeviceMessage message, String productKey, String deviceName,
MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod();
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null) {
log.warn("[processRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册参数不完整");
return;
}
// 2. 调用动态注册 API
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[processRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getMsg());
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), method, result.getData());
log.info("[processRegisterRequest][注册成功,设备名: {},客户端 ID: {}]",
params.getDeviceName(), clientId);
} catch (Exception e) {
log.error("[processRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), method, "注册处理异常");
}
}
// TODO @AI解析可以简化参考别的 tcp 对应的
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceRegisterReqDTO()
.setProductKey(MapUtil.getStr(paramMap, "productKey"))
.setDeviceName(MapUtil.getStr(paramMap, "deviceName"))
.setProductSecret(MapUtil.getStr(paramMap, "productSecret"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
}

View File

@@ -0,0 +1,81 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.mqtt.MqttEndpoint;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 MQTT 上行消息处理器
* <p>
* 处理业务消息(属性上报、事件上报等)
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamHandler extends IotMqttAbstractHandler {
private final String serverId;
public IotMqttUpstreamHandler(IotMqttConnectionManager connectionManager,
IotDeviceMessageService deviceMessageService,
String serverId) {
super(connectionManager, deviceMessageService);
this.serverId = serverId;
}
/**
* 处理业务消息
*
* @param endpoint MQTT 连接端点
* @param topic 主题
* @param payload 消息内容
*/
public void handleMessage(MqttEndpoint endpoint, String topic, byte[] payload) {
String clientId = endpoint.clientIdentifier();
// 1. 基础检查
if (payload == null || payload.length == 0) {
return;
}
// 2. 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[handleMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
String productKey = topicParts[2];
String deviceName = topicParts[3];
try {
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[handleMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
// 4. 处理业务消息(认证已在连接时完成)
log.info("[handleMessage][收到设备消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
handleBusinessRequest(message, productKey, deviceName);
} catch (Exception e) {
// TODO @AI各种情况下的翻译
log.error("[handleMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 处理业务请求
*/
private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) {
// 发送消息到消息总线
message.setServerId(serverId);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
}
}

View File

@@ -40,6 +40,7 @@ public class IotMqttConnectionManager {
*/
private final Map<Long, MqttEndpoint> deviceEndpointMap = new ConcurrentHashMap<>();
// TODO @AI这里会存在返回 "unknown" 的情况么?是不是必须返回,否则还是异常更合理点?
/**
* 安全获取 endpoint 地址
* <p>
@@ -66,7 +67,6 @@ public class IotMqttConnectionManager {
} catch (Exception ignored) {
// 连接已关闭,忽略异常
}
return realTimeAddress;
}
@@ -129,7 +129,6 @@ public class IotMqttConnectionManager {
if (endpoint == null) {
return null;
}
// 获取连接信息
return getConnectionInfo(endpoint);
}
@@ -208,6 +207,7 @@ public class IotMqttConnectionManager {
*/
private String clientId;
// TODO @AI是不是要去掉感觉没用啊
/**
* 是否已认证
*/

View File

@@ -1,511 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* MQTT 上行消息处理器
*
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamHandler {
/**
* 默认编解码类型MQTT 使用 Alink 协议)
*/
private static final String DEFAULT_CODEC_TYPE = "Alink";
/**
* register 请求的 topic 后缀
*/
private static final String REGISTER_TOPIC_SUFFIX = "/thing/auth/register";
private final IotDeviceMessageService deviceMessageService;
private final IotMqttConnectionManager connectionManager;
private final IotDeviceCommonApi deviceApi;
private final String serverId;
public IotMqttUpstreamHandler(IotMqttUpstreamProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotMqttConnectionManager connectionManager) {
this.deviceMessageService = deviceMessageService;
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.connectionManager = connectionManager;
this.serverId = protocol.getServerId();
}
/**
* 处理 MQTT 连接
*
* @param endpoint MQTT 连接端点
*/
public void handle(MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
String username = endpoint.auth() != null ? endpoint.auth().getUsername() : null;
String password = endpoint.auth() != null ? endpoint.auth().getPassword() : null;
log.debug("[handle][设备连接请求,客户端 ID: {},用户名: {},地址: {}]",
clientId, username, connectionManager.getEndpointAddress(endpoint));
// 1. 先进行认证
if (!authenticateDevice(clientId, username, password, endpoint)) {
log.warn("[handle][设备认证失败,拒绝连接,客户端 ID: {},用户名: {}]", clientId, username);
endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
log.info("[handle][设备认证成功,建立连接,客户端 ID: {},用户名: {}]", clientId, username);
// 2. 设置心跳处理器(监听客户端的 PINGREQ 消息)
endpoint.pingHandler(v -> {
log.debug("[handle][收到客户端心跳,客户端 ID: {}]", clientId);
// Vert.x 会自动发送 PINGRESP 响应,无需手动处理
});
// 3. 设置异常和关闭处理器
endpoint.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, connectionManager.getEndpointAddress(endpoint));
cleanupConnection(endpoint);
});
endpoint.closeHandler(v -> {
cleanupConnection(endpoint);
});
// 4. 设置消息处理器
endpoint.publishHandler(mqttMessage -> {
try {
// 4.1 根据 topic 判断是否为 register 请求
String topic = mqttMessage.topicName();
byte[] payload = mqttMessage.payload().getBytes();
if (topic.endsWith(REGISTER_TOPIC_SUFFIX)) {
// register 请求:使用默认编解码器处理(设备可能未注册)
processRegisterMessage(clientId, topic, payload, endpoint);
} else {
// 业务请求:正常处理
processMessage(clientId, topic, payload);
}
// 4.2 根据 QoS 级别发送相应的确认消息
if (mqttMessage.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
// QoS 1: 发送 PUBACK 确认
endpoint.publishAcknowledge(mqttMessage.messageId());
} else if (mqttMessage.qosLevel() == MqttQoS.EXACTLY_ONCE) {
// QoS 2: 发送 PUBREC 确认
endpoint.publishReceived(mqttMessage.messageId());
}
// QoS 0 无需确认
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, connectionManager.getEndpointAddress(endpoint), e.getMessage());
cleanupConnection(endpoint);
endpoint.close();
}
});
// 5. 设置订阅处理器
endpoint.subscribeHandler(subscribe -> {
// 提取主题名称列表用于日志显示
List<String> topicNames = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::topicName)
.collect(java.util.stream.Collectors.toList());
log.debug("[handle][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames);
// 提取 QoS 列表
List<MqttQoS> grantedQoSLevels = subscribe.topicSubscriptions().stream()
.map(MqttTopicSubscription::qualityOfService)
.collect(java.util.stream.Collectors.toList());
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels);
});
// 6. 设置取消订阅处理器
endpoint.unsubscribeHandler(unsubscribe -> {
log.debug("[handle][设备取消订阅,客户端 ID: {},主题: {}]", clientId, unsubscribe.topics());
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
// 7. 设置 QoS 2消息的 PUBREL 处理器
endpoint.publishReleaseHandler(endpoint::publishComplete);
// 8. 设置断开连接处理器
endpoint.disconnectHandler(v -> {
log.debug("[handle][设备断开连接,客户端 ID: {}]", clientId);
cleanupConnection(endpoint);
});
// 9. 接受连接
endpoint.accept(false);
}
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param topic 主题
* @param payload 消息内容
*/
private void processMessage(String clientId, String topic, byte[] payload) {
// 1. 基础检查
if (payload == null || payload.length == 0) {
return;
}
// 2. 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[processMessage][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic);
return;
}
// 3. 解码消息(使用从 topic 解析的 productKey 和 deviceName
String productKey = topicParts[2];
String deviceName = topicParts[3];
try {
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
if (message == null) {
log.warn("[processMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
// 4. 处理业务消息(认证已在连接时完成)
log.info("[processMessage][收到设备消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
handleBusinessRequest(message, productKey, deviceName);
} catch (Exception e) {
log.error("[processMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 在 MQTT 连接时进行设备认证
*
* @param clientId 客户端 ID
* @param username 用户名
* @param password 密码
* @param endpoint MQTT 连接端点
* @return 认证是否成功
*/
private boolean authenticateDevice(String clientId, String username, String password, MqttEndpoint endpoint) {
try {
// 1. 参数校验
if (StrUtil.hasEmpty(clientId, username, password)) {
log.warn("[authenticateDevice][认证参数不完整,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
// 2. 构建认证参数
IotDeviceAuthReqDTO authParams = new IotDeviceAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 3. 调用设备认证 API
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
if (!authResult.isSuccess() || !BooleanUtil.isTrue(authResult.getData())) {
log.warn("[authenticateDevice][设备认证失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, authResult.getMsg());
return false;
}
// 4. 获取设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username);
if (deviceInfo == null) {
log.warn("[authenticateDevice][用户名格式不正确,客户端 ID: {},用户名: {}]", clientId, username);
return false;
}
IotDeviceGetReqDTO getReqDTO = new IotDeviceGetReqDTO()
.setProductKey(deviceInfo.getProductKey())
.setDeviceName(deviceInfo.getDeviceName());
CommonResult<IotDeviceRespDTO> deviceResult = deviceApi.getDevice(getReqDTO);
if (!deviceResult.isSuccess() || deviceResult.getData() == null) {
log.warn("[authenticateDevice][获取设备信息失败,客户端 ID: {},用户名: {},错误: {}]",
clientId, username, deviceResult.getMsg());
return false;
}
// 5. 注册连接
IotDeviceRespDTO device = deviceResult.getData();
registerConnection(endpoint, device, clientId);
// 6. 发送设备上线消息
sendOnlineMessage(device);
return true;
} catch (Exception e) {
log.error("[authenticateDevice][设备认证异常,客户端 ID: {},用户名: {}]", clientId, username, e);
return false;
}
}
/**
* 处理 register 消息(设备动态注册,使用默认编解码器)
*
* @param clientId 客户端 ID
* @param topic 主题
* @param payload 消息内容
* @param endpoint MQTT 连接端点
*/
private void processRegisterMessage(String clientId, String topic, byte[] payload, MqttEndpoint endpoint) {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解析主题,获取 productKey 和 deviceName
String[] topicParts = topic.split("/");
if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) {
log.warn("[processRegisterMessage][topic({}) 格式不正确]", topic);
return;
}
String productKey = topicParts[2];
String deviceName = topicParts[3];
// 2. 使用默认编解码器解码消息(设备可能未注册,无法获取 codecType
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(payload, DEFAULT_CODEC_TYPE);
if (message == null) {
log.warn("[processRegisterMessage][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic);
return;
}
} catch (Exception e) {
log.error("[processRegisterMessage][消息解码异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
return;
}
// 3. 处理设备动态注册请求
log.info("[processRegisterMessage][收到设备注册消息,设备: {}.{}, 方法: {}]",
productKey, deviceName, message.getMethod());
try {
handleRegisterRequest(message, productKey, deviceName, endpoint);
} catch (Exception e) {
log.error("[processRegisterMessage][消息处理异常,客户端 ID: {},主题: {},错误: {}]",
clientId, topic, e.getMessage(), e);
}
}
/**
* 处理设备动态注册请求(一型一密,不需要 deviceSecret
*
* @param message 消息信息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param endpoint MQTT 连接端点
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(IotDeviceMessage message, String productKey, String deviceName, MqttEndpoint endpoint) {
String clientId = endpoint.clientIdentifier();
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册参数不完整");
return;
}
// 2. 调用动态注册 API
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getMsg());
return;
}
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(endpoint, productKey, deviceName, message.getRequestId(), result.getData());
log.info("[handleRegisterRequest][注册成功,设备名: {},客户端 ID: {}]",
params.getDeviceName(), clientId);
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendRegisterErrorResponse(endpoint, productKey, deviceName, message.getRequestId(), "注册处理异常");
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings({"unchecked", "DuplicatedCode"})
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceRegisterReqDTO()
.setProductKey(MapUtil.getStr(paramMap, "productKey"))
.setDeviceName(MapUtil.getStr(paramMap, "deviceName"))
.setProductSecret(MapUtil.getStr(paramMap, "productSecret"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param registerResp 注册响应
*/
private void sendRegisterSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, IotDeviceRegisterRespDTO registerResp) {
try {
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 编码消息(使用默认编解码器,因为设备可能还未注册)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true);
endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData),
MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendRegisterSuccessResponse][发送注册成功响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应异常,客户端 ID: {}]",
endpoint.clientIdentifier(), e);
}
}
/**
* 发送注册错误响应
*
* @param endpoint MQTT 连接端点
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param errorMessage 错误消息
*/
private void sendRegisterErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName,
String requestId, String errorMessage) {
try {
// 1. 构建响应消息
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), null, 400, errorMessage);
// 2. 编码消息(使用默认编解码器,因为设备可能还未注册)
byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE);
// 3. 构建响应主题并发送(格式:/sys/{productKey}/{deviceName}/thing/auth/register_reply
String replyTopic = IotMqttTopicUtils.buildTopicByMethod(
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), productKey, deviceName, true);
endpoint.publish(replyTopic, io.vertx.core.buffer.Buffer.buffer(encodedData),
MqttQoS.AT_LEAST_ONCE, false, false);
log.debug("[sendRegisterErrorResponse][发送注册错误响应,主题: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendRegisterErrorResponse][发送注册错误响应异常,客户端 ID: {}]",
endpoint.clientIdentifier(), e);
}
}
/**
* 处理业务请求
*/
private void handleBusinessRequest(IotDeviceMessage message, String productKey, String deviceName) {
// 发送消息到消息总线
message.setServerId(serverId);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
}
/**
* 注册连接
*/
private void registerConnection(MqttEndpoint endpoint, IotDeviceRespDTO device, String clientId) {
IotMqttConnectionManager.ConnectionInfo connectionInfo = new IotMqttConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
.setDeviceName(device.getDeviceName())
.setClientId(clientId)
.setAuthenticated(true)
.setRemoteAddress(connectionManager.getEndpointAddress(endpoint));
connectionManager.registerConnection(endpoint, device.getId(), connectionInfo);
}
/**
* 发送设备上线消息
*/
private void sendOnlineMessage(IotDeviceRespDTO device) {
try {
IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOnlineMessage][设备上线,设备 ID: {},设备名称: {}]", device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[sendOnlineMessage][发送设备上线消息失败,设备 ID: {},错误: {}]", device.getId(), e.getMessage());
}
}
/**
* 清理连接
*/
private void cleanupConnection(MqttEndpoint endpoint) {
try {
IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint);
if (connectionInfo != null) {
// 发送设备离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[cleanupConnection][设备离线,设备 ID: {},设备名称: {}]", connectionInfo.getDeviceId(), connectionInfo.getDeviceName());
}
// 注销连接
connectionManager.unregisterConnection(endpoint);
} catch (Exception e) {
log.error("[cleanupConnection][清理连接失败,客户端 ID: {},错误: {}]", endpoint.clientIdentifier(), e.getMessage());
}
}
}

View File

@@ -45,8 +45,8 @@ public class IotTcpDownstreamHandler {
}
// 2. 序列化 + 帧编码
byte[] serializedData = serializer.serialize(message);
Buffer frameData = codec.encode(serializedData);
byte[] payload = serializer.serialize(message);
Buffer frameData = codec.encode(payload);
// 3. 发送到设备
boolean success = connectionManager.sendToDevice(message.getDeviceId(), frameData.getBytes());

View File

@@ -57,9 +57,9 @@ yudao:
# 针对引入的 TCP 组件的配置
# ====================================
- id: tcp-json
enabled: false
type: tcp
port: 8091
enabled: false
serialize: json
tcp:
max-connections: 1000
@@ -79,9 +79,9 @@ yudao:
# 针对引入的 UDP 组件的配置
# ====================================
- id: udp-json
enabled: false
type: udp
port: 8093
enabled: false
serialize: json
udp:
max-sessions: 1000 # 最大会话数
@@ -92,9 +92,9 @@ yudao:
# 针对引入的 WebSocket 组件的配置
# ====================================
- id: websocket-json
enabled: true
type: websocket
port: 8094
enabled: true
serialize: json
websocket:
path: /ws
@@ -106,13 +106,26 @@ yudao:
# 针对引入的 CoAP 组件的配置
# ====================================
- id: coap-json
enabled: false
type: coap
port: 5683
enabled: true
coap:
max-message-size: 1024 # 最大消息大小(字节)
ack-timeout-ms: 2000 # ACK 超时时间(毫秒)
max-retransmit: 4 # 最大重传次数
# ====================================
# 针对引入的 MQTT 组件的配置
# ====================================
- id: mqtt-json
enabled: true
type: mqtt
port: 1883
serialize: json
mqtt:
max-message-size: 8192 # 最大消息大小(字节)
connect-timeout-seconds: 60 # 连接超时时间(秒)
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
ssl-enabled: false # 是否启用 SSL
# 协议配置(旧版,保持兼容)
protocol:
@@ -150,15 +163,6 @@ yudao:
key-store-password: "your-keystore-password" # 客户端证书库密码
trust-store-path: "classpath:certs/trust.jks" # 信任的 CA 证书库路径
trust-store-password: "your-truststore-password" # 信任的 CA 证书库密码
# ====================================
# 针对引入的 MQTT 组件的配置
# ====================================
mqtt:
enabled: false
port: 1883
max-message-size: 8192
connect-timeout-seconds: 60
ssl-enabled: false
--- #################### 日志相关配置 ####################