diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java index 048b79de80..6bfe0e4e92 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java @@ -1,16 +1,14 @@ package cn.iocoder.yudao.module.iot.api.device; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ObjUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.enums.RpcConstants; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.*; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; @@ -35,8 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.List; - import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; @@ -81,12 +77,13 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi { } @Override - @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/modbus/enabled-configs") + @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/modbus/config-list") @PermitAll @TenantIgnore - public CommonResult> getEnabledModbusDeviceConfigs() { - // 1. 获取所有启用的 Modbus 连接配置 - List configList = modbusConfigService.getEnabledDeviceModbusConfigList(); + public CommonResult> getModbusDeviceConfigList( + @RequestBody IotModbusDeviceConfigListReqDTO listReqDTO) { + // 1. 获取 Modbus 连接配置 + List configList = modbusConfigService.getDeviceModbusConfigList(listReqDTO); if (CollUtil.isEmpty(configList)) { return success(new ArrayList<>()); } @@ -95,6 +92,7 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi { Set deviceIds = convertSet(configList, IotDeviceModbusConfigDO::getDeviceId); Map deviceMap = deviceService.getDeviceMap(deviceIds); Map> pointMap = modbusPointService.getEnabledDeviceModbusPointMapByDeviceIds(deviceIds); + Map productMap = productService.getProductMap(convertSet(deviceMap.values(), IotDeviceDO::getProductId)); List result = new ArrayList<>(configList.size()); for (IotDeviceModbusConfigDO config : configList) { // 3.1 获取设备信息 @@ -102,12 +100,20 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi { if (device == null) { continue; } - // 3.2 获取启用的点位列表 + // 3.2 按 protocolType 筛选(如果非空) + if (StrUtil.isNotEmpty(listReqDTO.getProtocolType())) { + IotProductDO product = productMap.get(device.getProductId()); + if (product == null || ObjUtil.notEqual(listReqDTO.getProtocolType(), product.getProtocolType())) { + continue; + } + } + // 3.3 获取启用的点位列表 List pointList = pointMap.get(config.getDeviceId()); if (CollUtil.isEmpty(pointList)) { continue; } - // 3.3 构建 IotModbusDeviceConfigRespDTO 对象 + + // 3.4 构建 IotModbusDeviceConfigRespDTO 对象 IotModbusDeviceConfigRespDTO configDTO = BeanUtils.toBean(config, IotModbusDeviceConfigRespDTO.class, o -> o.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()) .setPoints(BeanUtils.toBean(pointList, IotModbusPointRespDTO.class))); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigRespVO.java index 60b132c3d1..ecce04de6e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigRespVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigRespVO.java @@ -33,13 +33,11 @@ public class IotDeviceModbusConfigRespVO { @Schema(description = "重试间隔(毫秒)", example = "1000") private Integer retryInterval; - // TODO @AI:不要【:1-云端轮询 2-主动上报】 - @Schema(description = "模式:1-云端轮询 2-主动上报", example = "1") + @Schema(description = "工作模式", example = "1") private Integer mode; - // TODO @AI:还是换成 int,然后写注释;不要【:modbus_tcp / modbus_rtu】 - @Schema(description = "数据帧格式:modbus_tcp / modbus_rtu", example = "modbus_tcp") - private String frameFormat; + @Schema(description = "数据帧格式", example = "1") + private Integer frameFormat; @Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "0") private Integer status; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java index ae47afcd89..9fa3fdc7c2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/modbus/IotDeviceModbusConfigSaveReqVO.java @@ -1,7 +1,9 @@ package cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus; +import cn.iocoder.yudao.framework.common.validation.InEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; import io.swagger.v3.oas.annotations.media.Schema; -import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; import lombok.Data; @@ -13,12 +15,14 @@ public class IotDeviceModbusConfigSaveReqVO { @NotNull(message = "设备编号不能为空") private Long deviceId; - @Schema(description = "Modbus 服务器 IP 地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "192.168.1.100") - @NotEmpty(message = "Modbus 服务器 IP 地址不能为空") + @Schema(description = "Modbus 服务器 IP 地址", example = "192.168.1.100") +// @NotEmpty(message = "Modbus 服务器 IP 地址不能为空") + // TODO @AI:这个字段,要根据情况校验; private String ip; - @Schema(description = "Modbus 端口", requiredMode = Schema.RequiredMode.REQUIRED, example = "502") - @NotNull(message = "Modbus 端口不能为空") + @Schema(description = "Modbus 端口", example = "502") +// @NotNull(message = "Modbus 端口不能为空") + // TODO @AI:这个字段,要根据情况校验; private Integer port; @Schema(description = "从站地址", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") @@ -31,13 +35,13 @@ public class IotDeviceModbusConfigSaveReqVO { @Schema(description = "重试间隔(毫秒)", example = "1000") private Integer retryInterval; - // TODO @AI:不要【:1-云端轮询 2-主动上报】 - @Schema(description = "模式:1-云端轮询 2-主动上报", example = "1") + @Schema(description = "工作模式", example = "1") + @InEnum(IotModbusModeEnum.class) private Integer mode; - // TODO @AI:不要【:1-云端轮询 2-主动上报】 - @Schema(description = "数据帧格式:modbus_tcp / modbus_rtu", example = "modbus_tcp") - private String frameFormat; + @Schema(description = "数据帧格式", example = "1") + @InEnum(IotModbusFrameFormatEnum.class) + private Integer frameFormat; @Schema(description = "状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "0") @NotNull(message = "状态不能为空") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusConfigDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusConfigDO.java index 06e94d2506..0204908d9e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusConfigDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceModbusConfigDO.java @@ -69,7 +69,7 @@ public class IotDeviceModbusConfigDO extends TenantBaseDO { * * @see cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum */ - private String frameFormat; + private Integer frameFormat; /** * 状态 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusConfigMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusConfigMapper.java index 397a3884f4..b18769c6a6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusConfigMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/device/IotDeviceModbusConfigMapper.java @@ -1,6 +1,8 @@ package cn.iocoder.yudao.module.iot.dal.mysql.device; import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; +import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO; import org.apache.ibatis.annotations.Mapper; @@ -18,8 +20,11 @@ public interface IotDeviceModbusConfigMapper extends BaseMapperX selectListByStatus(Integer status) { - return selectList(IotDeviceModbusConfigDO::getStatus, status); + default List selectList(IotModbusDeviceConfigListReqDTO reqDTO) { + return selectList(new LambdaQueryWrapperX() + .eqIfPresent(IotDeviceModbusConfigDO::getStatus, reqDTO.getStatus()) + .eqIfPresent(IotDeviceModbusConfigDO::getMode, reqDTO.getMode()) + .inIfPresent(IotDeviceModbusConfigDO::getDeviceId, reqDTO.getDeviceIds())); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigService.java index 1bd17e4183..2d9ef7ec61 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigService.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.service.device; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO; import jakarta.validation.Valid; @@ -37,10 +38,11 @@ public interface IotDeviceModbusConfigService { IotDeviceModbusConfigDO getDeviceModbusConfigByDeviceId(Long deviceId); /** - * 获得所有启用的 Modbus 连接配置列表 + * 获得 Modbus 连接配置列表 * - * @return 启用的 Modbus 连接配置列表 + * @param listReqDTO 查询参数 + * @return Modbus 连接配置列表 */ - List getEnabledDeviceModbusConfigList(); + List getDeviceModbusConfigList(IotModbusDeviceConfigListReqDTO listReqDTO); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java index 91388d2cf6..b92e9948e3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceModbusConfigServiceImpl.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.iot.service.device; -import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.modbus.IotDeviceModbusConfigSaveReqVO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceModbusConfigDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceModbusConfigMapper; import jakarta.annotation.Resource; @@ -54,8 +54,8 @@ public class IotDeviceModbusConfigServiceImpl implements IotDeviceModbusConfigSe } @Override - public List getEnabledDeviceModbusConfigList() { - return modbusConfigMapper.selectListByStatus(CommonStatusEnum.ENABLE.getStatus()); + public List getDeviceModbusConfigList(IotModbusDeviceConfigListReqDTO listReqDTO) { + return modbusConfigMapper.selectList(listReqDTO); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductService.java index d4292ef521..3d68fb59e1 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductService.java @@ -10,6 +10,9 @@ import javax.annotation.Nullable; import java.time.LocalDateTime; import java.util.Collection; import java.util.List; +import java.util.Map; + +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap; /** * IoT 产品 Service 接口 @@ -121,6 +124,24 @@ public interface IotProductService { */ Long getProductCount(@Nullable LocalDateTime createTime); + /** + * 批量获得产品列表 + * + * @param ids 产品编号集合 + * @return 产品列表 + */ + List getProductList(Collection ids); + + /** + * 批量获得产品 Map + * + * @param ids 产品编号集合 + * @return 产品 Map(key: 产品编号, value: 产品) + */ + default Map getProductMap(Collection ids) { + return convertMap(getProductList(ids), IotProductDO::getId); + } + /** * 批量校验产品存在 * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductServiceImpl.java index e001f46a2b..9686f42c93 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/product/IotProductServiceImpl.java @@ -171,6 +171,11 @@ public class IotProductServiceImpl implements IotProductService { return productMapper.selectCountByCreateTime(createTime); } + @Override + public List getProductList(Collection ids) { + return productMapper.selectByIds(ids); + } + @Override public void validateProductsExist(Collection ids) { if (CollUtil.isEmpty(ids)) { diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/IotDeviceCommonApi.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/IotDeviceCommonApi.java index 54a0e67a41..c0b3f9df31 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/IotDeviceCommonApi.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/IotDeviceCommonApi.java @@ -1,13 +1,7 @@ package cn.iocoder.yudao.module.iot.core.biz; import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; - -import java.util.List; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.*; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; @@ -54,10 +48,11 @@ public interface IotDeviceCommonApi { CommonResult> registerSubDevices(IotSubDeviceRegisterFullReqDTO reqDTO); /** - * 获取所有启用的 Modbus 设备配置列表 + * 获取 Modbus 设备配置列表 * + * @param listReqDTO 查询参数 * @return Modbus 设备配置列表 */ - CommonResult> getEnabledModbusDeviceConfigs(); + CommonResult> getModbusDeviceConfigList(IotModbusDeviceConfigListReqDTO listReqDTO); } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigListReqDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigListReqDTO.java new file mode 100644 index 0000000000..7865a09f00 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigListReqDTO.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.core.biz.dto; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.Set; + +/** + * IoT Modbus 设备配置列表查询 Request DTO + * + * @author 芋道源码 + */ +@Data +@Accessors(chain = true) +public class IotModbusDeviceConfigListReqDTO { + + /** + * 状态 + */ + private Integer status; + + /** + * 模式 + */ + private Integer mode; + + /** + * 协议类型 + */ + private String protocolType; + + /** + * 设备 ID 集合 + */ + private Set deviceIds; + +} diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigRespDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigRespDTO.java index 4580a8e596..683bcef4c4 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigRespDTO.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotModbusDeviceConfigRespDTO.java @@ -54,7 +54,7 @@ public class IotModbusDeviceConfigRespDTO { /** * 数据帧格式 */ - private String frameFormat; + private Integer frameFormat; // ========== Modbus 点位配置 ========== diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotModbusFrameFormatEnum.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotModbusFrameFormatEnum.java index b2817047a1..4e963850eb 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotModbusFrameFormatEnum.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/enums/IotModbusFrameFormatEnum.java @@ -13,26 +13,22 @@ import java.util.Arrays; */ @Getter @RequiredArgsConstructor -public enum IotModbusFrameFormatEnum implements ArrayValuable { +public enum IotModbusFrameFormatEnum implements ArrayValuable { - MODBUS_TCP("modbus_tcp", "Modbus TCP"), - MODBUS_RTU("modbus_rtu", "Modbus RTU"); + MODBUS_TCP(1), + MODBUS_RTU(2); - public static final String[] ARRAYS = Arrays.stream(values()) + public static final Integer[] ARRAYS = Arrays.stream(values()) .map(IotModbusFrameFormatEnum::getFormat) - .toArray(String[]::new); + .toArray(Integer[]::new); /** * 格式 */ - private final String format; - /** - * 名称 - */ - private final String name; + private final Integer format; @Override - public String[] array() { + public Integer[] array() { return ARRAYS; } diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceAuthUtils.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceAuthUtils.java index 609d0a60ae..1aa9cfcabf 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceAuthUtils.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/util/IotDeviceAuthUtils.java @@ -25,6 +25,14 @@ public class IotDeviceAuthUtils { return String.format("%s.%s", productKey, deviceName); } + public static String buildClientIdFromUsername(String username) { + IotDeviceIdentity identity = parseUsername(username); + if (identity == null) { + return null; + } + return buildClientId(identity.getProductKey(), identity.getDeviceName()); + } + public static String buildUsername(String productKey, String deviceName) { return String.format("%s&%s", deviceName, productKey); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java index dedac7acd3..e62f85fcf6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/manager/AbstractIotModbusPollScheduler.java @@ -32,6 +32,10 @@ public abstract class AbstractIotModbusPollScheduler { * 同设备最小请求间隔(毫秒),防止 Modbus 设备性能不足时请求堆积 */ private static final long MIN_REQUEST_INTERVAL = 1000; + /** + * 每个设备请求队列的最大长度,超出时丢弃最旧请求 + */ + private static final int MAX_QUEUE_SIZE = 1000; /** * 设备点位的定时器映射:deviceId -> (pointId -> PointTimerInfo) @@ -159,6 +163,11 @@ public abstract class AbstractIotModbusPollScheduler { private void submitPollRequest(Long deviceId, Long pointId) { // 1. 【重要】将请求添加到设备的请求队列 Queue queue = deviceRequestQueues.computeIfAbsent(deviceId, k -> new ConcurrentLinkedQueue<>()); + while (queue.size() >= MAX_QUEUE_SIZE) { + // 超出上限时,丢弃最旧的请求 + queue.poll(); + log.warn("[submitPollRequest][设备 {} 请求队列已满({}), 丢弃最旧请求]", deviceId, MAX_QUEUE_SIZE); + } queue.offer(() -> pollPoint(deviceId, pointId)); // 2. 处理设备请求队列(如果没有延迟 timer 在等待) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java index 236277bc0a..23ee4bf124 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusCommonUtils.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.util.object.ObjectUtils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; @@ -331,10 +332,10 @@ public class IotModbusCommonUtils { // 其他字节序调整 byte[] result = new byte[bytes.length]; switch (byteOrderEnum) { - case BA: // 小端序(16 位) - if (bytes.length >= 2) { - result[0] = bytes[1]; - result[1] = bytes[0]; + case BA: // 小端序:按每 2 字节一组交换(16 位场景 [1,0],32 位场景 [1,0,3,2]) + for (int i = 0; i + 1 < bytes.length; i += 2) { + result[i] = bytes[i + 1]; + result[i + 1] = bytes[i]; } break; case CDAB: // 大端字交换(32 位) @@ -509,6 +510,9 @@ public class IotModbusCommonUtils { * @return 匹配的点位配置,未找到返回 null */ public static IotModbusPointRespDTO findPoint(IotModbusDeviceConfigRespDTO config, String identifier) { + if (config == null || StrUtil.isBlank(identifier)) { + return null; + } return CollUtil.findOne(config.getPoints(), p -> identifier.equals(p.getIdentifier())); } @@ -520,6 +524,9 @@ public class IotModbusCommonUtils { * @return 匹配的点位配置,未找到返回 null */ public static IotModbusPointRespDTO findPointById(IotModbusDeviceConfigRespDTO config, Long pointId) { + if (config == null || pointId == null) { + return null; + } return CollUtil.findOne(config.getPoints(), p -> p.getId().equals(pointId)); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java index 5804ea6022..1f7c14dc41 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/common/utils/IotModbusTcpMasterUtils.java @@ -1,7 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager; import com.ghgande.j2mod.modbus.io.ModbusTCPTransaction; import com.ghgande.j2mod.modbus.msg.*; import com.ghgande.j2mod.modbus.procimg.InputRegister; @@ -19,7 +19,7 @@ import static cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.I *

* 封装基于 j2mod 的 Modbus TCP 读写操作: * 1. 根据功能码创建对应的 Modbus 读/写请求 - * 2. 通过 {@link IotModbusTcpConnectionManager.ModbusConnection} 执行事务 + * 2. 通过 {@link IotModbusTcpMasterConnectionManager.ModbusConnection} 执行事务 * 3. 从响应中提取原始值 * * @author 芋道源码 @@ -36,9 +36,9 @@ public class IotModbusTcpMasterUtils { * @param point 点位配置 * @return 原始值(int 数组) */ - public static Future read(IotModbusTcpConnectionManager.ModbusConnection connection, - Integer slaveId, - IotModbusPointRespDTO point) { + public static Future read(IotModbusTcpMasterConnectionManager.ModbusConnection connection, + Integer slaveId, + IotModbusPointRespDTO point) { return connection.executeBlocking(tcpConnection -> { try { // 1. 创建请求 @@ -70,10 +70,10 @@ public class IotModbusTcpMasterUtils { * @param values 要写入的值 * @return 是否成功 */ - public static Future write(IotModbusTcpConnectionManager.ModbusConnection connection, - Integer slaveId, - IotModbusPointRespDTO point, - int[] values) { + public static Future write(IotModbusTcpMasterConnectionManager.ModbusConnection connection, + Integer slaveId, + IotModbusPointRespDTO point, + int[] values) { return connection.executeBlocking(tcpConnection -> { try { // 1. 创建请求 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java index 7b435d74fa..1fac973a92 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/IotModbusTcpMasterProtocol.java @@ -6,16 +6,15 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpPollScheduler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpMasterDownstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream.IotModbusTcpMasterDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpMasterUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConfigCacheService; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterPollScheduler; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.Getter; @@ -23,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -61,15 +61,14 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { /** * 连接管理器 */ - private final IotModbusTcpConnectionManager connectionManager; + private final IotModbusTcpMasterConnectionManager connectionManager; /** * 下行消息订阅者 */ - private final IotModbusTcpDownstreamSubscriber downstreamSubscriber; + private final IotModbusTcpMasterDownstreamSubscriber downstreamSubscriber; - private final IotModbusTcpConfigCacheService configCacheService; - private final IotModbusTcpPollScheduler pollScheduler; - private final IotDeviceMessageService messageService; + private final IotModbusTcpMasterConfigCacheService configCacheService; + private final IotModbusTcpMasterPollScheduler pollScheduler; public IotModbusTcpMasterProtocol(ProtocolProperties properties) { IotModbusTcpMasterConfig modbusTcpMasterConfig = properties.getModbusTcpMaster(); @@ -83,22 +82,23 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { // 初始化 Manager RedissonClient redissonClient = SpringUtil.getBean(RedissonClient.class); IotDeviceCommonApi deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.connectionManager = new IotModbusTcpConnectionManager(redissonClient, vertx); - this.configCacheService = new IotModbusTcpConfigCacheService(deviceApi); + IotDeviceMessageService messageService = SpringUtil.getBean(IotDeviceMessageService.class); + this.configCacheService = new IotModbusTcpMasterConfigCacheService(deviceApi); + // DONE @AI:上线/下线消息已移到 ConnectionManager 内部处理,不再走回调 + this.connectionManager = new IotModbusTcpMasterConnectionManager(redissonClient, vertx, + messageService, configCacheService, serverId); // 初始化 Handler - this.messageService = SpringUtil.getBean(IotDeviceMessageService.class); - IotDeviceMessageService messageService = this.messageService; - IotModbusTcpUpstreamHandler upstreamHandler = new IotModbusTcpUpstreamHandler(messageService, serverId); - IotModbusTcpDownstreamHandler downstreamHandler = new IotModbusTcpDownstreamHandler(connectionManager, + IotModbusTcpMasterUpstreamHandler upstreamHandler = new IotModbusTcpMasterUpstreamHandler(messageService, serverId); + IotModbusTcpMasterDownstreamHandler downstreamHandler = new IotModbusTcpMasterDownstreamHandler(connectionManager, configCacheService); // 初始化轮询调度器 - this.pollScheduler = new IotModbusTcpPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService); + this.pollScheduler = new IotModbusTcpMasterPollScheduler(vertx, connectionManager, upstreamHandler, configCacheService); // 初始化下行消息订阅者 IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); - this.downstreamSubscriber = new IotModbusTcpDownstreamSubscriber(this, downstreamHandler, messageBus); + this.downstreamSubscriber = new IotModbusTcpMasterDownstreamSubscriber(this, downstreamHandler, messageBus); } @Override @@ -196,46 +196,21 @@ public class IotModbusTcpMasterProtocol implements IotProtocol { // 2. 更新连接和轮询任务 for (IotModbusDeviceConfigRespDTO config : configs) { try { - // 2.1 检测是否为首次连接 - boolean isNewConnection = connectionManager.getConnection(config.getDeviceId()) == null; - // 2.2 确保连接存在 + // 2.1 确保连接存在 connectionManager.ensureConnection(config); - // 2.3 首次建连成功后发送上线消息 - // TODO @AI:在这里判断上线 ,会不会有点奇怪??? - if (isNewConnection && connectionManager.getConnection(config.getDeviceId()) != null) { - try { - IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); - messageService.sendDeviceMessage(onlineMessage, - config.getProductKey(), config.getDeviceName(), serverId); - } catch (Exception ex) { - log.error("[refreshConfig][发送设备上线消息失败, deviceId={}]", config.getDeviceId(), ex); - } - } - // 2.4 更新轮询任务 + // 2.2 更新轮询任务 pollScheduler.updatePolling(config); } catch (Exception e) { log.error("[refreshConfig][处理设备配置失败, deviceId={}]", config.getDeviceId(), e); } } - // 3. 清理已删除设备的资源(仅 API 成功时才执行) - configCacheService.cleanupRemovedDevices(configs, deviceId -> { - // 3.1 发送设备下线消息 - // TODO @AI:在这里判断上线 ,会不会有点奇怪??? - IotModbusDeviceConfigRespDTO removedConfig = configCacheService.getConfig(deviceId); - if (removedConfig != null) { - try { - IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); - messageService.sendDeviceMessage(offlineMessage, - removedConfig.getProductKey(), removedConfig.getDeviceName(), serverId); - } catch (Exception ex) { - log.error("[refreshConfig][发送设备下线消息失败, deviceId={}]", deviceId, ex); - } - } - // 3.2 停止轮询和移除连接 + // 3. 清理已删除设备的资源 + Set removedDeviceIds = configCacheService.cleanupRemovedDevices(configs); + for (Long deviceId : removedDeviceIds) { pollScheduler.stopPolling(deviceId); connectionManager.removeDevice(deviceId); - }); + } } catch (Exception e) { log.error("[refreshConfig][刷新配置失败]", e); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamHandler.java similarity index 83% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamHandler.java index 32dfde5a64..af51e22933 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamHandler.java @@ -1,41 +1,42 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.downstream; +import cn.hutool.core.util.ObjUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConfigCacheService; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpConnectionManager; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConfigCacheService; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager.IotModbusTcpMasterConnectionManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.Map; /** - * IoT Modbus TCP 下行消息处理器 + * IoT Modbus TCP Master 下行消息处理器 *

* 负责: * 1. 处理下行消息(如属性设置 thing.service.property.set) - * 2. 执行 Modbus 写入操作 + * 2. 将属性值转换为 Modbus 写指令,通过 TCP 连接发送给设备 * * @author 芋道源码 */ @RequiredArgsConstructor @Slf4j -public class IotModbusTcpDownstreamHandler { +public class IotModbusTcpMasterDownstreamHandler { - private final IotModbusTcpConnectionManager connectionManager; - private final IotModbusTcpConfigCacheService configCacheService; + private final IotModbusTcpMasterConnectionManager connectionManager; + private final IotModbusTcpMasterConfigCacheService configCacheService; /** * 处理下行消息 */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "DuplicatedCode"}) public void handle(IotDeviceMessage message) { // 1.1 检查是否是属性设置消息 - if (!IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod().equals(message.getMethod())) { + if (ObjUtil.notEqual(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), message.getMethod())) { log.debug("[handle][忽略非属性设置消息: {}]", message.getMethod()); return; } @@ -78,7 +79,7 @@ public class IotModbusTcpDownstreamHandler { */ private void writeProperty(IotModbusDeviceConfigRespDTO config, IotModbusPointRespDTO point, Object value) { // 1.1 获取连接 - IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(config.getDeviceId()); + IotModbusTcpMasterConnectionManager.ModbusConnection connection = connectionManager.getConnection(config.getDeviceId()); if (connection == null) { log.warn("[writeProperty][设备 {} 没有连接]", config.getDeviceId()); return; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamSubscriber.java similarity index 64% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamSubscriber.java index 74afcbb38f..812e13da01 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/downstream/IotModbusTcpMasterDownstreamSubscriber.java @@ -12,13 +12,13 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotModbusTcpDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { +public class IotModbusTcpMasterDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber { - private final IotModbusTcpDownstreamHandler downstreamHandler; + private final IotModbusTcpMasterDownstreamHandler downstreamHandler; - public IotModbusTcpDownstreamSubscriber(IotModbusTcpMasterProtocol protocol, - IotModbusTcpDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { + public IotModbusTcpMasterDownstreamSubscriber(IotModbusTcpMasterProtocol protocol, + IotModbusTcpMasterDownstreamHandler downstreamHandler, + IotMessageBus messageBus) { super(protocol, messageBus); this.downstreamHandler = downstreamHandler; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpMasterUpstreamHandler.java similarity index 89% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpMasterUpstreamHandler.java index de02af06c2..7fc573d950 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/handler/upstream/IotModbusTcpMasterUpstreamHandler.java @@ -17,13 +17,14 @@ import java.util.Map; * @author 芋道源码 */ @Slf4j -public class IotModbusTcpUpstreamHandler { +public class IotModbusTcpMasterUpstreamHandler { private final IotDeviceMessageService messageService; + private final String serverId; - public IotModbusTcpUpstreamHandler(IotDeviceMessageService messageService, - String serverId) { + public IotModbusTcpMasterUpstreamHandler(IotDeviceMessageService messageService, + String serverId) { this.messageService = messageService; this.serverId = serverId; } @@ -39,7 +40,7 @@ public class IotModbusTcpUpstreamHandler { IotModbusPointRespDTO point, int[] rawValue) { try { - // 1.1 转换原始值为物模型属性值 + // 1.1 转换原始值为物模型属性值(点位翻译) Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValue, point); log.debug("[handleReadResult][设备={}, 属性={}, 原始值={}, 转换值={}]", config.getDeviceId(), point.getIdentifier(), rawValue, convertedValue); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConfigCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java similarity index 69% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConfigCacheService.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java index 5a3386e1d2..7ce6dd02fc 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConfigCacheService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConfigCacheService.java @@ -1,25 +1,31 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager; +import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; /** - * IoT Modbus TCP 配置缓存服务,负责:从 biz 拉取 Modbus 设备配置,缓存配置数据,并检测配置变更 + * IoT Modbus TCP Master 配置缓存服务 * * @author 芋道源码 */ @RequiredArgsConstructor @Slf4j -public class IotModbusTcpConfigCacheService { +public class IotModbusTcpMasterConfigCacheService { private final IotDeviceCommonApi deviceApi; @@ -31,7 +37,7 @@ public class IotModbusTcpConfigCacheService { /** * 已知的设备 ID 集合(作用:用于检测已删除的设备) * - * @see #cleanupRemovedDevices(List, Consumer) + * @see #cleanupRemovedDevices(List) */ private final Set knownDeviceIds = ConcurrentHashMap.newKeySet(); @@ -43,7 +49,9 @@ public class IotModbusTcpConfigCacheService { public List refreshConfig() { try { // 1. 从远程获取配置 - CommonResult> result = deviceApi.getEnabledModbusDeviceConfigs(); + CommonResult> result = deviceApi.getModbusDeviceConfigList( + new IotModbusDeviceConfigListReqDTO().setStatus(CommonStatusEnum.ENABLE.getStatus()) + .setMode(IotModbusModeEnum.POLLING.getMode()).setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_MASTER.getType())); result.checkError(); List configs = result.getData(); @@ -69,28 +77,30 @@ public class IotModbusTcpConfigCacheService { } /** - * 清理已删除设备的资源,并更新已知设备 ID 集合 + * 计算已删除设备的 ID 集合,清理缓存,并更新已知设备 ID 集合 + * + * DONE @AI:不再使用 callback 模式,返回已删除的设备 ID 集合,由调用方直接清理 * * @param currentConfigs 当前有效的配置列表 - * @param cleanupAction 清理动作 + * @return 已删除的设备 ID 集合 */ - public void cleanupRemovedDevices(List currentConfigs, Consumer cleanupAction) { + public Set cleanupRemovedDevices(List currentConfigs) { // 1.1 获取当前有效的设备 ID Set currentDeviceIds = convertSet(currentConfigs, IotModbusDeviceConfigRespDTO::getDeviceId); // 1.2 找出已删除的设备(基于旧的 knownDeviceIds) Set removedDeviceIds = new HashSet<>(knownDeviceIds); removedDeviceIds.removeAll(currentDeviceIds); - // 2. 清理已删除设备(先执行 cleanupAction,再从缓存移除,保证 action 中仍可获取 config) + // 2. 清理已删除设备的缓存 for (Long deviceId : removedDeviceIds) { log.info("[cleanupRemovedDevices][清理已删除设备: {}]", deviceId); - cleanupAction.accept(deviceId); configCache.remove(deviceId); } // 3. 更新已知设备 ID 集合为当前有效的设备 ID knownDeviceIds.clear(); knownDeviceIds.addAll(currentDeviceIds); + return removedDeviceIds; } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java similarity index 62% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConnectionManager.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java index 3b4e104402..34b68f65b6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterConnectionManager.java @@ -2,12 +2,13 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.manager; import cn.hutool.core.util.ObjUtil; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import com.ghgande.j2mod.modbus.net.TCPMasterConnection; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Vertx; import lombok.Data; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; @@ -26,14 +27,16 @@ import java.util.concurrent.ConcurrentHashMap; * * @author 芋道源码 */ -@RequiredArgsConstructor @Slf4j -public class IotModbusTcpConnectionManager { +public class IotModbusTcpMasterConnectionManager { private static final String LOCK_KEY_PREFIX = "iot:modbus-tcp:connection:"; private final RedissonClient redissonClient; private final Vertx vertx; + private final IotDeviceMessageService messageService; + private final IotModbusTcpMasterConfigCacheService configCacheService; + private final String serverId; /** * 连接池:key = ip:port @@ -45,8 +48,21 @@ public class IotModbusTcpConnectionManager { */ private final Map deviceConnectionMap = new ConcurrentHashMap<>(); + public IotModbusTcpMasterConnectionManager(RedissonClient redissonClient, Vertx vertx, + IotDeviceMessageService messageService, + IotModbusTcpMasterConfigCacheService configCacheService, + String serverId) { + this.redissonClient = redissonClient; + this.vertx = vertx; + this.messageService = messageService; + this.configCacheService = configCacheService; + this.serverId = serverId; + } + /** * 确保连接存在 + *

+ * 首次建连成功时,直接发送设备上线消息 * * @param config 设备配置 */ @@ -59,37 +75,39 @@ public class IotModbusTcpConnectionManager { config.getDeviceId(), oldConnectionKey, connectionKey); removeDevice(config.getDeviceId()); } - // 1.2 记录设备与连接的关系 + // 1.2 记录设备与连接的映射 deviceConnectionMap.put(config.getDeviceId(), connectionKey); - // 2. 情况一:连接已存在,添加设备引用 + // 2. 情况一:连接已存在,注册设备并发送上线消息 ModbusConnection connection = connectionPool.get(connectionKey); if (connection != null) { - connection.addDevice(config.getDeviceId(), config.getSlaveId()); + addDeviceAndOnline(connection, config); return; } - // 3. 情况二:连接不存在,创建新连接 - // 3.1 尝试获取分布式锁 + // 3. 情况二:连接不存在,加分布式锁创建新连接 RLock lock = redissonClient.getLock(LOCK_KEY_PREFIX + connectionKey); if (!lock.tryLock()) { log.debug("[ensureConnection][获取锁失败, 由其他节点负责: {}]", connectionKey); return; } - // 3.2 double-check:拿到锁后再次检查,避免并发创建重复连接 try { + // 3.1 double-check:拿到锁后再次检查,避免并发创建重复连接 connection = connectionPool.get(connectionKey); if (connection != null) { - connection.addDevice(config.getDeviceId(), config.getSlaveId()); - lock.unlock(); + addDeviceAndOnline(connection, config); return; } - // 3.3 创建新连接 - connection = createConnection(config, lock); + // 3.2 创建新连接 + connection = createConnection(config); connectionPool.put(connectionKey, connection); log.info("[ensureConnection][创建 Modbus 连接成功: {}]", connectionKey); + // 3.3 注册设备并发送上线消息 + addDeviceAndOnline(connection, config); } catch (Exception e) { log.error("[ensureConnection][创建 Modbus 连接失败: {}]", connectionKey, e); + } finally { + // TODO @AI:如果这里释放,会不会出现,集群模式下,多个节点同时创建连接的情况?需要验证一下 Redisson 的分布式锁特性?! lock.unlock(); } } @@ -97,7 +115,7 @@ public class IotModbusTcpConnectionManager { /** * 创建 Modbus TCP 连接 */ - private ModbusConnection createConnection(IotModbusDeviceConfigRespDTO config, RLock lock) throws Exception { + private ModbusConnection createConnection(IotModbusDeviceConfigRespDTO config) throws Exception { // 1. 创建 TCP 连接 TCPMasterConnection tcpConnection = new TCPMasterConnection(InetAddress.getByName(config.getIp())); tcpConnection.setPort(config.getPort()); @@ -105,12 +123,10 @@ public class IotModbusTcpConnectionManager { tcpConnection.connect(); // 2. 创建 Modbus 连接对象 - ModbusConnection connection = new ModbusConnection() + return new ModbusConnection() .setConnectionKey(buildConnectionKey(config.getIp(), config.getPort())) - .setTcpConnection(tcpConnection).setLock(lock).setContext(vertx.getOrCreateContext()) + .setTcpConnection(tcpConnection).setContext(vertx.getOrCreateContext()) .setTimeout(config.getTimeout()).setRetryInterval(config.getRetryInterval()); - connection.addDevice(config.getDeviceId(), config.getSlaveId()); - return connection; } /** @@ -137,25 +153,71 @@ public class IotModbusTcpConnectionManager { /** * 移除设备 + *

+ * 移除时直接发送设备下线消息 */ public void removeDevice(Long deviceId) { - // 1. 移除设备引用 + // 1.1 移除设备时,发送下线消息 + sendOfflineMessage(deviceId); + // 1.2 移除设备引用 String connectionKey = deviceConnectionMap.remove(deviceId); if (connectionKey == null) { return; } + + // 2.1 移除连接中的设备引用 ModbusConnection connection = connectionPool.get(connectionKey); if (connection == null) { return; } connection.removeDevice(deviceId); - - // 2. 如果没有设备引用了,关闭连接 + // 2.2 如果没有设备引用了,关闭连接 if (connection.getDeviceCount() == 0) { closeConnection(connectionKey); } } + // ==================== 设备连接 & 上下线消息 ==================== + + /** + * 注册设备到连接,并发送上线消息 + */ + private void addDeviceAndOnline(ModbusConnection connection, + IotModbusDeviceConfigRespDTO config) { + connection.addDevice(config.getDeviceId(), config.getSlaveId()); + sendOnlineMessage(config); + } + + /** + * 发送设备上线消息 + */ + private void sendOnlineMessage(IotModbusDeviceConfigRespDTO config) { + try { + IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); + messageService.sendDeviceMessage(onlineMessage, + config.getProductKey(), config.getDeviceName(), serverId); + } catch (Exception ex) { + log.error("[sendOnlineMessage][发送设备上线消息失败, deviceId={}]", config.getDeviceId(), ex); + } + } + + /** + * 发送设备下线消息 + */ + private void sendOfflineMessage(Long deviceId) { + IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId); + if (config == null) { + return; + } + try { + IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); + messageService.sendDeviceMessage(offlineMessage, + config.getProductKey(), config.getDeviceName(), serverId); + } catch (Exception ex) { + log.error("[sendOfflineMessage][发送设备下线消息失败, deviceId={}]", deviceId, ex); + } + } + /** * 关闭指定连接 */ @@ -170,10 +232,10 @@ public class IotModbusTcpConnectionManager { connection.getTcpConnection().close(); } // 强制解锁,避免死锁(正常情况下应该不会发生锁未释放的情况) - RLock lock = connection.getLock(); - if (lock != null && lock.isLocked()) { - lock.forceUnlock(); - } +// RLock lock = connection.getLock(); +// if (lock != null && lock.isLocked()) { +// lock.forceUnlock(); +// } log.info("[closeConnection][关闭 Modbus 连接: {}]", connectionKey); } catch (Exception e) { log.error("[closeConnection][关闭连接失败: {}]", connectionKey, e); @@ -202,11 +264,14 @@ public class IotModbusTcpConnectionManager { private String connectionKey; private TCPMasterConnection tcpConnection; - private RLock lock; private Integer timeout; private Integer retryInterval; + private Context context; + // TODO @AI:是不是需要 lock?!避免集群模式下的竞争(肯定不能让别的节点连接上)!!!【另外,RLock 在节点(持有所锁的节点) cransh 的时候,会自动释放】 +// private RLock lock; + /** * 设备 ID 到 slave ID 的映射 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterPollScheduler.java similarity index 75% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterPollScheduler.java index 2f049561c9..df87b91fa1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpPollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/manager/IotModbusTcpMasterPollScheduler.java @@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.manager.AbstractIotModbusPollScheduler; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusTcpMasterUtils; -import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster.handler.upstream.IotModbusTcpMasterUpstreamHandler; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; @@ -17,16 +17,16 @@ import lombok.extern.slf4j.Slf4j; * @author 芋道源码 */ @Slf4j -public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler { +public class IotModbusTcpMasterPollScheduler extends AbstractIotModbusPollScheduler { - private final IotModbusTcpConnectionManager connectionManager; - private final IotModbusTcpUpstreamHandler upstreamHandler; - private final IotModbusTcpConfigCacheService configCacheService; + private final IotModbusTcpMasterConnectionManager connectionManager; + private final IotModbusTcpMasterUpstreamHandler upstreamHandler; + private final IotModbusTcpMasterConfigCacheService configCacheService; - public IotModbusTcpPollScheduler(Vertx vertx, - IotModbusTcpConnectionManager connectionManager, - IotModbusTcpUpstreamHandler upstreamHandler, - IotModbusTcpConfigCacheService configCacheService) { + public IotModbusTcpMasterPollScheduler(Vertx vertx, + IotModbusTcpMasterConnectionManager connectionManager, + IotModbusTcpMasterUpstreamHandler upstreamHandler, + IotModbusTcpMasterConfigCacheService configCacheService) { super(vertx); this.connectionManager = connectionManager; this.upstreamHandler = upstreamHandler; @@ -54,7 +54,7 @@ public class IotModbusTcpPollScheduler extends AbstractIotModbusPollScheduler { } // 2.1 获取连接 - IotModbusTcpConnectionManager.ModbusConnection connection = connectionManager.getConnection(deviceId); + IotModbusTcpMasterConnectionManager.ModbusConnection connection = connectionManager.getConnection(deviceId); if (connection == null) { log.warn("[pollPoint][设备 {} 没有连接]", deviceId); return; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/package-info.java index f971d3f862..86e393233a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpmaster/package-info.java @@ -1,9 +1,6 @@ /** - * Modbus TCP Master 协议实现包 + * Modbus TCP Master(主站)协议:网关主动连接并轮询 Modbus 从站设备 *

- * 提供基于 j2mod 的 Modbus TCP 主站(Master)功能,支持: - * 1. 定时轮询 Modbus 从站设备数据 - * 2. 下发属性设置命令到从站设备 - * 3. 数据格式转换(寄存器值 ↔ 物模型属性值) + * 基于 j2mod 实现,支持 FC01-04 读、FC05/06/15/16 写,定时轮询 + 下发属性设置 */ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpmaster; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java index b042363acc..3a44a189fe 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveProtocol.java @@ -81,22 +81,27 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { */ private Long requestCleanupTimerId; - // ========== 各组件 ========== - // TODO @芋艿:稍后排序下,有点小乱; - - private final IotModbusTcpSlaveConfig slaveConfig; - private final IotModbusFrameDecoder frameDecoder; - private final IotModbusFrameEncoder frameEncoder; + /** + * 连接管理器 + */ private final IotModbusTcpSlaveConnectionManager connectionManager; + /** + * 下行消息订阅者 + */ + private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; + + private final IotModbusFrameDecoder frameDecoder; + @SuppressWarnings("FieldCanBeLocal") + private final IotModbusFrameEncoder frameEncoder; + private final IotModbusTcpSlaveConfigCacheService configCacheService; private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; private final IotModbusTcpSlaveUpstreamHandler upstreamHandler; - private final IotModbusTcpSlaveDownstreamSubscriber downstreamSubscriber; private final IotModbusTcpSlavePollScheduler pollScheduler; private final IotDeviceMessageService messageService; public IotModbusTcpSlaveProtocol(ProtocolProperties properties) { - this.slaveConfig = properties.getModbusTcpSlave(); + IotModbusTcpSlaveConfig slaveConfig = properties.getModbusTcpSlave(); Assert.notNull(slaveConfig, "Modbus TCP Slave 协议配置(modbusTcpSlave)不能为空"); this.properties = properties; this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); @@ -124,10 +129,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { // 初始化 Handler this.messageService = SpringUtil.getBean(IotDeviceMessageService.class); - IotDeviceMessageService messageService = this.messageService; IotDeviceService deviceService = SpringUtil.getBean(IotDeviceService.class); this.upstreamHandler = new IotModbusTcpSlaveUpstreamHandler( - deviceApi, messageService, frameEncoder, + deviceApi, this.messageService, frameEncoder, connectionManager, configCacheService, pendingRequestManager, pollScheduler, deviceService, serverId); @@ -158,9 +162,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { try { // 1. 启动配置刷新定时器 - int refreshInterval = slaveConfig.getConfigRefreshInterval(); + IotModbusTcpSlaveConfig slaveConfig = properties.getModbusTcpSlave(); configRefreshTimerId = vertx.setPeriodic( - TimeUnit.SECONDS.toMillis(refreshInterval), + TimeUnit.SECONDS.toMillis(slaveConfig.getConfigRefreshInterval()), id -> refreshConfig()); // 2.1 启动 TCP Server @@ -178,6 +182,7 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { downstreamSubscriber.start(); } catch (Exception e) { log.error("[start][IoT Modbus TCP Slave 协议 {} 启动失败]", getId(), e); + // TODO @芋艿:后续统一优化 stop 逻辑; if (configRefreshTimerId != null) { vertx.cancelTimer(configRefreshTimerId); configRefreshTimerId = null; @@ -223,9 +228,9 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { pollScheduler.stopAll(); // 2.3 清理 PendingRequest pendingRequestManager.clear(); - // 2.3 关闭所有连接 + // 2.4 关闭所有连接 connectionManager.closeAll(); - // 2.4 关闭 TCP Server + // 2.5 关闭 TCP Server if (netServer != null) { try { netServer.close().result(); @@ -308,9 +313,6 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { /** * 刷新已连接设备的配置(定时调用) - *

- * 与 tcpmaster 不同,slave 只刷新已连接设备的配置,不做全量 diff。 - * 设备的新增(认证时)和删除(断连时)分别在 {@link #handleConnection} 中处理。 */ private synchronized void refreshConfig() { try { @@ -321,6 +323,10 @@ public class IotModbusTcpSlaveProtocol implements IotProtocol { } List configs = configCacheService.refreshConnectedDeviceConfigList(connectedDeviceIds); + if (configs == null) { + log.warn("[refreshConfig][刷新配置失败,跳过本次刷新]"); + return; + } log.debug("[refreshConfig][刷新了 {} 个已连接设备的配置]", configs.size()); // 2. 更新已连接设备的轮询任务 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java index b7661abcfc..347b3c5386 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrame.java @@ -29,7 +29,7 @@ public class IotModbusFrame { /** * 事务标识符 *

- * 仅 {@link IotModbusFrameFormatEnum#MODBUS_TCP} 格式有值, + * 仅 {@link IotModbusFrameFormatEnum#MODBUS_TCP} 格式有值 */ private Integer transactionId; @@ -37,14 +37,13 @@ public class IotModbusFrame { * 异常码 *

* 当功能码最高位为 1 时(异常响应),此字段存储异常码。 - * 为 null 表示非异常响应。 * * @see IotModbusCommonUtils#FC_EXCEPTION_MASK */ private Integer exceptionCode; /** - * 自定义功能码时的 JSON 字符串 + * 自定义功能码时的 JSON 字符串(用于 auth 认证等等) */ private String customData; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java index fc5219e197..b47f2a11be 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/codec/IotModbusFrameDecoder.java @@ -20,20 +20,20 @@ import java.util.function.BiConsumer; * 1. 首帧检测:读前 6 字节,判断 MODBUS_TCP(ProtocolId==0x0000 且 Length 合理)或 MODBUS_RTU * 2. 检测后切换到对应的拆包 Handler,并将首包 6 字节通过 handleFirstBytes() 交给新 Handler 处理 * 3. 拆包完成后解码为 IotModbusFrame,通过回调返回 - * - MODBUS_TCP:两阶段 RecordParser(MBAP length 字段驱动) - * - MODBUS_RTU:功能码驱动的状态机 + * - MODBUS_TCP:两阶段 RecordParser(MBAP length 字段驱动) + * - MODBUS_RTU:功能码驱动的状态机 * * @author 芋道源码 */ +@RequiredArgsConstructor @Slf4j public class IotModbusFrameDecoder { + /** + * 自定义功能码 + */ private final int customFunctionCode; - public IotModbusFrameDecoder(int customFunctionCode) { - this.customFunctionCode = customFunctionCode; - } - /** * 创建带自动帧格式检测的 RecordParser * @@ -82,7 +82,7 @@ public class IotModbusFrameDecoder { // 提取 PDU 数据(从 functionCode 之后到末尾) byte[] pdu = new byte[data.length - 8]; System.arraycopy(data, 8, pdu, 0, pdu.length); - + // 构建 IotModbusFrame return buildFrame(slaveId, functionCode, pdu, transactionId); } @@ -105,7 +105,7 @@ public class IotModbusFrameDecoder { // PDU 数据(不含 slaveId、functionCode、CRC) byte[] pdu = new byte[data.length - 4]; System.arraycopy(data, 2, pdu, 0, pdu.length); - + // 构建 IotModbusFrame return buildFrame(slaveId, functionCode, pdu, null); } @@ -144,7 +144,6 @@ public class IotModbusFrameDecoder { /** * 帧格式检测阶段 Handler(仅处理首包,探测后切换到对应的拆包 Handler) */ - @SuppressWarnings("ClassCanBeRecord") @RequiredArgsConstructor private class DetectPhaseHandler implements Handler { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java index 770f13e3f8..727cc7cea4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/downstream/IotModbusTcpSlaveDownstreamHandler.java @@ -51,7 +51,7 @@ public class IotModbusTcpSlaveDownstreamHandler { /** * 处理下行消息 */ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "DuplicatedCode"}) public void handle(IotDeviceMessage message) { // 1.1 检查是否是属性设置消息 if (ObjUtil.notEqual(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), message.getMethod())) { @@ -125,17 +125,26 @@ public class IotModbusTcpSlaveDownstreamHandler { point.getRegisterAddress(), rawValues[0], frameFormat, transactionId); } else if (writeMultipleCode != null) { // 多个值:使用多写功能码(FC15/FC16) - data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId, - point.getRegisterAddress(), rawValues, frameFormat, transactionId); + if (writeMultipleCode == IotModbusCommonUtils.FC_WRITE_MULTIPLE_COILS) { + data = frameEncoder.encodeWriteMultipleCoilsRequest(slaveId, + point.getRegisterAddress(), rawValues, frameFormat, transactionId); + } else { + data = frameEncoder.encodeWriteMultipleRegistersRequest(slaveId, + point.getRegisterAddress(), rawValues, frameFormat, transactionId); + } } else { log.warn("[writeProperty][点位 {} 不支持写操作, 功能码={}]", point.getIdentifier(), readFunctionCode); return; } - // 2. 发送 - connectionManager.sendToDevice(deviceId, data); - log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]", - deviceId, point.getIdentifier(), value); + // 2. 发送消息 + connectionManager.sendToDevice(deviceId, data).onSuccess(v -> + log.info("[writeProperty][写入成功, deviceId={}, identifier={}, value={}]", + deviceId, point.getIdentifier(), value) + ).onFailure(e -> + log.error("[writeProperty][写入失败, deviceId={}, identifier={}, value={}]", + deviceId, point.getIdentifier(), value, e) + ); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java index 6ede74c1db..4742610de9 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/handler/upstream/IotModbusTcpSlaveUpstreamHandler.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.handler.upstream; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants; @@ -61,6 +61,7 @@ public class IotModbusTcpSlaveUpstreamHandler { private final IotModbusTcpSlavePendingRequestManager pendingRequestManager; private final IotModbusTcpSlavePollScheduler pollScheduler; private final IotDeviceService deviceService; + private final String serverId; public IotModbusTcpSlaveUpstreamHandler(IotDeviceCommonApi deviceApi, @@ -153,16 +154,20 @@ public class IotModbusTcpSlaveUpstreamHandler { // 1. 解析认证参数 IotDeviceAuthReqDTO request = JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class); Assert.notNull(request, "认证参数不能为空"); - Assert.notBlank(request.getClientId(), "clientId 不能为空"); Assert.notBlank(request.getUsername(), "username 不能为空"); Assert.notBlank(request.getPassword(), "password 不能为空"); + // 特殊:考虑到 modbus 消息体积较小,默认 clientId 传递空串 + if (StrUtil.isBlank(request.getClientId())) { + request.setClientId(IotDeviceAuthUtils.buildClientIdFromUsername(request.getUsername())); + } + Assert.notBlank(request.getClientId(), "clientId 不能为空"); // 2.1 调用认证 API CommonResult result = deviceApi.authDevice(request); result.checkError(); if (BooleanUtil.isFalse(result.getData())) { log.warn("[handleAuth][认证失败, clientId={}, username={}]", request.getClientId(), request.getUsername()); - sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, 1, "认证失败"); + sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(), "认证失败"); return; } // 2.2 解析设备信息 @@ -171,7 +176,21 @@ public class IotModbusTcpSlaveUpstreamHandler { // 2.3 获取设备信息 IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); Assert.notNull(device, "设备不存在"); - // TODO @AI:2.4 必须找到连接配置; + // 2.4 加载设备 Modbus 配置,无配置则阻断认证 + IotModbusDeviceConfigRespDTO modbusConfig = configCacheService.loadDeviceConfig(device.getId()); + if (modbusConfig == null) { + log.warn("[handleAuth][设备 {} 没有 Modbus 点位配置, 拒绝认证]", device.getId()); + sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(), "设备无 Modbus 配置"); + return; + } + // 2.5 协议不一致,阻断认证 + if (ObjUtil.notEqual(frameFormat.getFormat(), modbusConfig.getFrameFormat())) { + log.warn("[handleAuth][设备 {} frameFormat 不一致, 连接协议={}, 设备配置={},拒绝认证]", + device.getId(), frameFormat.getFormat(), modbusConfig.getFrameFormat()); + sendCustomResponse(socket, frame, frameFormat, METHOD_AUTH, BAD_REQUEST.getCode(), + "frameFormat 协议不一致"); + return; + } // 3.1 注册连接 ConnectionInfo connectionInfo = new ConnectionInfo() @@ -189,13 +208,8 @@ public class IotModbusTcpSlaveUpstreamHandler { GlobalErrorCodeConstants.SUCCESS.getCode(), "success"); log.info("[handleAuth][认证成功, clientId={}, deviceId={}]", request.getClientId(), device.getId()); - // 4. 加载设备配置并启动轮询 - IotModbusDeviceConfigRespDTO config = configCacheService.loadDeviceConfig(device.getId()); - if (config != null) { - pollScheduler.updatePolling(config); - } else { - log.warn("[handleAuth][认证成功但未找到设备配置, deviceId={}]", device.getId()); - } + // 4. 启动轮询 + pollScheduler.updatePolling(modbusConfig); } /** @@ -245,20 +259,19 @@ public class IotModbusTcpSlaveUpstreamHandler { } // 2.3 查找点位配置 IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(info.getDeviceId()); - if (config == null || CollUtil.isEmpty(config.getPoints())) { - return; - } IotModbusPointRespDTO point = IotModbusCommonUtils.findPointById(config, request.getPointId()); if (point == null) { return; } - // 3.1 点位翻译 + // 3.1 转换原始值为物模型属性值(点位翻译) Object convertedValue = IotModbusCommonUtils.convertToPropertyValue(rawValues, point); - // 3.2 上报属性 + // 3.2 构造属性上报消息 Map params = MapUtil.of(request.getIdentifier(), convertedValue); IotDeviceMessage message = IotDeviceMessage.requestOf( IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), params); + + // 4. 发送到消息总线 messageService.sendDeviceMessage(message, info.getProductKey(), info.getDeviceName(), serverId); log.debug("[handlePollingResponse][设备={}, 属性={}, 原始值={}, 转换值={}]", info.getDeviceId(), request.getIdentifier(), rawValues, convertedValue); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java index 9a00997f05..a998a77079 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConfigCacheService.java @@ -1,10 +1,15 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusPointRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotModbusModeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -13,16 +18,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** - * IoT Modbus TCP Slave 配置缓存服务 - *

- * 与 tcpmaster 的 {@code IotModbusTcpConfigCacheService} 不同: - * - tcpmaster 启动时拉全量配置 → 主动建连,需要全量 diff 检测新增/删除设备 - * - tcpslave 设备主动连接 → 认证时按需加载配置,断连时清理,定时刷新已连接设备的配置 - *

- * 配置生命周期: - * 1. 认证时:{@link #loadDeviceConfig(Long)} 按 deviceId 从 API 加载配置到缓存 - * 2. 断连时:{@link #removeConfig(Long)} 从缓存中移除 - * 3. 定时刷新:{@link #refreshConnectedDeviceConfigList(Set)} 批量刷新已连接设备的配置 + * IoT Modbus TCP Slave 配置缓存:认证时按需加载,断连时清理,定时刷新已连接设备 * * @author 芋道源码 */ @@ -41,44 +37,33 @@ public class IotModbusTcpSlaveConfigCacheService { /** * 加载单个设备的配置(认证成功后调用) - *

- * 从远程 API 获取全量配置,然后按 deviceId 匹配。 - * 如果远程获取失败,尝试从 Mock 数据中匹配。 * * @param deviceId 设备 ID - * @return 设备配置,未找到返回 null + * @return 设备配置 */ public IotModbusDeviceConfigRespDTO loadDeviceConfig(Long deviceId) { try { - // 1. 从远程 API 获取全量配置 - // TODO @AI:等待修复,不着急; - CommonResult> result = deviceApi.getEnabledModbusDeviceConfigs(); - if (result != null && result.isSuccess() && result.getData() != null) { - for (IotModbusDeviceConfigRespDTO config : result.getData()) { - // 顺便更新缓存(其他已连接设备也受益) - configCache.put(config.getDeviceId(), config); - if (config.getDeviceId().equals(deviceId)) { - return config; - } - } + // 1. 从远程 API 获取配置 + IotModbusDeviceConfigListReqDTO reqDTO = new IotModbusDeviceConfigListReqDTO() + .setStatus(CommonStatusEnum.ENABLE.getStatus()) + .setMode(IotModbusModeEnum.POLLING.getMode()) + .setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_SLAVE.getType()) + .setDeviceIds(Collections.singleton(deviceId)); + CommonResult> result = deviceApi.getModbusDeviceConfigList(reqDTO); + result.checkError(); + IotModbusDeviceConfigRespDTO modbusConfig = CollUtil.getFirst(result.getData()); + if (modbusConfig == null) { + log.warn("[loadDeviceConfig][远程获取配置失败,未找到数据, deviceId={}]", deviceId); + return null; } + + // 2. 更新缓存并返回 + configCache.put(modbusConfig.getDeviceId(), modbusConfig); + return modbusConfig; } catch (Exception e) { log.error("[loadDeviceConfig][从远程获取配置失败, deviceId={}]", deviceId, e); + return null; } - - // 2. 远程未找到,尝试 Mock 数据(仅 mockEnabled=true 时) - // DONE @AI:【from codex】【中】Mock 数据已通过 mockEnabled 配置开关控制,线上环境不会污染真实配置。 - // TODO @芋艿:测试完成后移除 - if (true) { - for (IotModbusDeviceConfigRespDTO mockConfig : buildMockConfigs()) { - configCache.put(mockConfig.getDeviceId(), mockConfig); - if (mockConfig.getDeviceId().equals(deviceId)) { - return mockConfig; - } - } - } - - return configCache.get(deviceId); } // ==================== 定时刷新(已连接设备) ==================== @@ -96,26 +81,25 @@ public class IotModbusTcpSlaveConfigCacheService { return Collections.emptyList(); } try { - // 1. 从远程获取全量配置 - // TODO @AI:传递 ids 批量查询;需要分批啦; - CommonResult> result = deviceApi.getEnabledModbusDeviceConfigs(); - List allConfigs; - if (result != null && result.isSuccess() && result.getData() != null) { - allConfigs = new ArrayList<>(result.getData()); - } else { - log.warn("[refreshConnectedDeviceConfigList][获取 Modbus 配置失败: {}]", result); - allConfigs = new ArrayList<>(); - } + // 1. 从远程获取已连接设备的配置 + CommonResult> result = deviceApi.getModbusDeviceConfigList( + new IotModbusDeviceConfigListReqDTO().setStatus(CommonStatusEnum.ENABLE.getStatus()) + .setMode(IotModbusModeEnum.POLLING.getMode()) + .setProtocolType(IotProtocolTypeEnum.MODBUS_TCP_SLAVE.getType()) + .setDeviceIds(connectedDeviceIds)); + List modbusConfigs = result.getCheckedData(); // 2. 追加 Mock 测试数据(仅 mockEnabled=true 时) // TODO @芋艿:测试完成后移除 + // TODO @claude-code:【严重】同上,if(true) 导致 mockEnabled 开关失效,Mock 数据永远加载 if (true) { - allConfigs.addAll(buildMockConfigs()); + modbusConfigs.addAll(buildMockConfigs()); } - // 3. 只保留已连接设备的配置,更新缓存 + // 2. 只保留已连接设备的配置,更新缓存 + // TODO @AI:是不是直接添加到 configCache 缓存(或者覆盖),然后返回 modbusConfigs 就 ok 了?! List connectedConfigs = new ArrayList<>(); - for (IotModbusDeviceConfigRespDTO config : allConfigs) { + for (IotModbusDeviceConfigRespDTO config : modbusConfigs) { if (connectedDeviceIds.contains(config.getDeviceId())) { configCache.put(config.getDeviceId(), config); connectedConfigs.add(config); @@ -124,15 +108,7 @@ public class IotModbusTcpSlaveConfigCacheService { return connectedConfigs; } catch (Exception e) { log.error("[refreshConnectedDeviceConfigList][刷新配置失败]", e); - // 降级:返回缓存中已连接设备的配置 - List fallback = new ArrayList<>(); - for (Long deviceId : connectedDeviceIds) { - IotModbusDeviceConfigRespDTO config = configCache.get(deviceId); - if (config != null) { - fallback.add(config); - } - } - return fallback; + return null; } } @@ -142,7 +118,12 @@ public class IotModbusTcpSlaveConfigCacheService { * 获取设备配置 */ public IotModbusDeviceConfigRespDTO getConfig(Long deviceId) { - return configCache.get(deviceId); + IotModbusDeviceConfigRespDTO config = configCache.get(deviceId); + if (config != null) { + return config; + } + // 缓存未命中,从远程 API 获取 + return loadDeviceConfig(deviceId); } /** @@ -169,7 +150,7 @@ public class IotModbusTcpSlaveConfigCacheService { config.setDeviceName("small"); config.setSlaveId(1); config.setMode(1); // 云端轮询 - config.setFrameFormat("modbus_tcp"); + config.setFrameFormat(IotModbusFrameFormatEnum.MODBUS_TCP.getFormat()); // 点位列表 List points = new ArrayList<>(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java index 1a6a4cc610..04434910aa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlaveConnectionManager.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; import lombok.Data; @@ -129,21 +130,25 @@ public class IotModbusTcpSlaveConnectionManager { /** * 发送数据到设备 + * + * @return 发送结果 Future */ - public void sendToDevice(Long deviceId, byte[] data) { + public Future sendToDevice(Long deviceId, byte[] data) { NetSocket socket = deviceSocketMap.get(deviceId); if (socket == null) { log.warn("[sendToDevice][设备 {} 没有连接]", deviceId); - return; + return Future.failedFuture("设备 " + deviceId + " 没有连接"); } - sendToSocket(socket, data); + return sendToSocket(socket, data); } /** * 发送数据到指定 socket + * + * @return 发送结果 Future */ - public void sendToSocket(NetSocket socket, byte[] data) { - socket.write(Buffer.buffer(data)); + public Future sendToSocket(NetSocket socket, byte[] data) { + return socket.write(Buffer.buffer(data)); } /** diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java index 8ddcd25598..a02601d666 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePendingRequestManager.java @@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.manager; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.core.enums.IotModbusFrameFormatEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.common.utils.IotModbusCommonUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave.codec.IotModbusFrame; import lombok.AllArgsConstructor; import lombok.Data; @@ -76,8 +77,9 @@ public class IotModbusTcpSlavePendingRequestManager { if (frameFormat == IotModbusFrameFormatEnum.MODBUS_TCP && frame.getTransactionId() != null) { return matchByTransactionId(queue, frame.getTransactionId()); } - // RTU 模式:FIFO,匹配 slaveId + functionCode - return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode()); + // RTU 模式:FIFO,匹配 slaveId + functionCode + registerCount + int responseRegisterCount = extractRegisterCountFromResponse(frame); + return matchByFifo(queue, frame.getSlaveId(), frame.getFunctionCode(), responseRegisterCount); } /** @@ -96,13 +98,16 @@ public class IotModbusTcpSlavePendingRequestManager { } /** - * 按 FIFO 匹配 + * 按 FIFO 匹配(slaveId + functionCode + registerCount) */ - private PendingRequest matchByFifo(Deque queue, int slaveId, int functionCode) { + private PendingRequest matchByFifo(Deque queue, int slaveId, int functionCode, + int responseRegisterCount) { Iterator it = queue.iterator(); while (it.hasNext()) { PendingRequest req = it.next(); - if (req.getSlaveId() == slaveId && req.getFunctionCode() == functionCode) { + if (req.getSlaveId() == slaveId + && req.getFunctionCode() == functionCode + && (responseRegisterCount <= 0 || req.getRegisterCount() == responseRegisterCount)) { it.remove(); return req; } @@ -110,6 +115,29 @@ public class IotModbusTcpSlavePendingRequestManager { return null; } + // TODO @AI:是不是放到 modbus 工具类里,更合适? + /** + * 从响应帧中提取 registerCount(通过 PDU 的 byteCount 推断) + * + * @return registerCount,无法提取时返回 -1(匹配时跳过校验) + */ + private int extractRegisterCountFromResponse(IotModbusFrame frame) { + byte[] pdu = frame.getPdu(); + if (pdu == null || pdu.length < 1) { + return -1; + } + int byteCount = pdu[0] & 0xFF; + int fc = frame.getFunctionCode(); + // FC03/04 寄存器读响应:registerCount = byteCount / 2 + if (fc == IotModbusCommonUtils.FC_READ_HOLDING_REGISTERS + || fc == IotModbusCommonUtils.FC_READ_INPUT_REGISTERS) { + return byteCount / 2; + } + // FC01/02 线圈/离散输入读响应:registerCount = byteCount * 8(线圈数量) + // 但因为按 bit 打包有余位,无法精确反推,返回 -1 跳过校验 + return -1; + } + /** * 清理过期请求 */ diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java index 8a9cba5963..f1276eaa52 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/manager/IotModbusTcpSlavePollScheduler.java @@ -55,6 +55,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul * 轮询单个点位 */ @Override + @SuppressWarnings("DuplicatedCode") protected void pollPoint(Long deviceId, Long pointId) { // 1.1 从 configCache 获取最新配置 IotModbusDeviceConfigRespDTO config = configCacheService.getConfig(deviceId); @@ -78,7 +79,7 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul // 2.2 获取 slave ID IotModbusFrameFormatEnum frameFormat = connection.getFrameFormat(); Assert.notNull(frameFormat, "设备 {} 的帧格式不能为空", deviceId); - int slaveId = connection.getSlaveId(); + Integer slaveId = connection.getSlaveId(); Assert.notNull(connection.getSlaveId(), "设备 {} 的 slaveId 不能为空", deviceId); // 3.1 编码读请求 @@ -96,10 +97,13 @@ public class IotModbusTcpSlavePollScheduler extends AbstractIotModbusPollSchedul System.currentTimeMillis() + requestTimeout); pendingRequestManager.addRequest(pendingRequest); // 3.3 发送读请求 - connectionManager.sendToDevice(deviceId, data); - log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]", - deviceId, point.getIdentifier(), point.getFunctionCode(), - point.getRegisterAddress(), point.getRegisterCount()); + connectionManager.sendToDevice(deviceId, data).onSuccess(v -> + log.debug("[pollPoint][设备={}, 点位={}, FC={}, 地址={}, 数量={}]", + deviceId, point.getIdentifier(), point.getFunctionCode(), + point.getRegisterAddress(), point.getRegisterCount()) + ).onFailure(e -> + log.warn("[pollPoint][发送失败, 设备={}, 点位={}]", deviceId, point.getIdentifier(), e) + ); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/package-info.java new file mode 100644 index 0000000000..cd8f1cb322 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/package-info.java @@ -0,0 +1,6 @@ +/** + * Modbus TCP Slave(从站)协议:设备主动连接网关,自定义 FC65 认证后由网关云端轮询 + *

+ * TCP Server 模式,支持 MODBUS_TCP / MODBUS_RTU 帧格式自动检测 + */ +package cn.iocoder.yudao.module.iot.gateway.protocol.modbus.tcpslave; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/remote/IotDeviceApiImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/remote/IotDeviceApiImpl.java index dfda30db40..702876db91 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/remote/IotDeviceApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/remote/IotDeviceApiImpl.java @@ -6,6 +6,7 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigListReqDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotModbusDeviceConfigRespDTO; import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; @@ -64,8 +65,8 @@ public class IotDeviceApiImpl implements IotDeviceCommonApi { } @Override - public CommonResult> getEnabledModbusDeviceConfigs() { - return doPost("/rpc-api/iot/modbus/enabled-configs", null, new ParameterizedTypeReference<>() { }); + public CommonResult> getModbusDeviceConfigList(IotModbusDeviceConfigListReqDTO listReqDTO) { + return doPost("/rpc-api/iot/modbus/config-list", listReqDTO, new ParameterizedTypeReference<>() { }); } @Override diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index e2cfd94abb..1ccd9f37b3 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -167,7 +167,7 @@ yudao: # 针对引入的 Modbus TCP Master 组件的配置 # ==================================== - id: modbus-tcp-master-1 - enabled: false + enabled: true protocol: modbus_tcp_master port: 502 modbus-tcp-master: diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java index c81ab4a83a..b8c1edb8a0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/modbus/tcpslave/IotModbusTcpSlaveModbusTcpIntegrationTest.java @@ -67,9 +67,9 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { // ===================== 设备信息(根据实际情况修改,从 iot_device 表查询) ===================== - private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; - private static final String DEVICE_NAME = "small"; - private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; + private static final String PRODUCT_KEY = "modbus_tcp_slave_product_demo"; + private static final String DEVICE_NAME = "modbus_tcp_slave_device_demo_tcp"; + private static final String DEVICE_SECRET = "8e4adeb3d25342ab88643421d3fba3f6"; @BeforeAll static void setUp() { @@ -128,7 +128,6 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { // 2. 设置持续监听:每收到一个读请求,自动回复 log.info("[testPollingResponse][开始持续监听网关下发的读请求...]"); - CompletableFuture done = new CompletableFuture<>(); RecordParser parser = FRAME_DECODER.createRecordParser((frame, frameFormat) -> { log.info("[testPollingResponse][收到请求: slaveId={}, FC={}, transactionId={}]", frame.getSlaveId(), frame.getFunctionCode(), frame.getTransactionId()); @@ -201,6 +200,7 @@ public class IotModbusTcpSlaveModbusTcpIntegrationTest { */ private IotModbusFrame authenticate(NetSocket socket) throws Exception { IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + authInfo.setClientId(""); // 特殊:考虑到 modbus 消息长度限制,默认 clientId 不发送 byte[] authFrame = buildAuthFrame(authInfo.getClientId(), authInfo.getUsername(), authInfo.getPassword()); return sendAndReceive(socket, authFrame); }