feat(iot):【协议改造】http 初步改造

This commit is contained in:
YunaiV
2026-01-31 22:41:30 +08:00
parent f4ba6e75cb
commit e89fc2bfbd
40 changed files with 1467 additions and 633 deletions

View File

@@ -101,18 +101,6 @@
<artifactId>spring-boot-starter-amqp</artifactId>
<optional>true</optional>
</dependency>
<!-- IoT 网络组件:接收来自设备的上行数据 -->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-net-component-http</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-net-component-emqx</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@@ -0,0 +1,46 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT 协议类型枚举
*
* 用于定义传输层协议类型
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Getter
public enum IotProtocolTypeEnum implements ArrayValuable<String> {
TCP("tcp"),
UDP("udp"),
WEBSOCKET("websocket"),
HTTP("http"),
MQTT("mqtt"),
EMQX("emqx"),
COAP("coap"),
MODBUS_TCP("modbus_tcp");
public static final String[] ARRAYS = Arrays.stream(values()).map(IotProtocolTypeEnum::getType).toArray(String[]::new);
/**
* 类型
*/
private final String type;
@Override
public String[] array() {
return ARRAYS;
}
public static IotProtocolTypeEnum of(String type) {
return ArrayUtil.firstMatch(e -> e.getType().equals(type), values());
}
}

View File

@@ -0,0 +1,40 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT 序列化类型枚举
*
* 用于定义设备消息的序列化格式
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Getter
public enum IotSerializeTypeEnum implements ArrayValuable<String> {
JSON("json"),
BINARY("binary");
public static final String[] ARRAYS = Arrays.stream(values()).map(IotSerializeTypeEnum::getType).toArray(String[]::new);
/**
* 类型
*/
private final String type;
@Override
public String[] array() {
return ARRAYS;
}
public static IotSerializeTypeEnum of(String type) {
return ArrayUtil.firstMatch(e -> e.getType().equals(type), values());
}
}

View File

@@ -24,4 +24,14 @@ public interface IotMessageBus {
*/
void register(IotMessageSubscriber<?> subscriber);
/**
* 取消注册消息订阅者
*
* @param subscriber 订阅者
*/
default void unregister(IotMessageSubscriber<?> subscriber) {
// TODO 芋艿:暂时不实现,需求量不大,但是
// throw new UnsupportedOperationException("取消注册消息订阅者功能,尚未实现");
}
}

View File

@@ -26,4 +26,16 @@ public interface IotMessageSubscriber<T> {
*/
void onMessage(T message);
/**
* 启动订阅
*/
default void start() {
}
/**
* 停止订阅
*/
default void stop() {
}
}

View File

