diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java similarity index 95% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java index 2e2150f6f7..efd61e13a2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java @@ -17,7 +17,7 @@ import lombok.extern.slf4j.Slf4j; */ @AllArgsConstructor @Slf4j -public abstract class IotProtocolDownstreamSubscriber implements IotMessageSubscriber { +public abstract class AbstractIotProtocolDownstreamSubscriber implements IotMessageSubscriber { private final IotProtocol protocol; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java index 188d2e6428..3309d2cd49 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotCoapDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotCoapDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { public IotCoapDownstreamSubscriber(IotCoapProtocol protocol, IotMessageBus messageBus) { super(protocol, messageBus); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java index 55aaaac69c..e7e5de98db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotEmqxDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotEmqxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotEmqxDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/downstream/IotHttpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/downstream/IotHttpDownstreamSubscriber.java index bfac16ca5e..fe94fe6172 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/downstream/IotHttpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/downstream/IotHttpDownstreamSubscriber.java @@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import lombok.extern.slf4j.Slf4j; /** @@ -13,7 +13,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j -public class IotHttpDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotHttpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { public IotHttpDownstreamSubscriber(IotProtocol protocol, IotMessageBus messageBus) { super(protocol, messageBus); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java new file mode 100644 index 0000000000..dedac7acd3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java @@ -0,0 +1,269 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; +import io.vertx.core.Vertx; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; + +/** + * Modbus 轮询调度器基类 + *

+ * 封装通用的定时器管理、per-device 请求队列限速逻辑。 + * 子类只需实现 {@link #pollPoint(Long, Long)} 定义具体的轮询动作。 + *

+ * + * @author 芋道源码 + */ +@Slf4j +public abstract class AbstractIotModbusPollScheduler { + + protected final Vertx vertx; + + /** + * 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积 + */ + private static final long MIN_REQUEST_INTERVAL = 1000; + + /** + * 设备点位的定时器映射:deviceId -> (pointId -> PointTimerInfo) + */ + private final Map> devicePointTimers = new ConcurrentHashMap<>(); + + /** + * per-device 请求队列:deviceId -> 待执行请求队列 + */ + private final Map> deviceRequestQueues = new ConcurrentHashMap<>(); + /** + * per-device 上次请求时间戳:deviceId -> lastRequestTimeMs + */ + private final Map deviceLastRequestTime = new ConcurrentHashMap<>(); + /** + * per-device 延迟 timer 标记:deviceId -> 是否有延迟 timer 在等待 + */ + private final Map deviceDelayTimerActive = new ConcurrentHashMap<>(); + + protected AbstractIotModbusPollScheduler(Vertx vertx) { + this.vertx = vertx; + } + + /** + * 点位定时器信息 + */ + @Data + @AllArgsConstructor + private static class PointTimerInfo { + + /** + * Vert.x 定时器 ID + */ + private Long timerId; + /** + * 轮询间隔(用于判断是否需要更新定时器) + */ + private Integer pollInterval; + + } + + // ========== 轮询管理 ========== + + /** + * 更新轮询任务(增量更新) + * + * 1. 【删除】点位:停止对应的轮询定时器 + * 2. 【新增】点位:创建对应的轮询定时器 + * 3. 【修改】点位:pollInterval 变化,重建对应的轮询定时器 + * 【修改】其他属性变化:不需要重建定时器(pollPoint 运行时从 configCache 取最新 point) + */ + public void updatePolling(IotModbusDeviceConfigRespDTO config) { + Long deviceId = config.getDeviceId(); + List newPoints = config.getPoints(); + Map currentTimers = devicePointTimers + .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>()); + // 1.1 计算新配置中的点位 ID 集合 + Set newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId); + // 1.2 计算删除的点位 ID 集合 + Set removedPointIds = new HashSet<>(currentTimers.keySet()); + removedPointIds.removeAll(newPointIds); + + // 2. 处理删除的点位:停止不再存在的定时器 + for (Long pointId : removedPointIds) { + PointTimerInfo timerInfo = currentTimers.remove(pointId); + if (timerInfo != null) { + vertx.cancelTimer(timerInfo.getTimerId()); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId); + } + } + + // 3. 处理新增和修改的点位 + if (CollUtil.isEmpty(newPoints)) { + return; + } + for (IotModbusPointRespDTO point : newPoints) { + Long pointId = point.getId(); + Integer newPollInterval = point.getPollInterval(); + PointTimerInfo existingTimer = currentTimers.get(pointId); + // 3.1 新增点位:创建定时器 + if (existingTimer == null) { + Long timerId = createPollTimer(deviceId, pointId, newPollInterval); + if (timerId != null) { + currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]", + deviceId, pointId, newPollInterval); + } + } else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) { + // 3.2 pollInterval 变化:重建定时器 + vertx.cancelTimer(existingTimer.getTimerId()); + Long timerId = createPollTimer(deviceId, pointId, newPollInterval); + if (timerId != null) { + currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); + log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]", + deviceId, pointId, existingTimer.getPollInterval(), newPollInterval); + } else { + currentTimers.remove(pointId); + } + } + // 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point,自动使用新配置 + } + } + + /** + * 创建轮询定时器 + */ + private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) { + if (pollInterval == null || pollInterval <= 0) { + return null; + } + return vertx.setPeriodic(pollInterval, timerId -> { + try { + submitPollRequest(deviceId, pointId); + } catch (Exception e) { + log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e); + } + }); + } + + // ========== 请求队列(per-device 限速) ========== + + /** + * 提交轮询请求到设备请求队列(保证同设备请求间隔) + */ + private void submitPollRequest(Long deviceId, Long pointId) { + // 1. 【重要】将请求添加到设备的请求队列 + Queue queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>()); + queue.offer(() -> pollPoint(deviceId, pointId)); + + // 2. 处理设备请求队列(如果没有延迟 timer 在等待) + processDeviceQueue(deviceId); + } + + /** + * 处理设备请求队列 + */ + private void processDeviceQueue(Long deviceId) { + Queue queue = deviceRequestQueues.get(deviceId); + if (CollUtil.isEmpty(queue)) { + return; + } + // 检查是否已有延迟 timer 在等待 + if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) { + return; + } + + // 不满足间隔要求,延迟执行 + long now = System.currentTimeMillis(); + long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L); + long elapsed = now - lastTime; + if (elapsed < MIN_REQUEST_INTERVAL) { + scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed); + return; + } + + // 满足间隔要求,立即执行 + Runnable task = queue.poll(); + if (task == null) { + return; + } + deviceLastRequestTime.put(deviceId, now); + task.run(); + // 继续处理队列中的下一个(如果有的话,需要延迟) + if (CollUtil.isNotEmpty(queue)) { + scheduleNextRequest(deviceId); + } + } + + private void scheduleNextRequest(Long deviceId) { + scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL); + } + + private void scheduleNextRequest(Long deviceId, long delayMs) { + deviceDelayTimerActive.put(deviceId, true); + vertx.setTimer(delayMs, id -> { + deviceDelayTimerActive.put(deviceId, false); + Queue queue = deviceRequestQueues.get(deviceId); + if (CollUtil.isEmpty(queue)) { + return; + } + + // 满足间隔要求,立即执行 + Runnable task = queue.poll(); + if (task == null) { + return; + } + deviceLastRequestTime.put(deviceId, System.currentTimeMillis()); + task.run(); + // 继续处理队列中的下一个(如果有的话,需要延迟) + if (CollUtil.isNotEmpty(queue)) { + scheduleNextRequest(deviceId); + } + }); + } + + // ========== 轮询执行 ========== + + /** + * 轮询单个点位(子类实现具体的读取逻辑) + * + * @param deviceId 设备 ID + * @param pointId 点位 ID + */ + protected abstract void pollPoint(Long deviceId, Long pointId); + + // ========== 停止 ========== + + /** + * 停止设备的轮询 + */ + public void stopPolling(Long deviceId) { + Map timers = devicePointTimers.remove(deviceId); + if (CollUtil.isEmpty(timers)) { + return; + } + for (PointTimerInfo timerInfo : timers.values()) { + vertx.cancelTimer(timerInfo.getTimerId()); + } + // 清理请求队列 + deviceRequestQueues.remove(deviceId); + deviceLastRequestTime.remove(deviceId); + deviceDelayTimerActive.remove(deviceId); + log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size()); + } + + /** + * 停止所有轮询 + */ + public void stopAll() { + for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) { + stopPolling(deviceId); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/IotModbusUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java similarity index 97% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/IotModbusUtils.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java index 227a886f8b..236277bc0a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/IotModbusUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common; +package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; @@ -34,7 +34,7 @@ import java.nio.ByteOrder; */ @UtilityClass @Slf4j -public class IotModbusUtils { +public class IotModbusCommonUtils { /** FC01: 读线圈 */ public static final int FC_READ_COILS = 1; @@ -512,4 +512,15 @@ public class IotModbusUtils { return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier())); } + /** + * 根据点位 ID 查找点位配置 + * + * @param config 设备 Modbus 配置 + * @param pointId 点位 ID + * @return 匹配的点位配置,未找到返回 null + */ + public static IotModbusPointRespDTO findPointById(IotModbusDeviceConfigRespDTO config, Long pointId) { + return CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId)); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/client/IotModbusTcpClientUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java similarity index 98% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/client/IotModbusTcpClientUtils.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java index 44728396ac..5804ea6022 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/client/IotModbusTcpClientUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client; +package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager; @@ -12,7 +12,7 @@ import io.vertx.core.Future; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; -import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils.*; +import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils.*; /** * IoT Modbus TCP 客户端工具类 @@ -26,7 +26,7 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModb */ @UtilityClass @Slf4j -public class IotModbusTcpClientUtils { +public class IotModbusTcpMasterUtils { /** * 读取 Modbus 数据 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java index 27df39eefa..32dfde5a64 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java @@ -4,8 +4,8 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager; import lombok.RequiredArgsConstructor; @@ -57,13 +57,13 @@ public class IotModbusTcpDownstreamHandler { String identifier = entry.getKey(); Object value = entry.getValue(); // 2.1 查找对应的点位配置 - IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier); + IotModbusPointRespDTO point = IotModbusCommonUtils.findPoint(config, identifier); if (point == null) { log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier); continue; } // 2.2 检查是否支持写操作 - if (!IotModbusUtils.isWritable(point.getFunctionCode())) { + if (!IotModbusCommonUtils.isWritable(point.getFunctionCode())) { log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode()); continue; } @@ -91,9 +91,9 @@ public class IotModbusTcpDownstreamHandler { } // 2.1 转换属性值为原始值 - int[] rawValues = IotModbusUtils.convertToRawValues(value, point); + int[] rawValues = IotModbusCommonUtils.convertToRawValues(value, point); // 2.2 执行 Modbus 写入 - IotModbusTcpClientUtils.write(connection, slaveId, point, rawValues) + IotModbusTcpMasterUtils.write(connection, slaveId, point, rawValues) .onSuccess(success -> log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]", config.getDeviceId(), point.getIdentifier(), value)) .onFailure(e -> log.error("[writeProperty][写入失败, deviceId={}, identifier={}]", diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java index c8e22aff79..74afcbb38f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.do import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.IotModbusTcpMasterProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotModbusTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotModbusTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotModbusTcpDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java index 8762481ebc..de02af06c2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java @@ -5,7 +5,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import lombok.extern.slf4j.Slf4j; @@ -40,7 +40,7 @@ public class IotModbusTcpUpstreamHandler { int[] rawValue) { try { // 1.1 转换原始值为物模型属性值 - Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValue, point); + Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValue, point); log.debug("[handleReadResult][设备={}, 属性={}, 原始值={}, 转换值={}]", config.getDeviceId(), point.getIdentifier(), rawValue, convertedValue); // 1.2 构造属性上报消息 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java index afe4a9caed..2f049561c9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java @@ -1,242 +1,45 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.client.IotModbusTcpClientUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler; import io.vertx.core.Vertx; -import lombok.AllArgsConstructor; -import lombok.Data; import lombok.extern.slf4j.Slf4j; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; - -// TODO @AI:类的命名上,要体现上 master。其它类似 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster 也要! /** * IoT Modbus TCP Master 轮询调度器:管理点位的轮询定时器,调度读取任务并上报结果 * * @author 芋道源码 */ @Slf4j -public class IotModbusTcpPollScheduler { +public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler { - private final Vertx vertx; private final IotModbusTcpConnectionManager connectionManager; private final IotModbusTcpUpstreamHandler upstreamHandler; private final IotModbusTcpConfigCacheService configCacheService; - /** - * 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积 - */ - private static final long MIN_REQUEST_INTERVAL = 100; - - /** - * 设备点位的定时器映射:deviceId -> (pointId -> PointTimerInfo) - */ - private final Map> devicePointTimers = new ConcurrentHashMap<>(); - - /** - * per-device 请求队列:deviceId -> 待执行请求队列 - */ - private final Map> deviceRequestQueues = new ConcurrentHashMap<>(); - - /** - * per-device 上次请求时间戳:deviceId -> lastRequestTimeMs - */ - private final Map deviceLastRequestTime = new ConcurrentHashMap<>(); - - /** - * per-device 延迟 timer 标记:deviceId -> 是否有延迟 timer 在等待 - */ - private final Map deviceDelayTimerActive = new ConcurrentHashMap<>(); - public IotModbusTcpPollScheduler(Vertx vertx, IotModbusTcpConnectionManager connectionManager, IotModbusTcpUpstreamHandler upstreamHandler, IotModbusTcpConfigCacheService configCacheService) { - this.vertx = vertx; + super(vertx); this.connectionManager = connectionManager; this.upstreamHandler = upstreamHandler; this.configCacheService = configCacheService; } - // ========== 点位定时器 ========== - - /** - * 点位定时器信息 - */ - @Data - @AllArgsConstructor - private static class PointTimerInfo { - - /** - * Vert.x 定时器 ID - */ - private Long timerId; - /** - * 轮询间隔(用于判断是否需要更新定时器) - */ - private Integer pollInterval; - - } - - // ========== 轮询管理 ========== - - /** - * 更新轮询任务(增量更新) - * - * 1. 【删除】点位:停止对应的轮询定时器 - * 2. 【新增】点位:创建对应的轮询定时器 - * 3. 【修改】点位:pollInterval 变化,重建对应的轮询定时器 - * 4. 其他属性变化:不需要重建定时器(pollPoint 运行时从 configCache 取最新 point) - */ - public void updatePolling(IotModbusDeviceConfigRespDTO config) { - Long deviceId = config.getDeviceId(); - List newPoints = config.getPoints(); - Map currentTimers = devicePointTimers - .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>()); - // 1.1 计算新配置中的点位 ID 集合 - Set newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId); - // 1.2 计算删除的点位 ID 集合 - Set removedPointIds = new HashSet<>(currentTimers.keySet()); - removedPointIds.removeAll(newPointIds); - - // 2. 处理删除的点位:停止不再存在的定时器 - for (Long pointId : removedPointIds) { - PointTimerInfo timerInfo = currentTimers.remove(pointId); - if (timerInfo != null) { - vertx.cancelTimer(timerInfo.getTimerId()); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId); - } - } - - // 3. 处理新增和修改的点位 - if (CollUtil.isEmpty(newPoints)) { - return; - } - for (IotModbusPointRespDTO point : newPoints) { - Long pointId = point.getId(); - Integer newPollInterval = point.getPollInterval(); - PointTimerInfo existingTimer = currentTimers.get(pointId); - // 3.1 新增点位:创建定时器 - if (existingTimer == null) { - Long timerId = createPollTimer(deviceId, pointId, newPollInterval); - if (timerId != null) { - currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]", - deviceId, pointId, newPollInterval); - } - } else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) { - // 3.2 pollInterval 变化:重建定时器 - vertx.cancelTimer(existingTimer.getTimerId()); - Long timerId = createPollTimer(deviceId, pointId, newPollInterval); - if (timerId != null) { - currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]", - deviceId, pointId, existingTimer.getPollInterval(), newPollInterval); - } else { - currentTimers.remove(pointId); - } - } - // 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point,自动使用新配置 - } - } - - /** - * 创建轮询定时器 - *

- * 闭包只捕获 deviceId 和 pointId,运行时从 configCache 获取最新配置,避免旧快照问题。 - */ - private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) { - if (pollInterval == null || pollInterval <= 0) { - return null; - } - return vertx.setPeriodic(pollInterval, timerId -> { - try { - submitPollRequest(deviceId, pointId); - } catch (Exception e) { - log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e); - } - }); - } - - // ========== 请求队列(per-device 限速) ========== - - /** - * 提交轮询请求到设备请求队列(保证同设备请求间隔) - */ - private void submitPollRequest(Long deviceId, Long pointId) { - Queue queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>()); - queue.offer(() -> pollPoint(deviceId, pointId)); - processDeviceQueue(deviceId); - } - - /** - * 处理设备请求队列 - */ - private void processDeviceQueue(Long deviceId) { - Queue queue = deviceRequestQueues.get(deviceId); - if (CollUtil.isEmpty(queue)) { - return; - } - // 检查是否已有延迟 timer 在等待 - if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) { - return; - } - long now = System.currentTimeMillis(); - long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L); - long elapsed = now - lastTime; - if (elapsed >= MIN_REQUEST_INTERVAL) { - // 满足间隔要求,立即执行 - Runnable task = queue.poll(); - if (task != null) { - deviceLastRequestTime.put(deviceId, now); - task.run(); - // 继续处理队列中的下一个(如果有的话,需要延迟) - if (!queue.isEmpty()) { - scheduleNextRequest(deviceId); - } - } - } else { - // 需要延迟 - scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed); - } - } - - private void scheduleNextRequest(Long deviceId) { - scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL); - } - - private void scheduleNextRequest(Long deviceId, long delayMs) { - deviceDelayTimerActive.put(deviceId, true); - vertx.setTimer(delayMs, id -> { - deviceDelayTimerActive.put(deviceId, false); - Queue queue = deviceRequestQueues.get(deviceId); - if (CollUtil.isEmpty(queue)) { - Runnable task = queue.poll(); - if (task != null) { - deviceLastRequestTime.put(deviceId, System.currentTimeMillis()); - task.run(); - } - // 继续处理 - if (queue != null && !queue.isEmpty()) { - scheduleNextRequest(deviceId); - } - } - }); - } - // ========== 轮询执行 ========== /** * 轮询单个点位 */ - private void pollPoint(Long deviceId, Long pointId) { + @Override + protected void pollPoint(Long deviceId, Long pointId) { // 1.1 从 configCache 获取最新配置 IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId); if (config == null || CollUtil.isEmpty(config.getPoints())) { @@ -244,8 +47,7 @@ public class IotModbusTcpPollScheduler { return; } // 1.2 查找点位 - // TODO @AI:是不是这里,可以抽到 IotModbusUtils 里?感觉应该有几个地方需要的; - IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId)); + IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, pointId); if (point == null) { log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId); return; @@ -259,45 +61,13 @@ public class IotModbusTcpPollScheduler { } // 2.2 获取 slave ID Integer slaveId = connectionManager.getSlaveId(deviceId); - if (slaveId == null) { - log.warn("[pollPoint][设备 {} 没有 slaveId]", deviceId); - return; - } + Assert.notNull(slaveId, "设备 {} 没有配置 slaveId", deviceId); // 3. 执行 Modbus 读取 - IotModbusTcpClientUtils.read(connection, slaveId, point) + IotModbusTcpMasterUtils.read(connection, slaveId, point) .onSuccess(rawValue -> upstreamHandler.handleReadResult(config, point, rawValue)) .onFailure(e -> log.error("[pollPoint][读取点位失败, deviceId={}, identifier={}]", deviceId, point.getIdentifier(), e)); } - // ========== 停止 ========== - - /** - * 停止设备的轮询 - */ - public void stopPolling(Long deviceId) { - Map timers = devicePointTimers.remove(deviceId); - if (CollUtil.isEmpty(timers)) { - return; - } - for (PointTimerInfo timerInfo : timers.values()) { - vertx.cancelTimer(timerInfo.getTimerId()); - } - // 清理请求队列 - deviceRequestQueues.remove(deviceId); - deviceLastRequestTime.remove(deviceId); - deviceDelayTimerActive.remove(deviceId); - log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size()); - } - - /** - * 停止所有轮询 - */ - public void stopAll() { - for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) { - stopPolling(deviceId); - } - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java index 6ac6930337..b7661abcfc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import lombok.Data; import lombok.experimental.Accessors; @@ -39,7 +39,7 @@ public class IotModbusFrame { * 当功能码最高位为 1 时(异常响应),此字段存储异常码。 * 为 null 表示非异常响应。 * - * @see IotModbusUtils#FC_EXCEPTION_MASK + * @see IotModbusCommonUtils#FC_EXCEPTION_MASK */ private Integer exceptionCode; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java index 09d98e0dba..fc5219e197 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.parsetools.RecordParser; @@ -96,7 +96,7 @@ public class IotModbusFrameDecoder { return null; } // 校验 CRC - if (!IotModbusUtils.verifyCrc16(data)) { + if (!IotModbusCommonUtils.verifyCrc16(data)) { log.warn("[decodeRtuResponse][CRC 校验失败]"); return null; } @@ -119,8 +119,8 @@ public class IotModbusFrameDecoder { .setPdu(pdu) .setTransactionId(transactionId); // 异常响应 - if (IotModbusUtils.isExceptionResponse(functionCode)) { - frame.setFunctionCode(IotModbusUtils.extractOriginalFunctionCode(functionCode)); + if (IotModbusCommonUtils.isExceptionResponse(functionCode)) { + frame.setFunctionCode(IotModbusCommonUtils.extractOriginalFunctionCode(functionCode)); if (pdu.length >= 1) { frame.setExceptionCode(pdu[0] & 0xFF); } @@ -281,7 +281,7 @@ public class IotModbusFrameDecoder { this.slaveId = bytes[0]; this.functionCode = bytes[1]; int fc = functionCode & 0xFF; - if (IotModbusUtils.isExceptionResponse(fc)) { + if (IotModbusCommonUtils.isExceptionResponse(fc)) { // 异常响应:完整帧 = slaveId(1) + FC(1) + exceptionCode(1) + CRC(2) = 5 字节 // 已有 6 字节(多 1 字节),取前 5 字节组装 Buffer frame = Buffer.buffer(5); @@ -290,7 +290,7 @@ public class IotModbusFrameDecoder { frame.appendBytes(bytes, 2, 3); // exceptionCode + CRC emitFrame(frame); resetToHeader(); - } else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) { + } else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) { // 读响应或自定义 FC:bytes[2] = byteCount this.byteCount = bytes[2]; int bc = byteCount & 0xFF; @@ -315,7 +315,7 @@ public class IotModbusFrameDecoder { this.expectedDataLen = bc + 2; // byteCount 个数据 + 2 CRC parser.fixedSizeMode(remaining); } - } else if (IotModbusUtils.isWriteResponse(fc)) { + } else if (IotModbusCommonUtils.isWriteResponse(fc)) { // 写响应:总长 = slaveId(1) + FC(1) + addr(2) + value/qty(2) + CRC(2) = 8 字节 // 已有 6 字节,还需 2 字节 state = STATE_WRITE_BODY; @@ -356,15 +356,15 @@ public class IotModbusFrameDecoder { this.slaveId = header[0]; this.functionCode = header[1]; int fc = functionCode & 0xFF; - if (IotModbusUtils.isExceptionResponse(fc)) { + if (IotModbusCommonUtils.isExceptionResponse(fc)) { // 异常响应 state = STATE_EXCEPTION_BODY; parser.fixedSizeMode(3); // exceptionCode(1) + CRC(2) - } else if (IotModbusUtils.isReadResponse(fc) || fc == customFunctionCode) { + } else if (IotModbusCommonUtils.isReadResponse(fc) || fc == customFunctionCode) { // 读响应或自定义 FC state = STATE_READ_BYTE_COUNT; parser.fixedSizeMode(1); // byteCount - } else if (IotModbusUtils.isWriteResponse(fc)) { + } else if (IotModbusCommonUtils.isWriteResponse(fc)) { // 写响应 state = STATE_WRITE_BODY; pendingData = Buffer.buffer(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java index 195c82352a..36323cbda5 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameEncoder.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -60,7 +60,7 @@ public class IotModbusFrameEncoder { public byte[] encodeWriteSingleRequest(int slaveId, int functionCode, int address, int value, IotModbusFrameFormatEnum format, Integer transactionId) { // FC05 单写线圈:Modbus 标准要求 value 为 0xFF00(ON)或 0x0000(OFF) - if (functionCode == IotModbusUtils.FC_WRITE_SINGLE_COIL) { + if (functionCode == IotModbusCommonUtils.FC_WRITE_SINGLE_COIL) { value = (value != 0) ? 0xFF00 : 0x0000; } // PDU: [FC(1)] [Address(2)] [Value(2)] @@ -120,7 +120,7 @@ public class IotModbusFrameEncoder { int quantity = values.length; int byteCount = (quantity + 7) / 8; // 向上取整 byte[] pdu = new byte[6 + byteCount]; - pdu[0] = (byte) IotModbusUtils.FC_WRITE_MULTIPLE_COILS; // FC15 + pdu[0] = (byte) IotModbusCommonUtils.FC_WRITE_MULTIPLE_COILS; // FC15 pdu[1] = (byte) ((address >> 8) & 0xFF); pdu[2] = (byte) (address & 0xFF); pdu[3] = (byte) ((quantity >> 8) & 0xFF); @@ -204,7 +204,7 @@ public class IotModbusFrameEncoder { frame[0] = (byte) slaveId; System.arraycopy(pdu, 0, frame, 1, pdu.length); // 计算并追加 CRC16 - int crc = IotModbusUtils.calculateCrc16(frame, frame.length - 2); + int crc = IotModbusCommonUtils.calculateCrc16(frame, frame.length - 2); frame[frame.length - 2] = (byte) (crc & 0xFF); // CRC Low frame[frame.length - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High return frame; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java index ca9edde53e..770f13e3f8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java @@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager; @@ -82,13 +82,13 @@ public class IotModbusTcpSlaveDownstreamHandler { String identifier = entry.getKey(); Object value = entry.getValue(); // 2.1 查找对应的点位配置 - IotModbusPointRespDTO point = IotModbusUtils.findPoint(config, identifier); + IotModbusPointRespDTO point = IotModbusCommonUtils.findPoint(config, identifier); if (point == null) { log.warn("[handle][设备 {} 没有点位配置: {}]", message.getDeviceId(), identifier); continue; } // 2.2 检查是否支持写操作 - if (!IotModbusUtils.isWritable(point.getFunctionCode())) { + if (!IotModbusCommonUtils.isWritable(point.getFunctionCode())) { log.warn("[handle][点位 {} 不支持写操作, 功能码={}]", identifier, point.getFunctionCode()); continue; } @@ -104,7 +104,7 @@ public class IotModbusTcpSlaveDownstreamHandler { private void writeProperty(Long deviceId, ConnectionInfo connInfo, IotModbusPointRespDTO point, Object value) { // 1.1 转换属性值为原始值 - int[] rawValues = IotModbusUtils.convertToRawValues(value, point); + int[] rawValues = IotModbusCommonUtils.convertToRawValues(value, point); // 1.2 确定帧格式和事务 ID IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat(); @@ -117,8 +117,8 @@ public class IotModbusTcpSlaveDownstreamHandler { // 1.3 编码写请求 byte[] data; int readFunctionCode = point.getFunctionCode(); - Integer writeSingleCode = IotModbusUtils.getWriteSingleFunctionCode(readFunctionCode); - Integer writeMultipleCode = IotModbusUtils.getWriteMultipleFunctionCode(readFunctionCode); + Integer writeSingleCode = IotModbusCommonUtils.getWriteSingleFunctionCode(readFunctionCode); + Integer writeMultipleCode = IotModbusCommonUtils.getWriteMultipleFunctionCode(readFunctionCode); if (rawValues.length == 1 && writeSingleCode != null) { // 单个值:使用单写功能码(FC05/FC06) data = frameEncoder.encodeWriteSingleRequest(slaveId, writeSingleCode, diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamSubscriber.java index 4e7b882770..2a11cff565 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.dow import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.IotModbusTcpSlaveProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotModbusTcpSlaveDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotModbusTcpSlaveDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotModbusTcpSlaveDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java index 67b2d52e8f..6ede74c1db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java @@ -19,7 +19,7 @@ import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConfigCacheService; @@ -237,7 +237,7 @@ public class IotModbusTcpSlaveUpstreamHandler { return; } // 2.2 提取寄存器值 - int[] rawValues = IotModbusUtils.extractValues(frame); + int[] rawValues = IotModbusCommonUtils.extractValues(frame); if (rawValues == null) { log.warn("[handlePollingResponse][提取寄存器值失败, deviceId={}, identifier={}]", info.getDeviceId(), request.getIdentifier()); @@ -248,14 +248,13 @@ public class IotModbusTcpSlaveUpstreamHandler { if (config == null || CollUtil.isEmpty(config.getPoints())) { return; } - IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), - p -> p.getId().equals(request.getPointId())); + IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, request.getPointId()); if (point == null) { return; } // 3.1 点位翻译 - Object convertedValue = IotModbusUtils.convertToPropertyValue(rawValues, point); + Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValues, point); // 3.2 上报属性 Map params = MapUtil.of(request.getIdentifier(), convertedValue); IotDeviceMessage message = IotDeviceMessage.requestOf( diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java index 241d8d777e..1a6a4cc610 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java @@ -75,8 +75,8 @@ public class IotModbusTcpSlaveConnectionManager { try { oldSocket.close(); } catch (Exception e) { - // TODO @AI:这里日志可以打的更完整一点,方便追溯;比如:设备 ID、旧连接地址等 - log.warn("[registerConnection][关闭旧 socket 失败]", e); + log.warn("[registerConnection][关闭旧 socket 失败, deviceId={}, oldRemote={}]", + info.getDeviceId(), oldSocket.remoteAddress(), e); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java index d0f38e3653..8a9cba5963 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java @@ -1,42 +1,28 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlaveConnectionManager.ConnectionInfo; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager.IotModbusTcpSlavePendingRequestManager.PendingRequest; import io.vertx.core.Vertx; -import lombok.AllArgsConstructor; -import lombok.Data; import lombok.extern.slf4j.Slf4j; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; -import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; - /** - * IoT Modbus TCP Slave 轮询调度器 - *

- * 管理点位的轮询定时器,为云端轮询模式的设备调度读取任务。 - * 与 tcpmaster 的 {@code IotModbusTcpPollScheduler} 不同,这里不通过 j2mod 直接读取,而是: - * 1. 编码 Modbus 读请求帧 - * 2. 通过 ConnectionManager 发送到设备的 TCP 连接 - * 3. 将请求注册到 PendingRequestManager,等待设备响应 - *

- * 闭包只捕获 deviceId + pointId,运行时从 configCacheService 获取最新 config 和 point, - * 避免闭包捕获旧快照导致上报消息用旧身份的问题。 + * IoT Modbus TCP Slave 轮询调度器:编码读请求帧,通过 TCP 连接发送到设备,注册 PendingRequest 等待响应 * * @author 芋道源码 */ @Slf4j -public class IotModbusTcpSlavePollScheduler { +public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollScheduler { - private final Vertx vertx; private final IotModbusTcpSlaveConnectionManager connectionManager; private final IotModbusFrameEncoder frameEncoder; private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; @@ -47,29 +33,6 @@ public class IotModbusTcpSlavePollScheduler { */ private final AtomicInteger transactionIdCounter; - /** - * 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积 - */ - private static final long MIN_REQUEST_INTERVAL = 200; - - /** - * 设备点位的定时器映射:deviceId -> (pointId -> PointTimerInfo) - */ - private final Map> devicePointTimers = new ConcurrentHashMap<>(); - - /** - * per-device 请求队列:deviceId -> 待执行请求队列 - */ - private final Map> deviceRequestQueues = new ConcurrentHashMap<>(); - /** - * per-device 上次请求时间戳:deviceId -> lastRequestTimeMs - */ - private final Map deviceLastRequestTime = new ConcurrentHashMap<>(); - /** - * per-device 延迟 timer 标记:deviceId -> 是否有延迟 timer 在等待 - */ - private final Map deviceDelayTimerActive = new ConcurrentHashMap<>(); - public IotModbusTcpSlavePollScheduler(Vertx vertx, IotModbusTcpSlaveConnectionManager connectionManager, IotModbusFrameEncoder frameEncoder, @@ -77,7 +40,7 @@ public class IotModbusTcpSlavePollScheduler { int requestTimeout, AtomicInteger transactionIdCounter, IotModbusTcpSlaveConfigCacheService configCacheService) { - this.vertx = vertx; + super(vertx); this.connectionManager = connectionManager; this.frameEncoder = frameEncoder; this.pendingRequestManager = pendingRequestManager; @@ -86,185 +49,13 @@ public class IotModbusTcpSlavePollScheduler { this.configCacheService = configCacheService; } - /** - * 点位定时器信息 - */ - @Data - @AllArgsConstructor - private static class PointTimerInfo { - - /** - * Vert.x 定时器 ID - */ - private Long timerId; - /** - * 轮询间隔(用于判断是否需要更新定时器) - */ - private Integer pollInterval; - - } - - // ========== 轮询管理 ========== - - /** - * 更新轮询任务(增量更新) - * - * 1. 【删除】点位:停止对应的轮询定时器 - * 2. 【新增】点位:创建对应的轮询定时器 - * 3. 【修改】点位:pollInterval 变化,重建对应的轮询定时器 - * 4. 其他属性变化:不需要重建定时器(pollPoint 运行时从 configCache 取最新 point) - */ - public void updatePolling(IotModbusDeviceConfigRespDTO config) { - Long deviceId = config.getDeviceId(); - List newPoints = config.getPoints(); - Map currentTimers = devicePointTimers - .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>()); - // 1.1 计算新配置中的点位 ID 集合 - Set newPointIds = convertSet(newPoints, IotModbusPointRespDTO::getId); - // 1.2 计算删除的点位 ID 集合 - Set removedPointIds = new HashSet<>(currentTimers.keySet()); - removedPointIds.removeAll(newPointIds); - - // 2. 处理删除的点位:停止不再存在的定时器 - for (Long pointId : removedPointIds) { - PointTimerInfo timerInfo = currentTimers.remove(pointId); - if (timerInfo != null) { - vertx.cancelTimer(timerInfo.getTimerId()); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已删除]", deviceId, pointId); - } - } - - // 3. 处理新增和修改的点位 - if (CollUtil.isEmpty(newPoints)) { - return; - } - for (IotModbusPointRespDTO point : newPoints) { - Long pointId = point.getId(); - Integer newPollInterval = point.getPollInterval(); - PointTimerInfo existingTimer = currentTimers.get(pointId); - // 3.1 新增点位:创建定时器 - if (existingTimer == null) { - Long timerId = createPollTimer(deviceId, pointId, newPollInterval); - if (timerId != null) { - currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已创建, interval={}ms]", - deviceId, pointId, newPollInterval); - } - } else if (!Objects.equals(existingTimer.getPollInterval(), newPollInterval)) { - // 3.2 pollInterval 变化:重建定时器 - vertx.cancelTimer(existingTimer.getTimerId()); - Long timerId = createPollTimer(deviceId, pointId, newPollInterval); - if (timerId != null) { - currentTimers.put(pointId, new PointTimerInfo(timerId, newPollInterval)); - log.debug("[updatePolling][设备 {} 点位 {} 定时器已更新, interval={}ms -> {}ms]", - deviceId, pointId, existingTimer.getPollInterval(), newPollInterval); - } else { - currentTimers.remove(pointId); - } - } - // 3.3 其他属性变化:无需重建定时器,因为 pollPoint() 运行时从 configCache 获取最新 point,自动使用新配置 - } - } - - /** - * 创建轮询定时器 - *

- * 闭包只捕获 deviceId 和 pointId,运行时从 configCache 获取最新配置,避免旧快照问题。 - */ - private Long createPollTimer(Long deviceId, Long pointId, Integer pollInterval) { - if (pollInterval == null || pollInterval <= 0) { - return null; - } - return vertx.setPeriodic(pollInterval, timerId -> { - try { - submitPollRequest(deviceId, pointId); - } catch (Exception e) { - log.error("[createPollTimer][轮询点位失败, deviceId={}, pointId={}]", deviceId, pointId, e); - } - }); - } - - // ========== 请求队列(per-device 限速) ========== - - /** - * 提交轮询请求到设备请求队列(保证同设备请求间隔) - */ - private void submitPollRequest(Long deviceId, Long pointId) { - // 1. 将请求添加到设备的请求队列 - Queue queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>()); - queue.offer(() -> pollPoint(deviceId, pointId)); - - // 2. 处理设备请求队列(如果没有延迟 timer 在等待) - processDeviceQueue(deviceId); - } - - /** - * 处理设备请求队列 - */ - private void processDeviceQueue(Long deviceId) { - Queue queue = deviceRequestQueues.get(deviceId); - if (CollUtil.isEmpty(queue)) { - return; - } - // 检查是否已有延迟 timer 在等待 - if (Boolean.TRUE.equals(deviceDelayTimerActive.get(deviceId))) { - return; - } - - // TODO @AI:可以改成不满足间隔,然后 return,简化括号层级; - long now = System.currentTimeMillis(); - long lastTime = deviceLastRequestTime.getOrDefault(deviceId, 0L); - long elapsed = now - lastTime; - - if (elapsed >= MIN_REQUEST_INTERVAL) { - // 满足间隔要求,立即执行 - Runnable task = queue.poll(); - if (task != null) { - deviceLastRequestTime.put(deviceId, now); - task.run(); - // 继续处理队列中的下一个(如果有的话,需要延迟) - if (!queue.isEmpty()) { - scheduleNextRequest(deviceId); - } - } - } else { - // 需要延迟 - scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL - elapsed); - } - } - - private void scheduleNextRequest(Long deviceId) { - scheduleNextRequest(deviceId, MIN_REQUEST_INTERVAL); - } - - private void scheduleNextRequest(Long deviceId, long delayMs) { - deviceDelayTimerActive.put(deviceId, true); - vertx.setTimer(delayMs, id -> { - deviceDelayTimerActive.put(deviceId, false); - Queue queue = deviceRequestQueues.get(deviceId); - // TODO @AI:if return?简化下? - if (CollUtil.isEmpty(queue)) { - Runnable task = queue.poll(); - if (task != null) { - deviceLastRequestTime.put(deviceId, System.currentTimeMillis()); - task.run(); - } - // 继续处理 - if (queue != null && !queue.isEmpty()) { - scheduleNextRequest(deviceId); - } - } - }); - } - // ========== 轮询执行 ========== /** * 轮询单个点位 - *

- * 运行时从 configCacheService 获取最新的 config 和 point,而非使用闭包捕获的旧引用。 */ - private void pollPoint(Long deviceId, Long pointId) { + @Override + protected void pollPoint(Long deviceId, Long pointId) { // 1.1 从 configCache 获取最新配置 IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId); if (config == null || CollUtil.isEmpty(config.getPoints())) { @@ -272,34 +63,31 @@ public class IotModbusTcpSlavePollScheduler { return; } // 1.2 查找点位 - IotModbusPointRespDTO point = CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId)); + IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, pointId); if (point == null) { log.warn("[pollPoint][设备 {} 点位 {} 未找到]", deviceId, pointId); return; } - // 2. 获取连接信息 - ConnectionInfo connInfo = connectionManager.getConnectionInfoByDeviceId(deviceId); - if (connInfo == null) { + // 2.1 获取连接 + ConnectionInfo connection = connectionManager.getConnectionInfoByDeviceId(deviceId); + if (connection == null) { log.debug("[pollPoint][设备 {} 没有连接,跳过轮询]", deviceId); return; } + // 2.2 获取 slave ID + IotModbusFrameFormatEnum frameFormat = connection.getFrameFormat(); + Assert.notNull(frameFormat, "设备 {} 的帧格式不能为空", deviceId); + int slaveId = connection.getSlaveId(); + Assert.notNull(connection.getSlaveId(), "设备 {} 的 slaveId 不能为空", deviceId); - // 3.1 确定帧格式和事务 ID - IotModbusFrameFormatEnum frameFormat = connInfo.getFrameFormat(); - if (frameFormat == null) { - log.warn("[pollPoint][设备 {} 帧格式为空,跳过轮询]", deviceId); - return; - } + // 3.1 编码读请求 Integer transactionId = frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP ? (transactionIdCounter.incrementAndGet() & 0xFFFF) : null; - // TODO @AI:这里断言必须非空! - int slaveId = connInfo.getSlaveId() != null ? connInfo.getSlaveId() : 1; - // 3.2 编码读请求 byte[] data = frameEncoder.encodeReadRequest(slaveId, point.getFunctionCode(), point.getRegisterAddress(), point.getRegisterCount(), frameFormat, transactionId); - // 3.3 注册 PendingRequest + // 3.2 注册 PendingRequest PendingRequest pendingRequest = new PendingRequest( deviceId, point.getId(), point.getIdentifier(), slaveId, point.getFunctionCode(), @@ -307,41 +95,11 @@ public class IotModbusTcpSlavePollScheduler { transactionId, System.currentTimeMillis() + requestTimeout); pendingRequestManager.addRequest(pendingRequest); - - // 4. 发送读请求 + // 3.3 发送读请求 connectionManager.sendToDevice(deviceId, data); log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]", deviceId, point.getIdentifier(), point.getFunctionCode(), point.getRegisterAddress(), point.getRegisterCount()); } - // ========== 停止 ========== - - /** - * 停止设备的轮询 - */ - public void stopPolling(Long deviceId) { - Map timers = devicePointTimers.remove(deviceId); - if (CollUtil.isEmpty(timers)) { - return; - } - for (PointTimerInfo timerInfo : timers.values()) { - vertx.cancelTimer(timerInfo.getTimerId()); - } - // 清理请求队列 - deviceRequestQueues.remove(deviceId); - deviceLastRequestTime.remove(deviceId); - deviceDelayTimerActive.remove(deviceId); - log.debug("[stopPolling][设备 {} 停止了 {} 个轮询定时器]", deviceId, timers.size()); - } - - /** - * 停止所有轮询 - */ - public void stopAll() { - for (Long deviceId : new ArrayList<>(devicePointTimers.keySet())) { - stopPolling(deviceId); - } - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java index c8aa29906a..5f0b547f1a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotMqttDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotMqttDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java index 7a29e6c00c..39a73849fb 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/downstream/IotTcpDownstreamSubscriber.java @@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import lombok.extern.slf4j.Slf4j; /** @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotTcpDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/downstream/IotUdpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/downstream/IotUdpDownstreamSubscriber.java index ea0bc99b39..cc21df60e3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/downstream/IotUdpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/downstream/IotUdpDownstreamSubscriber.java @@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import lombok.extern.slf4j.Slf4j; /** @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotUdpDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotUdpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotUdpDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java index efe5f437e8..c565be2c95 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/websocket/handler/downstream/IotWebSocketDownstreamSubscriber.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstrea import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol; import lombok.extern.slf4j.Slf4j; @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotWebSocketDownstreamSubscriber extends IotProtocolDownstreamSubscriber { +public class IotWebSocketDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { private final IotWebSocketDownstreamHandler downstreamHandler; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java index faea89ee9b..e103d6b9d3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusRtuIntegrationTest.java @@ -8,7 +8,7 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameDecoder; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrameEncoder; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.IotModbusUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -285,7 +285,7 @@ public class IotModbusTcpSlaveModbusRtuIntegrationTest { frame[3 + i * 2 + 1] = (byte) (registerValues[i] & 0xFF); } // 计算 CRC16 - int crc = IotModbusUtils.calculateCrc16(frame, totalLength - 2); + int crc = IotModbusCommonUtils.calculateCrc16(frame, totalLength - 2); frame[totalLength - 2] = (byte) (crc & 0xFF); // CRC Low frame[totalLength - 1] = (byte) ((crc >> 8) & 0xFF); // CRC High return frame;