diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java index f3a3c0d14d..164c06f3e9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/IotHttpProtocol.java @@ -125,7 +125,11 @@ public class IotHttpProtocol implements IotProtocol { this.downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e); - // 启动失败时关闭 Vertx + // 启动失败时关闭资源 + if (httpServer != null) { + httpServer.close(); + httpServer = null; + } if (vertx != null) { vertx.close(); vertx = null; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java index 48060d7285..416dcced66 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttConfig.java @@ -26,13 +26,6 @@ public class IotMqttConfig { @Min(value = 1, message = "连接超时时间不能小于 1 秒") private Integer connectTimeoutSeconds = 60; - /** - * 保持连接超时时间(秒) - */ - @NotNull(message = "保持连接超时时间不能为空") - @Min(value = 1, message = "保持连接超时时间不能小于 1 秒") - private Integer keepAliveTimeoutSeconds = 300; - /** * 是否启用 SSL */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java index 7a77f0bf32..85d21853ef 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotMqttProtocol.java @@ -1,13 +1,15 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream.IotMqttDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttAuthHandler; @@ -15,6 +17,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqt import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream.IotMqttUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import cn.iocoder.yudao.module.iot.gateway.util.IotMqttTopicUtils; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; @@ -26,13 +29,10 @@ import io.vertx.mqtt.MqttTopicSubscription; import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; +import java.util.ArrayList; import java.util.List; -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList; - /** * IoT 网关 MQTT 协议:接收设备上行消息 * @@ -249,11 +249,22 @@ public class IotMqttProtocol implements IotProtocol { // 3.2 设置 QoS 2 消息的 PUBREL 处理器 endpoint.publishReleaseHandler(endpoint::publishComplete); - // 4.1 设置订阅处理器 + // 4.1 设置订阅处理器(带 ACL 校验) endpoint.subscribeHandler(subscribe -> { - List topicNames = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::topicName); - log.debug("[handleEndpoint][设备订阅,客户端 ID: {},主题: {}]", clientId, topicNames); - List grantedQoSLevels = convertList(subscribe.topicSubscriptions(), MqttTopicSubscription::qualityOfService); + IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); + List grantedQoSLevels = new ArrayList<>(); + for (MqttTopicSubscription sub : subscribe.topicSubscriptions()) { + String topicName = sub.topicName(); + // 校验主题是否属于当前设备 + if (connectionInfo != null && IotMqttTopicUtils.isTopicSubscribeAllowed( + topicName, connectionInfo.getProductKey(), connectionInfo.getDeviceName())) { + grantedQoSLevels.add(sub.qualityOfService()); + log.debug("[handleEndpoint][订阅成功,客户端 ID: {},主题: {}]", clientId, topicName); + } else { + log.warn("[handleEndpoint][订阅被拒绝,客户端 ID: {},主题: {}]", clientId, topicName); + grantedQoSLevels.add(MqttQoS.FAILURE); + } + } endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQoSLevels); }); // 4.2 设置取消订阅处理器 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java index 4acb037a34..12445cb85b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java @@ -30,7 +30,6 @@ public abstract class IotMqttAbstractHandler { protected final IotMqttConnectionManager connectionManager; protected final IotDeviceMessageService deviceMessageService; - // done @AI:基于 method 通过 IotMqttTopicUtils.buildTopicByMethod 计算 reply topic /** * 发送成功响应到设备 * @@ -43,20 +42,8 @@ public abstract class IotMqttAbstractHandler { */ protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName, String requestId, String method, Object data) { - try { - // 1. 构建响应消息 - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null); - - // 2. 编码消息(使用默认编解码器) - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); - - // 3. 构建响应主题并发送 - String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true); - endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false); - log.debug("[sendSuccessResponse][发送成功响应,主题: {}]", replyTopic); - } catch (Exception e) { - log.error("[sendSuccessResponse][发送成功响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e); - } + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null); + writeResponse(endpoint, productKey, deviceName, method, responseMessage); } /** @@ -72,22 +59,32 @@ public abstract class IotMqttAbstractHandler { */ protected void sendErrorResponse(MqttEndpoint endpoint, String productKey, String deviceName, String requestId, String method, Integer errorCode, String errorMessage) { - try { - // 1. 构建响应消息 - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage); + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, errorCode, errorMessage); + writeResponse(endpoint, productKey, deviceName, method, responseMessage); + } - // 2. 编码消息(使用默认编解码器) + /** + * 写入响应消息到设备 + * + * @param endpoint MQTT 连接端点 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @param method 方法名 + * @param responseMessage 响应消息 + */ + private void writeResponse(MqttEndpoint endpoint, String productKey, String deviceName, + String method, IotDeviceMessage responseMessage) { + try { + // 1. 编码消息(使用默认编解码器) byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); - // 3. 构建响应主题并发送 + // 2. 构建响应主题,并发送 String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true); endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false); - log.debug("[sendErrorResponse][发送错误响应,主题: {}]", replyTopic); + log.debug("[writeResponse][发送响应,主题: {},code: {}]", replyTopic, responseMessage.getCode()); } catch (Exception e) { - log.error("[sendErrorResponse][发送错误响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e); + log.error("[writeResponse][发送响应异常,客户端 ID: {}]", endpoint.clientIdentifier(), e); } } - // TODO @AI:当前 sendSuccessResponse/sendErrorResponse 已足够清晰,暂不抽取 writeResponse(必须抽取!!!) - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java index b2155a3a66..49228761d4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAuthHandler.java @@ -103,7 +103,7 @@ public class IotMqttAuthHandler extends IotMqttAbstractHandler { .setProductKey(device.getProductKey()) .setDeviceName(device.getDeviceName()) .setRemoteAddress(connectionManager.getEndpointAddress(endpoint)); - connectionManager.registerConnection(endpoint, device.getId(), connectionInfo); + connectionManager.registerConnection(endpoint, connectionInfo); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java index 77fda8ea0a..ec1dce2061 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java @@ -77,6 +77,9 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { // 接受连接,并发送错误响应 endpoint.accept(false); sendErrorResponse(endpoint, productKey, deviceName, null, method, 500, e.getMessage()); + } finally { + // 注册完成后关闭连接(一型一密只用于获取 deviceSecret,不保持连接) + endpoint.close(); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java index 4f6836e8ba..4014ccdf03 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java @@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.upstream; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; @@ -50,12 +49,10 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { } // 1.2 解析主题,获取 productKey 和 deviceName String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { - log.warn("[handleBusinessRequest][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); - return; - } - productKey = topicParts[2]; - deviceName = topicParts[3]; + productKey = ArrayUtil.get(topicParts, 2); + deviceName = ArrayUtil.get(topicParts, 3); + Assert.notBlank(productKey, "产品 Key 不能为空"); + Assert.notBlank(deviceName, "设备名称不能为空"); // 1.3 校验设备信息,防止伪造设备消息 IotMqttConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(endpoint); Assert.notNull(connectionInfo, "无法获取连接信息"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java index 6ebc123054..ccb9fa5a60 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/manager/IotMqttConnectionManager.java @@ -73,25 +73,24 @@ public class IotMqttConnectionManager { * 注册设备连接(包含认证信息) * * @param endpoint MQTT 连接端点 - * @param deviceId 设备 ID * @param connectionInfo 连接信息 */ - // TODO @AI:移除掉 deviceId ???参考别的 tcp 等模块协议 - public void registerConnection(MqttEndpoint endpoint, Long deviceId, ConnectionInfo connectionInfo) { + public void registerConnection(MqttEndpoint endpoint, ConnectionInfo connectionInfo) { + Long deviceId = connectionInfo.getDeviceId(); // 如果设备已有其他连接,先清理旧连接 MqttEndpoint oldEndpoint = deviceEndpointMap.get(deviceId); if (oldEndpoint != null && oldEndpoint != endpoint) { log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]", deviceId, getEndpointAddress(oldEndpoint)); - oldEndpoint.close(); - // 清理旧连接的映射 + // 先清理映射,再关闭连接(避免旧连接处理器干扰) connectionMap.remove(oldEndpoint); + oldEndpoint.close(); } // 注册新连接 connectionMap.put(endpoint, connectionInfo); deviceEndpointMap.put(deviceId, endpoint); - log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},product key: {},device name: {}]", + log.info("[registerConnection][注册设备连接,设备 ID: {},连接: {},productKey: {},deviceName: {}]", deviceId, getEndpointAddress(endpoint), connectionInfo.getProductKey(), connectionInfo.getDeviceName()); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java index 36c5928762..b7b72a370b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/manager/IotTcpConnectionManager.java @@ -47,8 +47,8 @@ public class IotTcpConnectionManager { * @param deviceId 设备 ID * @param connectionInfo 连接信息 */ - public void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) { - // 检查连接数是否已达上限 + public synchronized void registerConnection(NetSocket socket, Long deviceId, ConnectionInfo connectionInfo) { + // 检查连接数是否已达上限(同步方法确保检查和注册的原子性) if (connectionMap.size() >= maxConnections) { throw new IllegalStateException("连接数已达上限: " + maxConnections); } @@ -57,9 +57,9 @@ public class IotTcpConnectionManager { if (oldSocket != null && oldSocket != socket) { log.info("[registerConnection][设备已有其他连接,断开旧连接,设备 ID: {},旧连接: {}]", deviceId, oldSocket.remoteAddress()); - oldSocket.close(); - // 清理旧连接的映射 + // 先清理映射,再关闭连接 connectionMap.remove(oldSocket); + oldSocket.close(); } // 注册新连接 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java index 8195c99961..d807bce756 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java @@ -9,6 +9,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -47,8 +48,8 @@ public class IotUdpSessionManager { * @param deviceId 设备 ID * @param sessionInfo 会话信息 */ - public void registerSession(Long deviceId, SessionInfo sessionInfo) { - // 检查是否为新设备,且会话数已达上限 + public synchronized void registerSession(Long deviceId, SessionInfo sessionInfo) { + // 检查是否为新设备,且会话数已达上限(同步方法确保检查和注册的原子性) if (deviceSessionCache.getIfPresent(deviceId) == null && deviceSessionCache.size() >= maxSessions) { throw new IllegalStateException("会话数已达上限: " + maxSessions); @@ -113,16 +114,21 @@ public class IotUdpSessionManager { } InetSocketAddress address = sessionInfo.getAddress(); try { + // 使用 CompletableFuture 同步等待发送结果 + CompletableFuture future = new CompletableFuture<>(); socket.send(Buffer.buffer(data), address.getPort(), address.getHostString(), result -> { if (result.succeeded()) { log.debug("[sendToDevice][发送消息成功,设备 ID: {},地址: {},数据长度: {} 字节]", deviceId, buildAddressKey(address), data.length); - return; + future.complete(true); + } else { + log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]", + deviceId, buildAddressKey(address), result.cause()); + future.complete(false); } - log.error("[sendToDevice][发送消息失败,设备 ID: {},地址: {}]", - deviceId, buildAddressKey(address), result.cause()); }); - return true; + // 同步等待结果,超时 5 秒 + return future.get(5, TimeUnit.SECONDS); } catch (Exception e) { log.error("[sendToDevice][发送消息异常,设备 ID: {}]", deviceId, e); return false; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java index 7f72937efb..2a842966fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/util/IotMqttTopicUtils.java @@ -63,4 +63,26 @@ public final class IotMqttTopicUtils { return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/" + topicSuffix; } + /** + * 校验主题是否允许订阅 + *

+ * 规则:主题必须以 /sys/{productKey}/{deviceName}/ 开头, + * 或者是通配符形式 /sys/{productKey}/{deviceName}/# + * + * @param topic 订阅的主题 + * @param productKey 产品 Key + * @param deviceName 设备名称 + * @return 是否允许订阅 + */ + public static boolean isTopicSubscribeAllowed(String topic, String productKey, String deviceName) { + if (!StrUtil.isAllNotBlank(topic, productKey, deviceName)) { + return false; + } + // 构建设备主题前缀 + String deviceTopicPrefix = SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/"; + // 主题必须以设备前缀开头,或者是设备前缀的通配符形式 + return topic.startsWith(deviceTopicPrefix) + || topic.equals(SYS_TOPIC_PREFIX + productKey + "/" + deviceName + "/#"); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 671a103fc8..a133d4ffd0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -124,7 +124,6 @@ yudao: mqtt: max-message-size: 8192 # 最大消息大小(字节) connect-timeout-seconds: 60 # 连接超时时间(秒) - keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒) ssl-enabled: false # 是否启用 SSL # 协议配置(旧版,保持兼容)