@@ -60,7 +60,7 @@ public class IotDeviceMessage {
*/
private String serverId;
// ========== codec编解码字段 ==========
// ========== serialize序列化相关字段 ==========
/**
* 请求编号
@@ -94,7 +94,7 @@ public class IotDeviceMessage {
*/
private String msg;
// ========== 基础方法:只传递"codec编解码字段" ==========
// ========== 基础方法:只传递"serialize序列化相关字段" ==========
public static IotDeviceMessage requestOf(String method) {
return requestOf(null, method, null);

View File

@@ -1,13 +1,13 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
@@ -15,12 +15,15 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstrea
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
@@ -31,35 +34,22 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableConfigurationProperties(IotGatewayProperties.class)
@Slf4j
public class IotGatewayConfiguration {
/**
* IoT 网关 HTTP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.http", name = "enabled", havingValue = "true")
@Slf4j
public static class HttpProtocolConfiguration {
@Bean
public IotMessageSerializerManager iotMessageSerializerManager() {
return new IotMessageSerializerManager();
}
@Bean(name = "httpVertx", destroyMethod = "close")
public Vertx httpVertx() {
return Vertx.vertx();
}
@Bean
public IotHttpUpstreamProtocol iotHttpUpstreamProtocol(IotGatewayProperties gatewayProperties,
@Qualifier("httpVertx") Vertx httpVertx) {
return new IotHttpUpstreamProtocol(gatewayProperties.getProtocol().getHttp(), httpVertx);
}
@Bean
public IotHttpDownstreamSubscriber iotHttpDownstreamSubscriber(IotHttpUpstreamProtocol httpUpstreamProtocol,
IotMessageBus messageBus) {
return new IotHttpDownstreamSubscriber(httpUpstreamProtocol, messageBus);
}
@Bean
public IotProtocolManager iotProtocolManager(IotGatewayProperties gatewayProperties,
IotMessageSerializerManager serializerManager,
IotMessageBus messageBus) {
return new IotProtocolManager(gatewayProperties, serializerManager, messageBus);
}
/**
@@ -117,12 +107,17 @@ public class IotGatewayConfiguration {
deviceService, messageService, connectionManager, tcpVertx);
}
@Bean
public IotTcpDownstreamHandler iotTcpDownstreamHandler(IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager) {
return new IotTcpDownstreamHandler(messageService, connectionManager);
}
@Bean
public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotTcpConnectionManager connectionManager,
IotTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotTcpDownstreamSubscriber(protocolHandler, messageService, connectionManager, messageBus);
return new IotTcpDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
}
}
@@ -187,12 +182,18 @@ public class IotGatewayConfiguration {
deviceService, messageService, sessionManager, udpVertx);
}
@Bean
public IotUdpDownstreamHandler iotUdpDownstreamHandler(IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
IotUdpUpstreamProtocol protocol) {
return new IotUdpDownstreamHandler(messageService, sessionManager, protocol);
}
@Bean
public IotUdpDownstreamSubscriber iotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotUdpSessionManager sessionManager,
IotUdpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotUdpDownstreamSubscriber(protocolHandler, messageService, sessionManager, messageBus);
return new IotUdpDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
}
}
@@ -241,12 +242,17 @@ public class IotGatewayConfiguration {
deviceService, messageService, connectionManager, websocketVertx);
}
@Bean
public IotWebSocketDownstreamHandler iotWebSocketDownstreamHandler(IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager) {
return new IotWebSocketDownstreamHandler(messageService, connectionManager);
}
@Bean
public IotWebSocketDownstreamSubscriber iotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocolHandler,
IotDeviceMessageService messageService,
IotWebSocketConnectionManager connectionManager,
IotWebSocketDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
return new IotWebSocketDownstreamSubscriber(protocolHandler, messageService, connectionManager, messageBus);
return new IotWebSocketDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
}
}

View File

@@ -1,5 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
@@ -24,10 +27,15 @@ public class IotGatewayProperties {
private TokenProperties token;
/**
* 协议配置
* 协议配置(旧版,保持兼容)
*/
private ProtocolProperties protocol;
/**
* 协议实例列表(新版)
*/
private List<ProtocolInstanceProperties> protocols;
@Data
public static class RpcProperties {
@@ -88,11 +96,6 @@ public class IotGatewayProperties {
*/
private MqttProperties mqtt;
/**
* MQTT WebSocket 组件配置
*/
private MqttWsProperties mqttWs;
/**
* UDP 组件配置
*/
@@ -422,102 +425,6 @@ public class IotGatewayProperties {
}
@Data
public static class MqttWsProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* WebSocket 服务器端口默认8083
*/
private Integer port = 8083;
/**
* WebSocket 路径(默认:/mqtt
*/
@NotEmpty(message = "WebSocket 路径不能为空")
private String path = "/mqtt";
/**
* 最大消息大小(字节)
*/
private Integer maxMessageSize = 8192;
/**
* 连接超时时间(秒)
*/
private Integer connectTimeoutSeconds = 60;
/**
* 保持连接超时时间(秒)
*/
private Integer keepAliveTimeoutSeconds = 300;
/**
* 是否启用 SSLwss://
*/
private Boolean sslEnabled = false;
/**
* SSL 配置
*/
private SslOptions sslOptions = new SslOptions();
/**
* WebSocket 子协议(通常为 "mqtt" 或 "mqttv3.1"
*/
@NotEmpty(message = "WebSocket 子协议不能为空")
private String subProtocol = "mqtt";
/**
* 最大帧大小(字节)
*/
private Integer maxFrameSize = 65536;
/**
* SSL 配置选项
*/
@Data
public static class SslOptions {
/**
* 密钥证书选项
*/
private io.vertx.core.net.KeyCertOptions keyCertOptions;
/**
* 信任选项
*/
private io.vertx.core.net.TrustOptions trustOptions;
/**
* SSL 证书路径
*/
private String certPath;
/**
* SSL 私钥路径
*/
private String keyPath;
/**
* 信任存储路径
*/
private String trustStorePath;
/**
* 信任存储密码
*/
private String trustStorePassword;
}
}
@Data
public static class UdpProperties {
@@ -643,4 +550,125 @@ public class IotGatewayProperties {
}
// TODO @AI【暂时忽略】改成 ProtocolProperties
/**
* 协议实例配置
*/
@Data
public static class ProtocolInstanceProperties {
/**
* 协议实例 ID如 "http-alink"、"tcp-binary"
*/
@NotEmpty(message = "协议实例 ID 不能为空")
private String id;
/**
* 是否启用
*/
@NotNull(message = "是否启用不能为空")
private Boolean enabled = true;
/**
* 协议类型
*
* @see cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum
*/
@NotEmpty(message = "协议类型不能为空")
private String type;
/**
* 服务端口
*/
@NotNull(message = "服务端口不能为空")
private Integer port;
/**
* 序列化类型(可选)
*
* @see cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum
*
* 为什么是可选的呢?
* 1. {@link IotProtocolTypeEnum#HTTP}、${@link IotProtocolTypeEnum#COAP} 协议,目前强制是 JSON 格式
* 2. {@link IotProtocolTypeEnum#EMQX} 协议,目前支持根据产品(设备)配置的序列化类型来解析
*/
private String serialize;
/**
* HTTP 协议配置
*/
@Valid
private IotHttpConfig http;
// TODO @AI后续改下
/**
* TCP 协议配置(后续扩展)
*/
@Valid
private TcpInstanceConfig tcp;
}
/**
* TCP 协议实例配置(后续扩展)
*/
@Data
public static class TcpInstanceConfig {
/**
* 最大连接数
*/
private Integer maxConnections = 1000;
/**
* 心跳超时时间(毫秒)
*/
private Long keepAliveTimeoutMs = 30000L;
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
/**
* 拆包配置
*/
private CodecConfig codec;
/**
* TCP 拆包配置
*/
@Data
public static class CodecConfig {
/**
* 拆包类型LENGTH_FIELD / DELIMITER
*/
private String type;
/**
* LENGTH_FIELD: 偏移量
*/
private Integer lengthFieldOffset;
/**
* LENGTH_FIELD: 长度字段长度
*/
private Integer lengthFieldLength;
/**
* DELIMITER: 分隔符
*/
private String delimiter;
}
}
}

View File

@@ -0,0 +1,52 @@
package cn.iocoder.yudao.module.iot.gateway.protocol;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
/**
* IoT 协议接口
*
* 定义传输层协议的生命周期管理
*
* @author 芋道源码
*/
public interface IotProtocol {
/**
* 获取协议实例 ID
*
* @return 协议实例 ID如 "http-alink"、"tcp-binary"
*/
String getId();
/**
* 获取服务器 ID用于消息追踪全局唯一
*
* @return 服务器 ID
*/
String getServerId();
/**
* 获取协议类型
*
* @return 协议类型枚举
*/
IotProtocolTypeEnum getType();
/**
* 启动协议服务
*/
void start();
/**
* 停止协议服务
*/
void stop();
/**
* 检查协议服务是否正在运行
*
* @return 是否正在运行
*/
boolean isRunning();
}

View File

@@ -0,0 +1,79 @@
package cn.iocoder.yudao.module.iot.gateway.protocol;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 协议下行消息订阅者抽象类
*
* 负责接收来自消息总线的下行消息,并委托给子类进行业务处理
*
* @author 芋道源码
*/
@AllArgsConstructor
@Slf4j
public abstract class IotProtocolDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotProtocol protocol;
private final IotMessageBus messageBus;
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
/**
* 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
*/
@Override
public String getGroup() {
return getTopic();
}
@Override
public void start() {
messageBus.register(this);
log.info("[start][{} 下行消息订阅成功Topic{}]", protocol.getType().name(), getTopic());
}
@Override
public void stop() {
messageBus.unregister(this);
log.info("[stop][{} 下行消息订阅已停止Topic{}]", protocol.getType().name(), getTopic());
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (StrUtil.isBlank(method)) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 处理下行消息
handleMessage(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
/**
* 处理下行消息
*
* @param message 下行消息
*/
protected abstract void handleMessage(IotDeviceMessage message);
}

View File

@@ -0,0 +1,126 @@
package cn.iocoder.yudao.module.iot.gateway.protocol;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.SmartLifecycle;
import java.util.ArrayList;
import java.util.List;
/**
* IoT 协议管理器
*
* 负责根据配置创建和管理协议实例
*
* @author 芋道源码
*/
@Slf4j
public class IotProtocolManager implements SmartLifecycle {
private final IotGatewayProperties gatewayProperties;
private final IotMessageSerializerManager serializerManager;
private final IotMessageBus messageBus;
private final List<IotProtocol> protocols = new ArrayList<>();
private volatile boolean running = false;
public IotProtocolManager(IotGatewayProperties gatewayProperties,
IotMessageSerializerManager serializerManager,
IotMessageBus messageBus) {
this.gatewayProperties = gatewayProperties;
this.serializerManager = serializerManager;
this.messageBus = messageBus;
}
@Override
public void start() {
if (running) {
return;
}
List<IotGatewayProperties.ProtocolInstanceProperties> protocolConfigs = gatewayProperties.getProtocols();
if (CollUtil.isEmpty(protocolConfigs)) {
log.info("[start][没有配置协议实例,跳过启动]");
return;
}
for (IotGatewayProperties.ProtocolInstanceProperties config : protocolConfigs) {
if (BooleanUtil.isFalse(config.getEnabled())) {
log.info("[start][协议实例 {} 未启用,跳过]", config.getId());
continue;
}
IotProtocol protocol = createProtocol(config);
if (protocol == null) {
continue;
}
protocol.start();
protocols.add(protocol);
}
running = true;
log.info("[start][协议管理器启动完成,共启动 {} 个协议实例]", protocols.size());
}
@Override
public void stop() {
if (!running) {
return;
}
for (IotProtocol protocol : protocols) {
try {
protocol.stop();
} catch (Exception e) {
log.error("[stop][协议实例 {} 停止失败]", protocol.getId(), e);
}
}
protocols.clear();
running = false;
log.info("[stop][协议管理器已停止]");
}
@Override
public boolean isRunning() {
return running;
}
/**
* 创建协议实例
*
* @param config 协议实例配置
* @return 协议实例
*/
@SuppressWarnings({"SwitchStatementWithTooFewBranches", "EnhancedSwitchMigration"})
private IotProtocol createProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
IotProtocolTypeEnum protocolType = IotProtocolTypeEnum.of(config.getType());
if (protocolType == null) {
log.error("[createProtocol][协议实例 {} 的协议类型 {} 不存在]", config.getId(), config.getType());
return null;
}
switch (protocolType) {
case HTTP:
return createHttpProtocol(config);
// TODO 后续添加其他协议类型
default:
throw new IllegalArgumentException(String.format(
"[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
}
}
/**
* 创建 HTTP 协议实例
*
* @param config 协议实例配置
* @return HTTP 协议实例
*/
private IotHttpProtocol createHttpProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
return new IotHttpProtocol(config, messageBus);
}
}

View File

@@ -1,11 +1,8 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import lombok.extern.slf4j.Slf4j;
/**
@@ -13,34 +10,17 @@ import lombok.extern.slf4j.Slf4j;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotCoapDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotCoapDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotCoapUpstreamProtocol protocol;
private final IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
public IotCoapDownstreamSubscriber(IotCoapUpstreamProtocol protocol, IotMessageBus messageBus) {
super(protocol, messageBus);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
protected void handleMessage(IotDeviceMessage message) {
// 如需支持,可通过 CoAP Observe 模式实现(设备订阅资源,服务器推送变更)
log.warn("[onMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message);
log.warn("[handleMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message);
}
}

View File

@@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthResource;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapRegisterHandler;
@@ -30,7 +32,9 @@ import java.util.concurrent.TimeUnit;
* @author 芋道源码
*/
@Slf4j
public class IotCoapUpstreamProtocol {
public class IotCoapUpstreamProtocol implements IotProtocol {
private static final String ID = "coap";
private final IotGatewayProperties.CoapProperties coapProperties;
@@ -39,11 +43,24 @@ public class IotCoapUpstreamProtocol {
@Getter
private final String serverId;
private volatile boolean running = false;
public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) {
this.coapProperties = coapProperties;
this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.COAP;
}
@Override
@PostConstruct
public void start() {
try {
@@ -73,6 +90,7 @@ public class IotCoapUpstreamProtocol {
// 3. 启动服务器
coapServer.start();
running = true;
log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /auth/register/device, /topic]", coapProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 CoAP 协议启动失败]", e);
@@ -80,11 +98,13 @@ public class IotCoapUpstreamProtocol {
}
}
@Override
@PreDestroy
public void stop() {
if (coapServer != null) {
try {
coapServer.stop();
running = false;
log.info("[stop][IoT 网关 CoAP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 CoAP 协议停止失败]", e);
@@ -92,4 +112,9 @@ public class IotCoapUpstreamProtocol {
}
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -24,8 +24,6 @@ import java.util.Map;
/**
* IoT 网关 CoAP 协议的【认证】处理器
*
* 参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAuthHandler}
*
* @author 芋道源码
*/
@Slf4j

View File

@@ -22,7 +22,6 @@ import java.util.Map;
*
* @author 芋道源码
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
* @see cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpRegisterHandler
*/
@Slf4j
public class IotCoapRegisterHandler {

View File

@@ -1,11 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
@@ -14,55 +13,18 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotEmqxDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotEmqxDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotEmqxDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
private final IotEmqxUpstreamProtocol protocol;
public IotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol protocol, IotMessageBus messageBus) {
this.protocol = protocol;
this.messageBus = messageBus;
super(protocol, messageBus);
this.downstreamHandler = new IotEmqxDownstreamHandler(protocol);
}
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 处理下行消息
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
}
}
}
}

View File

@@ -2,8 +2,10 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxUpstreamHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
@@ -28,11 +30,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @author 芋道源码
*/
@Slf4j
public class IotEmqxUpstreamProtocol {
public class IotEmqxUpstreamProtocol implements IotProtocol {
private static final String ID = "emqx";
private final IotGatewayProperties.EmqxProperties emqxProperties;
private volatile boolean isRunning = false;
private volatile boolean running = false;
private final Vertx vertx;
@@ -50,9 +54,20 @@ public class IotEmqxUpstreamProtocol {
this.vertx = vertx;
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.EMQX;
}
@Override
@PostConstruct
public void start() {
if (isRunning) {
if (running) {
return;
}
@@ -61,7 +76,7 @@ public class IotEmqxUpstreamProtocol {
startMqttClient();
// 2. 标记服务为运行状态
isRunning = true;
running = true;
log.info("[start][IoT 网关 EMQX 协议启动成功]");
} catch (Exception e) {
log.error("[start][IoT 网关 EMQX 协议服务启动失败,应用将关闭]", e);
@@ -88,9 +103,10 @@ public class IotEmqxUpstreamProtocol {
}
}
@Override
@PreDestroy
public void stop() {
if (!isRunning) {
if (!running) {
return;
}
@@ -98,10 +114,15 @@ public class IotEmqxUpstreamProtocol {
stopMqttClient();
// 2. 标记服务为停止状态
isRunning = false;
running = false;
log.info("[stop][IoT 网关 MQTT 协议服务已停止]");
}
@Override
public boolean isRunning() {
return running;
}
/**
* 启动 MQTT 客户端
*/
@@ -185,7 +206,7 @@ public class IotEmqxUpstreamProtocol {
* 延迟重连
*/
private void reconnectWithDelay() {
if (!isRunning) {
if (!running) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
@@ -195,7 +216,7 @@ public class IotEmqxUpstreamProtocol {
long delay = emqxProperties.getReconnectDelayMs();
log.info("[reconnectWithDelay][将在 {} 毫秒后尝试重连 MQTT Broker]", delay);
vertx.setTimer(delay, timerId -> {
if (!isRunning) {
if (!running) {
return;
}
if (mqttClient != null && mqttClient.isConnected()) {
@@ -305,7 +326,7 @@ public class IotEmqxUpstreamProtocol {
private void setupMqttHandlers() {
// 1. 设置断开重连监听器
mqttClient.closeHandler(closeEvent -> {
if (!isRunning) {
if (!running) {
return;
}
log.warn("[closeHandler][MQTT 连接已断开, 准备重连]");

View File

@@ -0,0 +1,28 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import lombok.Data;
/**
* IoT HTTP 协议配置
*
* @author 芋道源码
*/
@Data
public class IotHttpConfig {
/**
* 是否启用 SSL
*/
private Boolean sslEnabled = false;
/**
* SSL 证书路径
*/
private String sslCertPath;
/**
* SSL 私钥路径
*/
private String sslKeyPath;
}

View File

@@ -1,45 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 HTTP 订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotHttpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotHttpUpstreamProtocol protocol;
private final IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][IoT 网关 HTTP 协议不支持下行消息,忽略消息:{}]", message);
}
}

View File

@@ -0,0 +1,185 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.downstream.IotHttpDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpRegisterHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpRegisterSubHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpUpstreamHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT HTTP 协议实现
* <p>
* 基于 Vert.x 实现 HTTP 服务器,接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotHttpProtocol implements IotProtocol {
/**
* 协议配置
*/
private final ProtocolInstanceProperties properties;
/**
* 消息总线
*/
private final IotMessageBus messageBus;
/**
* 服务器 ID用于消息追踪全局唯一
*/
@Getter
private final String serverId;
/**
* Vert.x 实例(每个 Protocol 自己管理)
*/
private Vertx vertx;
/**
* HTTP 服务器
*/
private HttpServer httpServer;
/**
* 下行消息订阅者
*/
private IotHttpDownstreamSubscriber downstreamSubscriber;
/**
* 运行状态
*/
private volatile boolean running = false;
public IotHttpProtocol(ProtocolInstanceProperties properties, IotMessageBus messageBus) {
this.properties = properties;
this.messageBus = messageBus;
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
}
@Override
public String getId() {
return properties.getId();
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.HTTP;
}
@Override
public void start() {
if (running) {
log.warn("[start][IoT HTTP 协议 {} 已经在运行中]", getId());
return;
}
// 1.1 创建 Vertx 实例(每个 Protocol 独立管理)
this.vertx = Vertx.vertx();
// 1.2 创建路由
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 1.3 创建处理器,添加路由处理器
IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
router.post(IotHttpAuthHandler.PATH).handler(authHandler);
IotHttpRegisterHandler registerHandler = new IotHttpRegisterHandler();
router.post(IotHttpRegisterHandler.PATH).handler(registerHandler);
IotHttpRegisterSubHandler registerSubHandler = new IotHttpRegisterSubHandler();
router.post(IotHttpRegisterSubHandler.PATH).handler(registerSubHandler);
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this);
router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler);
// 1.4 启动 HTTP 服务器
IotHttpConfig httpConfig = properties.getHttp();
HttpServerOptions options = new HttpServerOptions().setPort(properties.getPort());
if (httpConfig != null && Boolean.TRUE.equals(httpConfig.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions()
.setKeyPath(httpConfig.getSslKeyPath())
.setCertPath(httpConfig.getSslCertPath());
options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
try {
httpServer = vertx.createHttpServer(options)
.requestHandler(router)
.listen()
.result();
running = true;
log.info("[start][IoT HTTP 协议 {} 启动成功,端口:{}serverId{}]",
getId(), properties.getPort(), serverId);
// 2. 启动下行消息订阅者
this.downstreamSubscriber = new IotHttpDownstreamSubscriber(this, messageBus);
this.downstreamSubscriber.start();
} catch (Exception e) {
log.error("[start][IoT HTTP 协议 {} 启动失败]", getId(), e);
// 启动失败时关闭 Vertx
if (vertx != null) {
vertx.close();
vertx = null;
}
throw e;
}
// 2. 启动下行消息订阅者
this.downstreamSubscriber.start();
}
@Override
public void stop() {
if (!running) {
return;
}
// 1. 停止下行消息订阅者
if (downstreamSubscriber != null) {
try {
downstreamSubscriber.stop();
log.info("[stop][IoT HTTP 协议 {} 下行消息订阅者已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT HTTP 协议 {} 下行消息订阅者停止失败]", getId(), e);
}
downstreamSubscriber = null;
}
// 2.1 关闭 HTTP 服务器
if (httpServer != null) {
try {
httpServer.close().result();
log.info("[stop][IoT HTTP 协议 {} 服务器已停止]", getId());
} catch (Exception e) {
log.error("[stop][IoT HTTP 协议 {} 服务器停止失败]", getId(), e);
}
httpServer = null;
}
// 2.2 关闭 Vertx 实例
if (vertx != null) {
try {
vertx.close().result();
log.info("[stop][IoT HTTP 协议 {} Vertx 已关闭]", getId());
} catch (Exception e) {
log.error("[stop][IoT HTTP 协议 {} Vertx 关闭失败]", getId(), e);
}
vertx = null;
}
running = false;
log.info("[stop][IoT HTTP 协议 {} 已停止]", getId());
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -1,91 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpRegisterHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpRegisterSubHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 HTTP 协议:接收设备上行消息
*
* @author 芋道源码
*/
@Slf4j
public class IotHttpUpstreamProtocol {
private final IotGatewayProperties.HttpProperties httpProperties;
private final Vertx vertx;
private HttpServer httpServer;
@Getter
private final String serverId;
public IotHttpUpstreamProtocol(IotGatewayProperties.HttpProperties httpProperties, Vertx vertx) {
this.httpProperties = httpProperties;
this.vertx = vertx;
this.serverId = IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
}
@PostConstruct
public void start() {
// 创建路由
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// 创建处理器,添加路由处理器
IotHttpAuthHandler authHandler = new IotHttpAuthHandler(this);
router.post(IotHttpAuthHandler.PATH).handler(authHandler);
IotHttpRegisterHandler registerHandler = new IotHttpRegisterHandler();
router.post(IotHttpRegisterHandler.PATH).handler(registerHandler);
IotHttpRegisterSubHandler registerSubHandler = new IotHttpRegisterSubHandler();
router.post(IotHttpRegisterSubHandler.PATH).handler(registerSubHandler);
IotHttpUpstreamHandler upstreamHandler = new IotHttpUpstreamHandler(this);
router.post(IotHttpUpstreamHandler.PATH).handler(upstreamHandler);
// 启动 HTTP 服务器
HttpServerOptions options = new HttpServerOptions()
.setPort(httpProperties.getServerPort());
if (Boolean.TRUE.equals(httpProperties.getSslEnabled())) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions().setKeyPath(httpProperties.getSslKeyPath())
.setCertPath(httpProperties.getSslCertPath());
options = options.setSsl(true).setKeyCertOptions(pemKeyCertOptions);
}
try {
httpServer = vertx.createHttpServer(options)
.requestHandler(router)
.listen()
.result();
log.info("[start][IoT 网关 HTTP 协议启动成功,端口:{}]", httpProperties.getServerPort());
} catch (Exception e) {
log.error("[start][IoT 网关 HTTP 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
httpServer.close().result();
log.info("[stop][IoT 网关 HTTP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 HTTP 协议停止失败]", e);
}
}
}
}

View File

@@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.downstream;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 HTTP 订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@Slf4j
public class IotHttpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
public IotHttpDownstreamSubscriber(IotProtocol protocol, IotMessageBus messageBus) {
super(protocol, messageBus);
}
@Override
protected void handleMessage(IotDeviceMessage message) {
log.info("[handleMessage][IoT 网关 HTTP 协议不支持下行消息,忽略消息:{}]", message);
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
@@ -10,10 +10,10 @@ import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
@@ -32,7 +32,7 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
public static final String PATH = "/auth";
private final IotHttpUpstreamProtocol protocol;
private final String serverId;
private final IotDeviceTokenService deviceTokenService;
@@ -40,8 +40,8 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
private final IotDeviceMessageService deviceMessageService;
public IotHttpAuthHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol;
public IotHttpAuthHandler(IotHttpProtocol protocol) {
this.serverId = protocol.getServerId();
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
@@ -50,32 +50,25 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
@Override
public CommonResult<Object> handle0(RoutingContext context) {
// 1. 解析参数
JsonObject body = context.body().asJsonObject();
if (body == null) {
throw invalidParamException("请求体不能为空");
}
String clientId = body.getString("clientId");
if (StrUtil.isEmpty(clientId)) {
IotDeviceAuthReqDTO request = deserializeRequest(context, IotDeviceAuthReqDTO.class);
if (StrUtil.isEmpty(request.getClientId())) {
throw invalidParamException("clientId 不能为空");
}
String username = body.getString("username");
if (StrUtil.isEmpty(username)) {
if (StrUtil.isEmpty(request.getUsername())) {
throw invalidParamException("username 不能为空");
}
String password = body.getString("password");
if (StrUtil.isEmpty(password)) {
if (StrUtil.isEmpty(request.getPassword())) {
throw invalidParamException("password 不能为空");
}
// 2.1 执行认证
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password));
CommonResult<Boolean> result = deviceApi.authDevice(request);
result.checkError();
if (!BooleanUtil.isTrue(result.getData())) {
throw exception(DEVICE_AUTH_FAIL);
}
// 2.2 生成 Token
IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(username);
IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(request.getUsername());
Assert.notNull(deviceInfo, "设备信息不能为空");
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notBlank(token, "生成 token 不能为空位");
@@ -83,7 +76,7 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
// 3. 执行上线
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
// 构建响应数据
return success(MapUtil.of("token", token));

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
@@ -6,7 +6,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import io.vertx.core.json.JsonObject;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
import io.vertx.ext.web.RoutingContext;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
@@ -33,27 +33,19 @@ public class IotHttpRegisterHandler extends IotHttpAbstractHandler {
@Override
public CommonResult<Object> handle0(RoutingContext context) {
// 1. 解析参数
JsonObject body = context.body().asJsonObject();
if (body == null) {
throw invalidParamException("请求体不能为空");
}
String productKey = body.getString("productKey");
if (StrUtil.isEmpty(productKey)) {
IotDeviceRegisterReqDTO request = deserializeRequest(context, IotDeviceRegisterReqDTO.class);
if (StrUtil.isEmpty(request.getProductKey())) {
throw invalidParamException("productKey 不能为空");
}
String deviceName = body.getString("deviceName");
if (StrUtil.isEmpty(deviceName)) {
if (StrUtil.isEmpty(request.getDeviceName())) {
throw invalidParamException("deviceName 不能为空");
}
String productSecret = body.getString("productSecret");
if (StrUtil.isEmpty(productSecret)) {
if (StrUtil.isEmpty(request.getProductSecret())) {
throw invalidParamException("productSecret 不能为空");
}
// 2. 调用动态注册
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO()
.setProductKey(productKey).setDeviceName(deviceName).setProductSecret(productSecret);
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(reqDTO);
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(request);
result.checkError();
// 3. 返回结果

View File

@@ -1,13 +1,15 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
import io.vertx.core.json.JsonObject;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
import io.vertx.ext.web.RoutingContext;
import lombok.Data;
import java.util.List;
@@ -39,29 +41,32 @@ public class IotHttpRegisterSubHandler extends IotHttpAbstractHandler {
@Override
public CommonResult<Object> handle0(RoutingContext context) {
// 1. 解析通用参数
// 1.1 解析通用参数
String productKey = context.pathParam("productKey");
String deviceName = context.pathParam("deviceName");
// 2. 解析子设备列表
JsonObject body = context.body().asJsonObject();
if (body == null) {
throw invalidParamException("请求体不能为空");
}
if (body.getJsonArray("params") == null) {
// 1.2 解析子设备列表
SubDeviceRegisterRequest request = deserializeRequest(context, SubDeviceRegisterRequest.class);
if (CollUtil.isEmpty(request.getParams())) {
throw invalidParamException("params 不能为空");
}
List<cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO> subDevices = JsonUtils.parseArray(
body.getJsonArray("params").toString(), cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO.class);
// 3. 调用子设备动态注册
// 2. 调用子设备动态注册
IotSubDeviceRegisterFullReqDTO reqDTO = new IotSubDeviceRegisterFullReqDTO()
.setGatewayProductKey(productKey).setGatewayDeviceName(deviceName).setSubDevices(subDevices);
.setGatewayProductKey(productKey)
.setGatewayDeviceName(deviceName)
.setSubDevices(request.getParams());
CommonResult<List<IotSubDeviceRegisterRespDTO>> result = deviceApi.registerSubDevices(reqDTO);
result.checkError();
// 4. 返回结果
// 3. 返回结果
return success(result.getData());
}
@Data
public static class SubDeviceRegisterRequest {
private List<IotSubDeviceRegisterReqDTO> params;
}
}

View File

@@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
@@ -6,31 +6,28 @@ import cn.hutool.core.text.StrPool;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
/**
* IoT 网关 HTTP 协议的上行处理器
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
public static final String PATH = "/topic/sys/:productKey/:deviceName/*";
private final IotHttpUpstreamProtocol protocol;
private final String serverId;
private final IotDeviceMessageService deviceMessageService;
public IotHttpUpstreamHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol;
public IotHttpUpstreamHandler(IotHttpProtocol protocol) {
this.serverId = protocol.getServerId();
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
}
@@ -41,20 +38,15 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
String deviceName = context.pathParam("deviceName");
String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT);
// 2.1 解析消息
if (context.body().buffer() == null) {
throw invalidParamException("请求体不能为空");
}
byte[] bytes = context.body().buffer().getBytes();
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes,
productKey, deviceName);
// 2.1 根据 Content-Type 反序列化消息
IotDeviceMessage message = deserializeRequest(context, IotDeviceMessage.class);
Assert.equals(method, message.getMethod(), "method 不匹配");
// 2.2 发送消息
deviceMessageService.sendDeviceMessage(message,
productKey, deviceName, protocol.getServerId());
productKey, deviceName, serverId);
// 3. 返回结果
return CommonResult.success(MapUtil.of("messageId", message.getId()));
}
}
}

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
@@ -9,11 +10,13 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpRegisterHandler;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
@@ -27,12 +30,13 @@ import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionU
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public abstract class IotHttpAbstractHandler implements Handler<RoutingContext> {
private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
private final IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
@Override
public final void handle(RoutingContext context) {
try {
@@ -83,12 +87,26 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
}
}
// ========== 序列化相关方法 ==========
protected static <T> T deserializeRequest(RoutingContext context, Class<T> clazz) {
byte[] body = context.body().buffer() != null ? context.body().buffer().getBytes() : null;
if (ArrayUtil.isEmpty(body)) {
throw invalidParamException("请求体不能为空");
}
return JsonUtils.parseObject(body, clazz);
}
private static String serializeResponse(Object data) {
return JsonUtils.toJsonString(data);
}
@SuppressWarnings("deprecation")
public static void writeResponse(RoutingContext context, Object data) {
context.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(data));
.end(serializeResponse(data));
}
}

View File

@@ -1,11 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
@@ -16,64 +14,27 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotMqttUpstreamProtocol upstreamProtocol;
public class IotMqttDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotMqttDownstreamHandler downstreamHandler;
private final IotMessageBus messageBus;
public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol upstreamProtocol,
public IotMqttDownstreamSubscriber(IotMqttUpstreamProtocol protocol,
IotMqttDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
this.upstreamProtocol = upstreamProtocol;
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
this.messageBus = messageBus;
}
@PostConstruct
public void subscribe() {
messageBus.register(this);
log.info("[subscribe][MQTT 协议下行消息订阅成功,主题:{}]", getTopic());
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(upstreamProtocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
try {
// 1. 校验
String method = message.getMethod();
if (method == null) {
log.warn("[onMessage][消息方法为空, messageId: {}, deviceId: {}]",
message.getId(), message.getDeviceId());
return;
}
// 2. 委托给下行处理器处理业务逻辑
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[onMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
} else {
log.warn("[onMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
}
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId(), e);
protected void handleMessage(IotDeviceMessage message) {
boolean success = downstreamHandler.handleDownstreamMessage(message);
if (success) {
log.debug("[handleMessage][下行消息处理成功, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
} else {
log.warn("[handleMessage][下行消息处理失败, messageId: {}, method: {}, deviceId: {}]",
message.getId(), message.getMethod(), message.getDeviceId());
}
}
}

View File

@@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
@@ -19,7 +21,11 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotMqttUpstreamProtocol {
public class IotMqttUpstreamProtocol implements IotProtocol {
private static final String ID = "mqtt";
private volatile boolean running = false;
private final IotGatewayProperties.MqttProperties mqttProperties;
@@ -45,7 +51,23 @@ public class IotMqttUpstreamProtocol {
this.serverId = IotDeviceMessageUtils.generateServerId(mqttProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.MQTT;
}
@Override
public boolean isRunning() {
return running;
}
// TODO @haohao这里的编写是不是和 tcp 对应的,风格保持一致哈;
@Override
@PostConstruct
public void start() {
// 创建服务器选项
@@ -71,6 +93,7 @@ public class IotMqttUpstreamProtocol {
// 启动服务器
try {
mqttServer.listen().result();
running = true;
log.info("[start][IoT 网关 MQTT 协议启动成功,端口:{}]", mqttProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 MQTT 协议启动失败]", e);
@@ -78,11 +101,13 @@ public class IotMqttUpstreamProtocol {
}
}
@Override
@PreDestroy
public void stop() {
if (mqttServer != null) {
try {
mqttServer.close().result();
running = false;
log.info("[stop][IoT 网关 MQTT 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 MQTT 协议停止失败]", e);

View File

@@ -1,14 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -17,48 +12,20 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotTcpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotTcpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotTcpUpstreamProtocol protocol;
private final IotTcpDownstreamHandler downstreamHandler;
private final IotDeviceMessageService messageService;
private final IotTcpConnectionManager connectionManager;
private final IotMessageBus messageBus;
private IotTcpDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotTcpDownstreamHandler(messageService, connectionManager);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][TCP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());
public IotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocol,
IotTcpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
}
}
}

View File

@@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
@@ -21,7 +23,9 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotTcpUpstreamProtocol {
public class IotTcpUpstreamProtocol implements IotProtocol {
private static final String ID = "tcp";
private final IotGatewayProperties.TcpProperties tcpProperties;
@@ -38,6 +42,8 @@ public class IotTcpUpstreamProtocol {
private NetServer tcpServer;
private volatile boolean running = false;
public IotTcpUpstreamProtocol(IotGatewayProperties.TcpProperties tcpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
@@ -51,6 +57,17 @@ public class IotTcpUpstreamProtocol {
this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.TCP;
}
@Override
@PostConstruct
public void start() {
// 创建服务器选项
@@ -78,6 +95,7 @@ public class IotTcpUpstreamProtocol {
// 启动服务器
try {
tcpServer.listen().result();
running = true;
log.info("[start][IoT 网关 TCP 协议启动成功,端口:{}]", tcpProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 TCP 协议启动失败]", e);
@@ -85,15 +103,23 @@ public class IotTcpUpstreamProtocol {
}
}
@Override
@PreDestroy
public void stop() {
if (tcpServer != null) {
try {
tcpServer.close().result();
running = false;
log.info("[stop][IoT 网关 TCP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 TCP 协议停止失败]", e);
}
}
}
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -1,14 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -17,48 +12,20 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotUdpDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotUdpDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotUdpUpstreamProtocol protocol;
private final IotUdpDownstreamHandler downstreamHandler;
private final IotDeviceMessageService messageService;
private final IotUdpSessionManager sessionManager;
private final IotMessageBus messageBus;
private IotUdpDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotUdpDownstreamHandler(messageService, sessionManager, protocol);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][UDP 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());
public IotUdpDownstreamSubscriber(IotUdpUpstreamProtocol protocol,
IotUdpDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
}

View File

@@ -2,9 +2,11 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager.IotUdpSessionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.router.IotUdpUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
@@ -30,7 +32,9 @@ import java.util.List;
* @author 芋道源码
*/
@Slf4j
public class IotUdpUpstreamProtocol {
public class IotUdpUpstreamProtocol implements IotProtocol {
private static final String ID = "udp";
private final IotGatewayProperties.UdpProperties udpProperties;
@@ -55,6 +59,8 @@ public class IotUdpUpstreamProtocol {
private IotUdpUpstreamHandler upstreamHandler;
private volatile boolean running = false;
public IotUdpUpstreamProtocol(IotGatewayProperties.UdpProperties udpProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
@@ -68,6 +74,17 @@ public class IotUdpUpstreamProtocol {
this.serverId = IotDeviceMessageUtils.generateServerId(udpProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.UDP;
}
@Override
@PostConstruct
public void start() {
// 1. 初始化上行消息处理器
@@ -90,6 +107,7 @@ public class IotUdpUpstreamProtocol {
}
// 设置数据包处理器
udpSocket.handler(packet -> upstreamHandler.handle(packet, udpSocket));
running = true;
log.info("[start][IoT 网关 UDP 协议启动成功,端口:{},接收缓冲区:{} 字节,发送缓冲区:{} 字节]",
udpProperties.getPort(), udpProperties.getReceiveBufferSize(),
udpProperties.getSendBufferSize());
@@ -99,6 +117,7 @@ public class IotUdpUpstreamProtocol {
});
}
@Override
@PreDestroy
public void stop() {
// 1. 取消会话清理定时器
@@ -112,6 +131,7 @@ public class IotUdpUpstreamProtocol {
if (udpSocket != null) {
try {
udpSocket.close().result();
running = false;
log.info("[stop][IoT 网关 UDP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 UDP 协议停止失败]", e);
@@ -119,6 +139,11 @@ public class IotUdpUpstreamProtocol {
}
}
@Override
public boolean isRunning() {
return running;
}
/**
* 启动会话清理定时器
*/

View File

@@ -1,14 +1,9 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
@@ -17,48 +12,20 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotWebSocketDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
public class IotWebSocketDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
private final IotWebSocketUpstreamProtocol protocol;
private final IotWebSocketDownstreamHandler downstreamHandler;
private final IotDeviceMessageService messageService;
private final IotWebSocketConnectionManager connectionManager;
private final IotMessageBus messageBus;
private IotWebSocketDownstreamHandler downstreamHandler;
@PostConstruct
public void init() {
// 初始化下游处理器
this.downstreamHandler = new IotWebSocketDownstreamHandler(messageService, connectionManager);
// 注册下游订阅者
messageBus.register(this);
log.info("[init][WebSocket 下游订阅者初始化完成,服务器 ID: {}Topic: {}]",
protocol.getServerId(), getTopic());
public IotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocol,
IotWebSocketDownstreamHandler downstreamHandler,
IotMessageBus messageBus) {
super(protocol, messageBus);
this.downstreamHandler = downstreamHandler;
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
downstreamHandler.handle(message);
} catch (Exception e) {
log.error("[onMessage][处理下行消息失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId(), e);
}
protected void handleMessage(IotDeviceMessage message) {
downstreamHandler.handle(message);
}
}

View File

@@ -1,8 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
@@ -22,7 +24,9 @@ import lombok.extern.slf4j.Slf4j;
* @author 芋道源码
*/
@Slf4j
public class IotWebSocketUpstreamProtocol {
public class IotWebSocketUpstreamProtocol implements IotProtocol {
private static final String ID = "websocket";
private final IotGatewayProperties.WebSocketProperties wsProperties;
@@ -39,6 +43,8 @@ public class IotWebSocketUpstreamProtocol {
private HttpServer httpServer;
private volatile boolean running = false;
public IotWebSocketUpstreamProtocol(IotGatewayProperties.WebSocketProperties wsProperties,
IotDeviceService deviceService,
IotDeviceMessageService messageService,
@@ -52,6 +58,17 @@ public class IotWebSocketUpstreamProtocol {
this.serverId = IotDeviceMessageUtils.generateServerId(wsProperties.getPort());
}
@Override
public String getId() {
return ID;
}
@Override
public IotProtocolTypeEnum getType() {
return IotProtocolTypeEnum.WEBSOCKET;
}
@Override
@PostConstruct
@SuppressWarnings("deprecation")
public void start() {
@@ -88,6 +105,7 @@ public class IotWebSocketUpstreamProtocol {
// 3. 启动服务器
try {
httpServer.listen().result();
running = true;
log.info("[start][IoT 网关 WebSocket 协议启动成功,端口:{},路径:{}]", wsProperties.getPort(), wsProperties.getPath());
} catch (Exception e) {
log.error("[start][IoT 网关 WebSocket 协议启动失败]", e);
@@ -95,11 +113,13 @@ public class IotWebSocketUpstreamProtocol {
}
}
@Override
@PreDestroy
public void stop() {
if (httpServer != null) {
try {
httpServer.close().result();
running = false;
log.info("[stop][IoT 网关 WebSocket 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 WebSocket 协议停止失败]", e);
@@ -107,4 +127,9 @@ public class IotWebSocketUpstreamProtocol {
}
}
@Override
public boolean isRunning() {
return running;
}
}

View File

@@ -0,0 +1,38 @@
package cn.iocoder.yudao.module.iot.gateway.serialize;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* IoT 设备消息序列化器接口
*
* 用于序列化和反序列化设备消息
*
* @author 芋道源码
*/
public interface IotMessageSerializer {
/**
* 序列化消息
*
* @param message 消息
* @return 编码后的消息内容
*/
byte[] serialize(IotDeviceMessage message);
/**
* 反序列化消息
*
* @param bytes 消息内容
* @return 解码后的消息内容
*/
IotDeviceMessage deserialize(byte[] bytes);
/**
* 获取序列化类型
*
* @return 序列化类型枚举
*/
IotSerializeTypeEnum getType();
}

View File

@@ -0,0 +1,60 @@
package cn.iocoder.yudao.module.iot.gateway.serialize;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.serialize.binary.IotBinarySerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
import lombok.extern.slf4j.Slf4j;
import java.util.EnumMap;
import java.util.Map;
/**
* IoT 序列化器管理器
*
* 负责根据枚举创建和管理序列化器实例
*
* @author 芋道源码
*/
@Slf4j
public class IotMessageSerializerManager {
private final Map<IotSerializeTypeEnum, IotMessageSerializer> serializerMap = new EnumMap<>(IotSerializeTypeEnum.class);
public IotMessageSerializerManager() {
// 遍历枚举,创建对应的序列化器
for (IotSerializeTypeEnum type : IotSerializeTypeEnum.values()) {
IotMessageSerializer serializer = createSerializer(type);
serializerMap.put(type, serializer);
log.info("[IotSerializerManager][序列化器 {} 创建成功]", type);
}
}
/**
* 根据类型创建序列化器
*
* @param type 序列化类型
* @return 序列化器实例
*/
@SuppressWarnings("EnhancedSwitchMigration")
private IotMessageSerializer createSerializer(IotSerializeTypeEnum type) {
switch (type) {
case JSON:
return new IotJsonSerializer();
case BINARY:
return new IotBinarySerializer();
default:
throw new IllegalArgumentException("未知的序列化类型:" + type);
}
}
/**
* 获取序列化器
*
* @param type 序列化类型
* @return 序列化器实例
*/
public IotMessageSerializer get(IotSerializeTypeEnum type) {
return serializerMap.get(type);
}
}

View File

@@ -0,0 +1,254 @@
package cn.iocoder.yudao.module.iot.gateway.serialize.binary;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import io.vertx.core.buffer.Buffer;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* 二进制格式的消息序列化器
*
* 二进制协议格式(所有数值使用大端序):
*
* <pre>
* +--------+--------+--------+---------------------------+--------+--------+
* | 魔术字 | 版本号 | 消息类型| 消息长度(4 字节) |
* +--------+--------+--------+---------------------------+--------+--------+
* | 消息 ID 长度(2 字节) | 消息 ID (变长字符串) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* | 方法名长度(2 字节) | 方法名(变长字符串) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* | 消息体数据(变长) |
* +--------+--------+--------+--------+--------+--------+--------+--------+
* </pre>
*
* 消息体格式:
* - 请求消息params 数据(JSON)
* - 响应消息code (4字节) + msg 长度(2字节) + msg 字符串 + data 数据(JSON)
*
* @author 芋道源码
*/
@Slf4j
public class IotBinarySerializer implements IotMessageSerializer {
/**
* 协议魔术字,用于协议识别
*/
private static final byte MAGIC_NUMBER = (byte) 0x7E;
/**
* 协议版本号
*/
private static final byte PROTOCOL_VERSION = (byte) 0x01;
/**
* 请求消息类型
*/
private static final byte REQUEST = (byte) 0x01;
/**
* 响应消息类型
*/
private static final byte RESPONSE = (byte) 0x02;
/**
* 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息长度)
*/
private static final int HEADER_FIXED_LENGTH = 7;
/**
* 最小消息长度(头部 + 消息ID长度 + 方法名长度)
*/
private static final int MIN_MESSAGE_LENGTH = HEADER_FIXED_LENGTH + 4;
@Override
public IotSerializeTypeEnum getType() {
return IotSerializeTypeEnum.BINARY;
}
@Override
public byte[] serialize(IotDeviceMessage message) {
Assert.notNull(message, "消息不能为空");
Assert.notBlank(message.getMethod(), "消息方法不能为空");
try {
// 1. 确定消息类型
byte messageType = determineMessageType(message);
// 2. 构建消息体
byte[] bodyData = buildMessageBody(message, messageType);
// 3. 构建完整消息
return buildCompleteMessage(message, messageType, bodyData);
} catch (Exception e) {
log.error("[encode][二进制消息编码失败,消息: {}]", message, e);
throw new RuntimeException("二进制消息编码失败: " + e.getMessage(), e);
}
}
@Override
public IotDeviceMessage deserialize(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足");
try {
Buffer buffer = Buffer.buffer(bytes);
int index = 0;
// 1. 验证魔术字
byte magic = buffer.getByte(index++);
Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic);
// 2. 验证版本号
byte version = buffer.getByte(index++);
Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version);
// 3. 读取消息类型
byte messageType = buffer.getByte(index++);
Assert.isTrue(messageType == REQUEST || messageType == RESPONSE, "无效的消息类型: " + messageType);
// 4. 读取消息长度
int messageLength = buffer.getInt(index);
index += 4;
Assert.isTrue(messageLength == buffer.length(),
"消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length());
// 5. 读取消息 ID
short messageIdLength = buffer.getShort(index);
index += 2;
String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name());
index += messageIdLength;
// 6. 读取方法名
short methodLength = buffer.getShort(index);
index += 2;
String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name());
index += methodLength;
// 7. 解析消息体
return parseMessageBody(buffer, index, messageType, messageId, method);
} catch (Exception e) {
log.error("[decode][二进制消息解码失败,数据长度: {}]", bytes.length, e);
throw new RuntimeException("二进制消息解码失败: " + e.getMessage(), e);
}
}
/**
* 快速检测是否为二进制格式
*
* @param data 数据
* @return 是否为二进制格式
*/
public static boolean isBinaryFormat(byte[] data) {
return data != null && data.length >= 1 && data[0] == MAGIC_NUMBER;
}
private byte determineMessageType(IotDeviceMessage message) {
if (message.getCode() != null) {
return RESPONSE;
}
return REQUEST;
}
private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) {
Buffer bodyBuffer = Buffer.buffer();
if (messageType == RESPONSE) {
// code
bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0);
// msg
String msg = message.getMsg() != null ? message.getMsg() : "";
byte[] msgBytes = StrUtil.utf8Bytes(msg);
bodyBuffer.appendShort((short) msgBytes.length);
bodyBuffer.appendBytes(msgBytes);
// data
if (message.getData() != null) {
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData()));
}
} else {
// 请求消息只处理 params 参数
if (message.getParams() != null) {
bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getParams()));
}
}
return bodyBuffer.getBytes();
}
private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) {
Buffer buffer = Buffer.buffer();
// 1. 写入协议头部
buffer.appendByte(MAGIC_NUMBER);
buffer.appendByte(PROTOCOL_VERSION);
buffer.appendByte(messageType);
// 2. 预留消息长度位置
int lengthPosition = buffer.length();
buffer.appendInt(0);
// 3. 写入消息 ID
String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId()
: IotDeviceMessageUtils.generateMessageId();
byte[] messageIdBytes = StrUtil.utf8Bytes(messageId);
buffer.appendShort((short) messageIdBytes.length);
buffer.appendBytes(messageIdBytes);
// 4. 写入方法名
byte[] methodBytes = StrUtil.utf8Bytes(message.getMethod());
buffer.appendShort((short) methodBytes.length);
buffer.appendBytes(methodBytes);
// 5. 写入消息体
buffer.appendBytes(bodyData);
// 6. 更新消息长度
buffer.setInt(lengthPosition, buffer.length());
return buffer.getBytes();
}
private IotDeviceMessage parseMessageBody(Buffer buffer, int startIndex, byte messageType,
String messageId, String method) {
if (startIndex >= buffer.length()) {
return IotDeviceMessage.of(messageId, method, null, null, null, null);
}
if (messageType == RESPONSE) {
return parseResponseMessage(buffer, startIndex, messageId, method);
} else {
Object payload = parseJsonData(buffer, startIndex, buffer.length());
return IotDeviceMessage.of(messageId, method, payload, null, null, null);
}
}
private IotDeviceMessage parseResponseMessage(Buffer buffer, int startIndex, String messageId, String method) {
int index = startIndex;
// 1. 读取响应码
Integer code = buffer.getInt(index);
index += 4;
// 2. 读取响应消息
short msgLength = buffer.getShort(index);
index += 2;
String msg = msgLength > 0 ? buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()) : null;
index += msgLength;
// 3. 读取响应数据
Object data = null;
if (index < buffer.length()) {
data = parseJsonData(buffer, index, buffer.length());
}
return IotDeviceMessage.of(messageId, method, null, data, code, msg);
}
private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) {
if (startIndex >= endIndex) {
return null;
}
try {
String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
return JsonUtils.parseObject(jsonStr, Object.class);
} catch (Exception e) {
log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e);
return buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name());
}
}
}

View File

@@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.gateway.serialize.json;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
/**
* JSON 格式的消息序列化器
*
* 直接使用 JsonUtils 序列化/反序列化 {@link IotDeviceMessage},不包装额外字段
*
* @author 芋道源码
*/
public class IotJsonSerializer implements IotMessageSerializer {
@Override
public IotSerializeTypeEnum getType() {
return IotSerializeTypeEnum.JSON;
}
@Override
public byte[] serialize(IotDeviceMessage message) {
Assert.notNull(message, "消息不能为空");
return JsonUtils.toJsonByte(message);
}
@Override
public IotDeviceMessage deserialize(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
IotDeviceMessage message = JsonUtils.parseObject(bytes, IotDeviceMessage.class);
Assert.notNull(message, "消息解码失败");
return message;
}
}

View File

@@ -42,14 +42,20 @@ yudao:
secret: yudaoIotGatewayTokenSecret123456789 # Token 密钥至少32位
expiration: 7d
# 协议配置
protocol:
# 协议实例列表(新版配置方式)
protocols:
# ====================================
# 针对引入的 HTTP 组件的配置
# ====================================
http:
- id: http-json
type: http
port: 8092
enabled: true
server-port: 8092
http:
ssl-enabled: false
# 协议配置(旧版,保持兼容)
protocol:
# ====================================
# 针对引入的 EMQX 组件的配置
# ====================================