feat(iot):【协议改造】udp 初步改造(100%),基于 /Users/yunai/.claude/plans/iot-udp-fix-plan.md,优化代码以及各种缺陷

This commit is contained in:
YunaiV
2026-02-01 12:51:06 +08:00
parent cb301eb788
commit 8e4b4cf20a
7 changed files with 32 additions and 103 deletions

View File

@@ -155,7 +155,11 @@ public class IotTcpProtocol implements IotProtocol {
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT TCP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭 Vertx
// 启动失败时关闭资源
if (tcpServer != null) {
tcpServer.close();
tcpServer = null;
}
if (vertx != null) {
vertx.close();
vertx = null;

View File

@@ -21,17 +21,11 @@ public class IotUdpConfig {
/**
* 会话超时时间(毫秒)
* <p>
* 用于清理不活跃的设备地址映射
* 基于 Guava Cache 的 expireAfterAccess 实现自动过期清理
*/
@NotNull(message = "会话超时时间不能为空")
@Min(value = 1000, message = "会话超时时间必须大于 1000 毫秒")
private Long sessionTimeoutMs = 60000L;
/**
* 会话清理间隔(毫秒)
*/
@NotNull(message = "会话清理间隔不能为空")
@Min(value = 1000, message = "会话清理间隔必须大于 1000 毫秒")
private Long sessionCleanIntervalMs = 30000L;
/**
* 接收缓冲区大小(字节)

View File

@@ -1,13 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.downstream.IotUdpDownstreamHandler;
@@ -16,8 +12,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.upstream.IotUdpU
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import io.vertx.core.Vertx;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
@@ -25,8 +20,6 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.List;
/**
* IoT UDP 协议实现
* <p>
@@ -78,22 +71,11 @@ public class IotUdpProtocol implements IotProtocol {
*/
private final IotUdpSessionManager sessionManager;
private final IotDeviceService deviceService;
private final IotDeviceMessageService deviceMessageService;
/**
* 会话清理定时器 ID
*/
// TODO @AI会话清理是不是放到 sessionManager 更合适?
private Long cleanTimerId;
public IotUdpProtocol(ProtocolInstanceProperties properties) {
IotUdpConfig udpConfig = properties.getUdp();
Assert.notNull(udpConfig, "UDP 协议配置udp不能为空");
this.properties = properties;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
// 初始化序列化器
IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(properties.getSerialize());
@@ -102,7 +84,7 @@ public class IotUdpProtocol implements IotProtocol {
this.serializer = serializerManager.get(serializeType);
// 初始化会话管理器
this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions());
this.sessionManager = new IotUdpSessionManager(udpConfig.getMaxSessions(), udpConfig.getSessionTimeoutMs());
// 初始化下行消息订阅者
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
@@ -143,24 +125,30 @@ public class IotUdpProtocol implements IotProtocol {
// 1.4 创建上行消息处理器
IotUdpUpstreamHandler upstreamHandler = new IotUdpUpstreamHandler(serverId, sessionManager, serializer);
// 1.5 监听端口
udpSocket.listen(properties.getPort(), "0.0.0.0", result -> {
if (result.failed()) {
log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), result.cause());
return;
}
// 1.5 启动 UDP 服务器(阻塞式)
try {
udpSocket.listen(properties.getPort(), "0.0.0.0").result();
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
running = true;
log.info("[start][IoT UDP 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 启动会话清理定时器
startSessionCleanTimer(udpConfig);
// 2. 启动下行消息订阅者
// TODO @AI这里会导致 Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2992 ms, time limit is 2000 ms
this.downstreamSubscriber.start();
});
} catch (Exception e) {
log.error("[start][IoT UDP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭资源
if (udpSocket != null) {
udpSocket.close();
udpSocket = null;
}
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e;
}
}
@Override
@@ -176,13 +164,7 @@ public class IotUdpProtocol implements IotProtocol {
log.error("[stop][IoT UDP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
// 2.1 取消会话清理定时器
if (cleanTimerId != null) {
vertx.cancelTimer(cleanTimerId);
cleanTimerId = null;
log.info("[stop][IoT UDP 协议 {} 会话清理定时器已取消]", getId());
}
// 2.2 关闭 UDP Socket
// 2.1 关闭 UDP Socket
if (udpSocket != null) {
try {
udpSocket.close().result();
@@ -192,7 +174,7 @@ public class IotUdpProtocol implements IotProtocol {
}
udpSocket = null;
}
// 2.3 关闭 Vertx 实例
// 2.2 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
@@ -206,54 +188,4 @@ public class IotUdpProtocol implements IotProtocol {
log.info("[stop][IoT UDP 协议 {} 已停止]", getId());
}
/**
* 启动会话清理定时器
*/
// TODO @AI这个放到
private void startSessionCleanTimer(IotUdpConfig udpConfig) {
cleanTimerId = vertx.setPeriodic(udpConfig.getSessionCleanIntervalMs(), id -> {
try {
// 1. 清理超时的设备会话,并获取离线设备列表
List<Long> offlineDeviceIds = sessionManager.cleanExpiredSessions(udpConfig.getSessionTimeoutMs());
// 2. 为每个离线设备发送离线消息
for (Long deviceId : offlineDeviceIds) {
sendOfflineMessage(deviceId);
}
if (CollUtil.isNotEmpty(offlineDeviceIds)) {
log.info("[cleanExpiredSessions][本次清理 {} 个超时设备]", offlineDeviceIds.size());
}
} catch (Exception e) {
log.error("[cleanExpiredSessions][清理超时会话失败]", e);
}
});
log.info("[startSessionCleanTimer][会话清理定时器启动,间隔:{} ms超时{} ms]",
udpConfig.getSessionCleanIntervalMs(), udpConfig.getSessionTimeoutMs());
}
/**
* 发送设备离线消息
*
* @param deviceId 设备 ID
*/
private void sendOfflineMessage(Long deviceId) {
try {
// 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[sendOfflineMessage][设备不存在,设备 ID: {}]", deviceId);
return;
}
// 发送离线消息
IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
deviceMessageService.sendDeviceMessage(offlineMessage, device.getProductKey(),
device.getDeviceName(), serverId);
log.info("[sendOfflineMessage][发送离线消息,设备 ID: {},设备名: {}]",
deviceId, device.getDeviceName());
} catch (Exception e) {
log.error("[sendOfflineMessage][发送离线消息失败,设备 ID: {}]", deviceId, e);
}
}
}

View File

@@ -34,7 +34,7 @@ public class IotUdpDownstreamHandler {
log.info("[handle][处理下行消息,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
// 1. 检查设备会话
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(message.getDeviceId());
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSession(message.getDeviceId());
if (sessionInfo == null) {
log.warn("[handle][会话信息不存在,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());

View File

@@ -85,8 +85,7 @@ yudao:
serialize: json
udp:
max-sessions: 1000 # 最大会话数
session-timeout-ms: 60000 # 会话超时时间(毫秒)
session-clean-interval-ms: 30000 # 会话清理间隔(毫秒)
session-timeout-ms: 60000 # 会话超时时间(毫秒),基于 Guava Cache 自动过期
receive-buffer-size: 65536 # 接收缓冲区大小(字节)
send-buffer-size: 65536 # 发送缓冲区大小(字节)

View File

@@ -65,7 +65,7 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
/**
* 直连设备 Token从 {@link #testAuth()} 方法获取后,粘贴到这里
*/
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTk0ODYzOCwiZGV2aWNlTmFtZSI6InNtYWxsIn0.TrOJisXhloZ3quLBOAIyowmpq6Syp9PHiEpfj-nQ9xo";
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc3MDUyNTA0MywiZGV2aWNlTmFtZSI6InNtYWxsIn0.W9Mo-Oe1ZNLDkINndKieUeW1XhDzhVp0W0zTAwO6hJM";
// ===================== 认证测试 =====================
@@ -120,7 +120,7 @@ public class IotDirectDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1. 构建属性上报消息UDP 协议token 放在 params 中)
// 1. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()

View File

@@ -98,7 +98,7 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testPropertyPost() throws Exception {
// 1. 构建属性上报消息UDP 协议token 放在 params 中)
// 1. 构建属性上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(),
withToken(IotDevicePropertyPostReqDTO.of(MapUtil.<String, Object>builder()
@@ -120,7 +120,7 @@ public class IotGatewaySubDeviceUdpProtocolIntegrationTest {
*/
@Test
public void testEventPost() throws Exception {
// 1. 构建事件上报消息UDP 协议token 放在 params 中)
// 1. 构建事件上报消息
IotDeviceMessage request = IotDeviceMessage.requestOf(
IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
withToken(IotDeviceEventPostReqDTO.of(