@@ -14,12 +14,14 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils ;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.* ;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO ;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum ;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum ;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage ;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity ;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO ;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO ;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO ;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoChangeReqDTO ;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO ;
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetRespDTO ;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils ;
@@ -29,6 +31,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceMapper ;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants ;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum ;
import cn.iocoder.yudao.module.iot.service.device.message.IotDeviceMessageService ;
import cn.iocoder.yudao.module.iot.service.product.IotProductService ;
import jakarta.annotation.Resource ;
import jakarta.validation.ConstraintViolationException ;
@@ -49,6 +52,7 @@ import java.util.*;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception ;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList ;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.* ;
import static java.util.Collections.singletonList ;
/**
* IoT 设备 Service 实现类
@@ -69,6 +73,9 @@ public class IotDeviceServiceImpl implements IotDeviceService {
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotDeviceGroupService deviceGroupService ;
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotDeviceMessageService deviceMessageService ;
@Override
public Long createDevice ( IotDeviceSaveReqVO createReqVO ) {
@@ -590,7 +597,8 @@ public class IotDeviceServiceImpl implements IotDeviceService {
// 3. 清空对应缓存
deleteDeviceCache ( devices ) ;
// TODO @AI: 需要下发网关设备, 让其建立拓扑关系吗? (增加)
// 4. 下发网关设备拓扑变更通知 (增加)
sendTopoChangeNotify ( gatewayId , IotDeviceTopoChangeReqDTO . STATUS_CREATE , devices ) ;
}
private void checkSubDeviceCanBind ( IotDeviceDO device , Long gatewayId ) {
@@ -604,30 +612,25 @@ public class IotDeviceServiceImpl implements IotDeviceService {
@Override
@Transactional ( rollbackFor = Exception . class )
public void unbindDeviceGateway ( Collection < Long > ids ) {
public void unbindDeviceGateway ( Collection < Long > ids , Long gatewayId ) {
// 1. 校验设备存在
if ( CollUtil . isEmpty ( ids ) ) {
return ;
}
List < IotDeviceDO > devices = deviceMapper . selectByIds ( ids ) ;
if ( CollUtil . isNotEmpty ( devices ) ) {
devices . removeIf ( device - > device . getGatewayId ( ) = = null ) ;
if ( CollUtil . isEmpty ( devices ) ) {
return ;
}
// 2. 批量更新数据库(将 gatewayId 设置为 null)
// TODO @AI: 需要搞个方法, 专门批量更新某个字段为 null。
List < IotDeviceDO > updateList = devices . stream ( )
. filter ( device - > device . getGatewayId ( ) ! = null )
. map ( device - > new IotDeviceDO ( ) . setId ( device . getId ( ) ) . setGatewayId ( null ) )
. toList ( ) ;
if ( CollUtil . isNotEmpty ( updateList ) ) {
deviceMapper . updateBatch ( updateList ) ;
}
deviceMapper . updateGatewayIdBatch ( convertList ( devices , IotDeviceDO : : getId ) , null ) ;
// 3. 清空对应缓存
deleteDeviceCache ( devices ) ;
// TODO @AI: 需要下发网关设备, 让其建立拓扑关系吗? ( 减少 )
// 4. 下发网关设备拓扑变更通知(删除 )
sendTopoChangeNotify ( gatewayId , IotDeviceTopoChangeReqDTO . STATUS_DELETE , devices ) ;
}
@Override
@@ -742,9 +745,8 @@ public class IotDeviceServiceImpl implements IotDeviceService {
subDeviceIdentity . getProductKey ( ) , subDeviceIdentity . getDeviceName ( ) ) ;
}
// 2. 更新数据库
// TODO @AI: 直接调用更新方法;
// unbindDeviceGateway(Collections.singletonList(subDevice.getId()));
// 2. 更新数据库(将 gatewayId 设置为 null)
deviceMapper . updateGatewayIdBatch ( singletonList ( subDevice . getId ( ) ) , null ) ;
log . info ( " [deleteDeviceTopo][网关({}/{}) 解绑子设备({}/{})] " ,
gatewayDevice . getProductKey ( ) , gatewayDevice . getDeviceName ( ) ,
subDevice . getProductKey ( ) , subDevice . getDeviceName ( ) ) ;
@@ -782,8 +784,7 @@ public class IotDeviceServiceImpl implements IotDeviceService {
if ( ! ( message . getParams ( ) instanceof List ) ) {
throw exception ( DEVICE_SUB_REGISTER_PARAMS_INVALID ) ;
}
// TODO @AI: 这个要不也弄到 JsonUtils 里面去?感觉类似 convertObject 呀。
List < IotSubDeviceRegisterReqDTO > paramsList = JsonUtils . parseArray ( JsonUtils . toJsonString ( message . getParams ( ) ) ,
List < IotSubDeviceRegisterReqDTO > paramsList = JsonUtils . convertList ( message . getParams ( ) ,
IotSubDeviceRegisterReqDTO . class ) ;
if ( CollUtil . isEmpty ( paramsList ) ) {
throw exception ( DEVICE_SUB_REGISTER_PARAMS_INVALID ) ;
@@ -817,9 +818,13 @@ public class IotDeviceServiceImpl implements IotDeviceService {
throw exception ( DEVICE_SUB_REGISTER_PRODUCT_NOT_GATEWAY_SUB , params . getProductKey ( ) ) ;
}
// 1.3 查找设备是否已存在
// TODO @AI: 存在的时候, 必须父设备是自己, 才返回, 否则抛出业务异常;
IotDeviceDO existDevice = getSelf ( ) . getDeviceFromCache ( params . getProductKey ( ) , params . getDeviceName ( ) ) ;
if ( existDevice ! = null ) {
// 校验是否绑定到当前网关
if ( ObjUtil . notEqual ( existDevice . getGatewayId ( ) , gatewayDevice . getId ( ) ) ) {
throw exception ( DEVICE_GATEWAY_BINDTO_EXISTS ,
existDevice . getProductKey ( ) , existDevice . getDeviceName ( ) ) ;
}
// 已存在则返回设备信息
return existDevice ;
}
@@ -840,4 +845,42 @@ public class IotDeviceServiceImpl implements IotDeviceService {
return SpringUtil . getBean ( getClass ( ) ) ;
}
/**
* 发送拓扑变更通知给网关设备
*
* @param gatewayId 网关设备编号
* @param status 变更状态( 0-创建, 1-删除)
* @param subDevices 子设备列表
* @see <a href="https://help.aliyun.com/zh/marketplace/notify-gateway-topology-changes">阿里云 - 通知网关拓扑关系变化</a>
*/
private void sendTopoChangeNotify ( Long gatewayId , Integer status , List < IotDeviceDO > subDevices ) {
if ( CollUtil . isEmpty ( subDevices ) ) {
return ;
}
// 1. 获取网关设备
IotDeviceDO gatewayDevice = deviceMapper . selectById ( gatewayId ) ;
if ( gatewayDevice = = null ) {
log . warn ( " [sendTopoChangeNotify][网关设备({}) 不存在,无法发送拓扑变更通知] " , gatewayId ) ;
return ;
}
try {
// 2.1 构建拓扑变更通知消息
List < IotDeviceIdentity > subList = convertList ( subDevices , subDevice - >
new IotDeviceIdentity ( subDevice . getProductKey ( ) , subDevice . getDeviceName ( ) ) ) ;
IotDeviceTopoChangeReqDTO params = new IotDeviceTopoChangeReqDTO ( status , subList ) ;
IotDeviceMessage notifyMessage = IotDeviceMessage . requestOf (
IotDeviceMessageMethodEnum . TOPO_CHANGE . getMethod ( ) , params ) ;
// 2.2 发送消息
deviceMessageService . sendDeviceMessage ( notifyMessage , gatewayDevice ) ;
log . info ( " [sendTopoChangeNotify][网关({}/{}) 发送拓扑变更通知成功, status={}, subDevices={}] " ,
gatewayDevice . getProductKey ( ) , gatewayDevice . getDeviceName ( ) ,
status , subList ) ;
} catch ( Exception ex ) {
log . error ( " [sendTopoChangeNotify][网关({}/{}) 发送拓扑变更通知失败, status={}] " ,
gatewayDevice . getProductKey ( ) , gatewayDevice . getDeviceName ( ) , status , ex ) ;
}
}
}