diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java index db0a862d0e..ee4680ea6d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceApiImpl.java @@ -58,7 +58,7 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi { return success(BeanUtils.toBean(device, IotDeviceRespDTO.class, deviceDTO -> { IotProductDO product = productService.getProductFromCache(deviceDTO.getProductId()); if (product != null) { - deviceDTO.setCodecType(product.getCodecType()); + deviceDTO.setProtocolType(product.getProtocolType()).setSerializeType(product.getSerializeType()); } })); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductRespVO.java index ffc92a2132..302b072620 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductRespVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductRespVO.java @@ -67,10 +67,15 @@ public class IotProductRespVO { @DictFormat(DictTypeConstants.NET_TYPE) private Integer netType; - @Schema(description = "数据格式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") - @ExcelProperty(value = "数据格式", converter = DictConvert.class) - @DictFormat(DictTypeConstants.CODEC_TYPE) - private String codecType; + @Schema(description = "协议类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "mqtt") + @ExcelProperty(value = "协议类型", converter = DictConvert.class) + @DictFormat(DictTypeConstants.PROTOCOL_TYPE) + private String protocolType; + + @Schema(description = "序列化类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "json") + @ExcelProperty(value = "序列化类型", converter = DictConvert.class) + @DictFormat(DictTypeConstants.SERIALIZE_TYPE) + private String serializeType; @Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED) @ExcelProperty("创建时间") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductSaveReqVO.java index 08c636f7f2..fceede0eb0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductSaveReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/product/vo/product/IotProductSaveReqVO.java @@ -1,6 +1,8 @@ package cn.iocoder.yudao.module.iot.controller.admin.product.vo.product; import cn.iocoder.yudao.framework.common.validation.InEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; import cn.iocoder.yudao.module.iot.enums.product.IotNetTypeEnum; import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum; import io.swagger.v3.oas.annotations.media.Schema; @@ -44,9 +46,15 @@ public class IotProductSaveReqVO { @InEnum(value = IotNetTypeEnum.class, message = "联网方式必须是 {value}") private Integer netType; - @Schema(description = "数据格式", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") - @NotEmpty(message = "数据格式不能为空") - private String codecType; + @Schema(description = "协议类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "mqtt") + @InEnum(value = IotProtocolTypeEnum.class, message = "协议类型必须是 {value}") + @NotEmpty(message = "协议类型不能为空") + private String protocolType; + + @Schema(description = "序列化类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "json") + @InEnum(value = IotSerializeTypeEnum.class, message = "序列化类型必须是 {value}") + @NotEmpty(message = "序列化类型不能为空") + private String serializeType; @Schema(description = "是否开启动态注册", example = "false") @NotNull(message = "是否开启动态注册不能为空") diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/product/IotProductDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/product/IotProductDO.java index e296b35017..a1a77fc5e4 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/product/IotProductDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/product/IotProductDO.java @@ -78,12 +78,16 @@ public class IotProductDO extends TenantBaseDO { */ private Integer netType; /** - * 数据格式(编解码器类型) + * 协议类型 *

- * 字典 {@link cn.iocoder.yudao.module.iot.enums.DictTypeConstants#CODEC_TYPE} - * - * 目的:用于 gateway-server 解析消息格式 + * 枚举 {@link cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum} */ - private String codecType; + private String protocolType; + /** + * 序列化类型 + *

+ * 枚举 {@link cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum} + */ + private String serializeType; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java index 4f07ddfc1c..cf6bec1181 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java @@ -8,8 +8,8 @@ package cn.iocoder.yudao.module.iot.enums; public class DictTypeConstants { public static final String NET_TYPE = "iot_net_type"; - public static final String LOCATION_TYPE = "iot_location_type"; - public static final String CODEC_TYPE = "iot_codec_type"; + public static final String PROTOCOL_TYPE = "iot_protocol_type"; + public static final String SERIALIZE_TYPE = "iot_serialize_type"; public static final String PRODUCT_STATUS = "iot_product_status"; public static final String PRODUCT_DEVICE_TYPE = "iot_product_device_type"; diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotDeviceRespDTO.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotDeviceRespDTO.java index add1167801..8ad2c5bcd0 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotDeviceRespDTO.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/biz/dto/IotDeviceRespDTO.java @@ -34,8 +34,12 @@ public class IotDeviceRespDTO { */ private Long productId; /** - * 编解码器类型 + * 协议类型 */ - private String codecType; + private String protocolType; + /** + * 序列化类型 + */ + private String serializeType; } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/IotDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/IotDeviceMessageCodec.java deleted file mode 100644 index 94dd309dd1..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/IotDeviceMessageCodec.java +++ /dev/null @@ -1,33 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.codec; - -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; - -/** - * {@link cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage} 的编解码器 - * - * @author 芋道源码 - */ -public interface IotDeviceMessageCodec { - - /** - * 编码消息 - * - * @param message 消息 - * @return 编码后的消息内容 - */ - byte[] encode(IotDeviceMessage message); - - /** - * 解码消息 - * - * @param bytes 消息内容 - * @return 解码后的消息内容 - */ - IotDeviceMessage decode(byte[] bytes); - - /** - * @return 数据格式(编码器类型) - */ - String type(); - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/alink/IotAlinkDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/alink/IotAlinkDeviceMessageCodec.java deleted file mode 100644 index 5a4e47fe18..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/alink/IotAlinkDeviceMessageCodec.java +++ /dev/null @@ -1,89 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.codec.alink; - -import cn.hutool.core.lang.Assert; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.springframework.stereotype.Component; - -/** - * 阿里云 Alink {@link IotDeviceMessage} 的编解码器 - * - * @author 芋道源码 - */ -@Component -public class IotAlinkDeviceMessageCodec implements IotDeviceMessageCodec { - - public static final String TYPE = "Alink"; - - @Data - @NoArgsConstructor - @AllArgsConstructor - private static class AlinkMessage { - - public static final String VERSION_1 = "1.0"; - - /** - * 消息 ID,且每个消息 ID 在当前设备具有唯一性 - */ - private String id; - - /** - * 版本号 - */ - private String version; - - /** - * 请求方法 - */ - private String method; - - /** - * 请求参数 - */ - private Object params; - - /** - * 响应结果 - */ - private Object data; - /** - * 响应错误码 - */ - private Integer code; - /** - * 响应提示 - * - * 特殊:这里阿里云是 message,为了保持和项目的 {@link CommonResult#getMsg()} 一致。 - */ - private String msg; - - } - - @Override - public String type() { - return TYPE; - } - - @Override - public byte[] encode(IotDeviceMessage message) { - AlinkMessage alinkMessage = new AlinkMessage(message.getRequestId(), AlinkMessage.VERSION_1, - message.getMethod(), message.getParams(), message.getData(), message.getCode(), message.getMsg()); - return JsonUtils.toJsonByte(alinkMessage); - } - - @Override - @SuppressWarnings("DataFlowIssue") - public IotDeviceMessage decode(byte[] bytes) { - AlinkMessage alinkMessage = JsonUtils.parseObject(bytes, AlinkMessage.class); - Assert.notNull(alinkMessage, "消息不能为空"); - Assert.equals(alinkMessage.getVersion(), AlinkMessage.VERSION_1, "消息版本号必须是 1.0"); - return IotDeviceMessage.of(alinkMessage.getId(), alinkMessage.getMethod(), alinkMessage.getParams(), - alinkMessage.getData(), alinkMessage.getCode(), alinkMessage.getMsg()); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/package-info.java deleted file mode 100644 index e1dae7707a..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * 提供设备接入的各种数据(请求、响应)的编解码 - */ -package cn.iocoder.yudao.module.iot.gateway.codec; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java deleted file mode 100644 index 05098cccbf..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpBinaryDeviceMessageCodec.java +++ /dev/null @@ -1,286 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.codec.tcp; - -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import io.vertx.core.buffer.Buffer; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.nio.charset.StandardCharsets; - -/** - * TCP/UDP 二进制格式 {@link IotDeviceMessage} 编解码器 - *

- * 二进制协议格式(所有数值使用大端序): - * - *

- * +--------+--------+--------+---------------------------+--------+--------+
- * | 魔术字 | 版本号 | 消息类型|         消息长度(4 字节)          |
- * +--------+--------+--------+---------------------------+--------+--------+
- * |           消息 ID 长度(2 字节)        |      消息 ID (变长字符串)         |
- * +--------+--------+--------+--------+--------+--------+--------+--------+
- * |           方法名长度(2 字节)        |      方法名(变长字符串)         |
- * +--------+--------+--------+--------+--------+--------+--------+--------+
- * |                        消息体数据(变长)                              |
- * +--------+--------+--------+--------+--------+--------+--------+--------+
- * 
- *

- * 消息体格式: - * - 请求消息:params 数据(JSON) - * - 响应消息:code (4字节) + msg 长度(2字节) + msg 字符串 + data 数据(JSON) - *

- * 注意:deviceId 不包含在协议中,由服务器根据连接上下文自动设置 - * - * @author 芋道源码 - */ -@Slf4j -@Component -public class IotTcpBinaryDeviceMessageCodec implements IotDeviceMessageCodec { - - public static final String TYPE = "TCP_BINARY"; - - /** - * 协议魔术字,用于协议识别 - */ - private static final byte MAGIC_NUMBER = (byte) 0x7E; - - /** - * 协议版本号 - */ - private static final byte PROTOCOL_VERSION = (byte) 0x01; - - /** - * 请求消息类型 - */ - private static final byte REQUEST = (byte) 0x01; - - /** - * 响应消息类型 - */ - private static final byte RESPONSE = (byte) 0x02; - - /** - * 协议头部固定长度(魔术字 + 版本号 + 消息类型 + 消息长度) - */ - private static final int HEADER_FIXED_LENGTH = 7; - - /** - * 最小消息长度(头部 + 消息ID长度 + 方法名长度) - */ - private static final int MIN_MESSAGE_LENGTH = HEADER_FIXED_LENGTH + 4; - - @Override - public String type() { - return TYPE; - } - - @Override - public byte[] encode(IotDeviceMessage message) { - Assert.notNull(message, "消息不能为空"); - Assert.notBlank(message.getMethod(), "消息方法不能为空"); - try { - // 1. 确定消息类型 - byte messageType = determineMessageType(message); - // 2. 构建消息体 - byte[] bodyData = buildMessageBody(message, messageType); - // 3. 构建完整消息 - return buildCompleteMessage(message, messageType, bodyData); - } catch (Exception e) { - log.error("[encode][TCP 二进制消息编码失败,消息: {}]", message, e); - throw new RuntimeException("TCP 二进制消息编码失败: " + e.getMessage(), e); - } - } - - @Override - public IotDeviceMessage decode(byte[] bytes) { - Assert.notNull(bytes, "待解码数据不能为空"); - Assert.isTrue(bytes.length >= MIN_MESSAGE_LENGTH, "数据包长度不足"); - try { - Buffer buffer = Buffer.buffer(bytes); - // 解析协议头部和消息内容 - int index = 0; - // 1. 验证魔术字 - byte magic = buffer.getByte(index++); - Assert.isTrue(magic == MAGIC_NUMBER, "无效的协议魔术字: " + magic); - - // 2. 验证版本号 - byte version = buffer.getByte(index++); - Assert.isTrue(version == PROTOCOL_VERSION, "不支持的协议版本: " + version); - - // 3. 读取消息类型 - byte messageType = buffer.getByte(index++); - // 直接验证消息类型,无需抽取方法 - Assert.isTrue(messageType == REQUEST || messageType == RESPONSE, - "无效的消息类型: " + messageType); - - // 4. 读取消息长度 - int messageLength = buffer.getInt(index); - index += 4; - Assert.isTrue(messageLength == buffer.length(), - "消息长度不匹配,期望: " + messageLength + ", 实际: " + buffer.length()); - - // 5. 读取消息 ID - short messageIdLength = buffer.getShort(index); - index += 2; - String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name()); - index += messageIdLength; - - // 6. 读取方法名 - short methodLength = buffer.getShort(index); - index += 2; - String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name()); - index += methodLength; - - // 7. 解析消息体 - return parseMessageBody(buffer, index, messageType, messageId, method); - } catch (Exception e) { - log.error("[decode][TCP 二进制消息解码失败,数据长度: {}]", bytes.length, e); - throw new RuntimeException("TCP 二进制消息解码失败: " + e.getMessage(), e); - } - } - - /** - * 确定消息类型 - * 优化后的判断逻辑:有响应字段就是响应消息,否则就是请求消息 - */ - private byte determineMessageType(IotDeviceMessage message) { - // 判断是否为响应消息:有响应码或响应消息时为响应 - if (message.getCode() != null) { - return RESPONSE; - } - // 默认为请求消息 - return REQUEST; - } - - /** - * 构建消息体 - */ - private byte[] buildMessageBody(IotDeviceMessage message, byte messageType) { - Buffer bodyBuffer = Buffer.buffer(); - if (messageType == RESPONSE) { - // code - bodyBuffer.appendInt(message.getCode() != null ? message.getCode() : 0); - // msg - String msg = message.getMsg() != null ? message.getMsg() : ""; - byte[] msgBytes = StrUtil.utf8Bytes(msg); - bodyBuffer.appendShort((short) msgBytes.length); - bodyBuffer.appendBytes(msgBytes); - // data - if (message.getData() != null) { - bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getData())); - } - } else { - // 请求消息只处理 params 参数 - // TODO @haohao:如果为空,是不是得写个长度 0 哈? - if (message.getParams() != null) { - bodyBuffer.appendBytes(JsonUtils.toJsonByte(message.getParams())); - } - } - return bodyBuffer.getBytes(); - } - - /** - * 构建完整消息 - */ - private byte[] buildCompleteMessage(IotDeviceMessage message, byte messageType, byte[] bodyData) { - Buffer buffer = Buffer.buffer(); - // 1. 写入协议头部 - buffer.appendByte(MAGIC_NUMBER); - buffer.appendByte(PROTOCOL_VERSION); - buffer.appendByte(messageType); - // 2. 预留消息长度位置(在 5. 更新消息长度) - int lengthPosition = buffer.length(); - buffer.appendInt(0); - // 3. 写入消息 ID - String messageId = StrUtil.isNotBlank(message.getRequestId()) ? message.getRequestId() - : IotDeviceMessageUtils.generateMessageId(); - byte[] messageIdBytes = StrUtil.utf8Bytes(messageId); - buffer.appendShort((short) messageIdBytes.length); - buffer.appendBytes(messageIdBytes); - // 4. 写入方法名 - byte[] methodBytes = StrUtil.utf8Bytes(message.getMethod()); - buffer.appendShort((short) methodBytes.length); - buffer.appendBytes(methodBytes); - // 5. 写入消息体 - buffer.appendBytes(bodyData); - // 6. 更新消息长度 - buffer.setInt(lengthPosition, buffer.length()); - return buffer.getBytes(); - } - - /** - * 解析消息体 - */ - private IotDeviceMessage parseMessageBody(Buffer buffer, int startIndex, byte messageType, - String messageId, String method) { - if (startIndex >= buffer.length()) { - // 空消息体 - return IotDeviceMessage.of(messageId, method, null, null, null, null); - } - - if (messageType == RESPONSE) { - // 响应消息:解析 code + msg + data - return parseResponseMessage(buffer, startIndex, messageId, method); - } else { - // 请求消息:解析 payload - Object payload = parseJsonData(buffer, startIndex, buffer.length()); - return IotDeviceMessage.of(messageId, method, payload, null, null, null); - } - } - - /** - * 解析响应消息 - */ - private IotDeviceMessage parseResponseMessage(Buffer buffer, int startIndex, String messageId, String method) { - int index = startIndex; - - // 1. 读取响应码 - Integer code = buffer.getInt(index); - index += 4; - - // 2. 读取响应消息 - short msgLength = buffer.getShort(index); - index += 2; - String msg = msgLength > 0 ? buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()) : null; - index += msgLength; - - // 3. 读取响应数据 - Object data = null; - if (index < buffer.length()) { - data = parseJsonData(buffer, index, buffer.length()); - } - - return IotDeviceMessage.of(messageId, method, null, data, code, msg); - } - - /** - * 解析 JSON 数据 - */ - private Object parseJsonData(Buffer buffer, int startIndex, int endIndex) { - if (startIndex >= endIndex) { - return null; - } - try { - String jsonStr = buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name()); - return JsonUtils.parseObject(jsonStr, Object.class); - } catch (Exception e) { - log.warn("[parseJsonData][JSON 解析失败,返回原始字符串]", e); - return buffer.getString(startIndex, endIndex, StandardCharsets.UTF_8.name()); - } - } - - /** - * 快速检测是否为二进制格式 - * - * @param data 数据 - * @return 是否为二进制格式 - */ - public static boolean isBinaryFormatQuick(byte[] data) { - return data != null && data.length >= 1 && data[0] == MAGIC_NUMBER; - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java deleted file mode 100644 index 7d62ce2e0f..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/codec/tcp/IotTcpJsonDeviceMessageCodec.java +++ /dev/null @@ -1,110 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.codec.tcp; - -import cn.hutool.core.lang.Assert; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.springframework.stereotype.Component; - -/** - * TCP/UDP JSON 格式 {@link IotDeviceMessage} 编解码器 - * - * 采用纯 JSON 格式传输,格式如下: - * { - * "id": "消息 ID", - * "method": "消息方法", - * "params": {...}, // 请求参数 - * "data": {...}, // 响应结果 - * "code": 200, // 响应错误码 - * "msg": "success", // 响应提示 - * "timestamp": 时间戳 - * } - * - * @author 芋道源码 - */ -@Component -public class IotTcpJsonDeviceMessageCodec implements IotDeviceMessageCodec { - - public static final String TYPE = "TCP_JSON"; - - @Data - @NoArgsConstructor - @AllArgsConstructor - private static class TcpJsonMessage { - - /** - * 消息 ID,且每个消息 ID 在当前设备具有唯一性 - */ - private String id; - - /** - * 请求方法 - */ - private String method; - - /** - * 请求参数 - */ - private Object params; - - /** - * 响应结果 - */ - private Object data; - - /** - * 响应错误码 - */ - private Integer code; - - /** - * 响应提示 - */ - private String msg; - - /** - * 时间戳 - */ - private Long timestamp; - - } - - @Override - public String type() { - return TYPE; - } - - @Override - public byte[] encode(IotDeviceMessage message) { - TcpJsonMessage tcpJsonMessage = new TcpJsonMessage( - message.getRequestId(), - message.getMethod(), - message.getParams(), - message.getData(), - message.getCode(), - message.getMsg(), - System.currentTimeMillis()); - return JsonUtils.toJsonByte(tcpJsonMessage); - } - - @Override - @SuppressWarnings("DataFlowIssue") - public IotDeviceMessage decode(byte[] bytes) { - String jsonStr = StrUtil.utf8Str(bytes).trim(); - TcpJsonMessage tcpJsonMessage = JsonUtils.parseObject(jsonStr, TcpJsonMessage.class); - Assert.notNull(tcpJsonMessage, "消息不能为空"); - Assert.notBlank(tcpJsonMessage.getMethod(), "消息方法不能为空"); - return IotDeviceMessage.of( - tcpJsonMessage.getId(), - tcpJsonMessage.getMethod(), - tcpJsonMessage.getParams(), - tcpJsonMessage.getData(), - tcpJsonMessage.getCode(), - tcpJsonMessage.getMsg()); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java index 77f777cafa..db5ea124ee 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/downstream/IotEmqxDownstreamHandler.java @@ -53,9 +53,10 @@ public class IotEmqxDownstreamHandler { return; } // 2.2 构建载荷 - byte[] payload = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), + byte[] payload = deviceMessageService.serializeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - // 2.3 发布消息 + + // 3. 发布消息 protocol.publishMessage(topic, payload); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java index 53705aa64e..ccc7a2b7e1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxAuthEventHandler.java @@ -452,16 +452,17 @@ public class IotEmqxAuthEventHandler { IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(username); Assert.notNull(deviceInfo, "设备信息不能为空"); try { - // 1. 构建响应消息 + // 1.1 构建响应消息 String method = IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(); IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(null, method, result, 0, null); - - // 2. 编码消息 - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, "Alink"); - - // 3. 构建响应主题并延迟发布(等待设备连接成功并完成订阅) + // 1.2 序列化消息 + byte[] encodedData = deviceMessageService.serializeDeviceMessage(responseMessage, + cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum.JSON); + // 1.3 构建响应主题 String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, deviceInfo.getProductKey(), deviceInfo.getDeviceName(), true); + + // 2. 构建响应主题,并延迟发布(等待设备连接成功并完成订阅) protocol.publishDelayMessage(replyTopic, encodedData, 5000); log.info("[sendRegisterResultMessage][发送注册结果: topic={}]", replyTopic); } catch (Exception e) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java index 4c2fa488f9..17d5f85fc0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/handler/upstream/IotEmqxUpstreamHandler.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.emqx.handler.upstream; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; @@ -34,22 +35,21 @@ public class IotEmqxUpstreamHandler { try { // 1. 解析主题,一次性获取所有信息 String[] topicParts = topic.split("/"); - if (topicParts.length < 4 || StrUtil.hasBlank(topicParts[2], topicParts[3])) { + String productKey = ArrayUtil.get(topicParts, 2); + String deviceName = ArrayUtil.get(topicParts, 3); + if (topicParts.length < 4 || StrUtil.hasBlank(productKey, deviceName)) { log.warn("[handle][topic({}) 格式不正确,无法解析有效的 productKey 和 deviceName]", topic); return; } - String productKey = topicParts[2]; - String deviceName = topicParts[3]; - - // 3. 解码消息 - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + // 2. 反序列化消息 + IotDeviceMessage message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName); if (message == null) { log.warn("[handle][topic({}) payload({}) 消息解码失败]", topic, new String(payload)); return; } - // 4. 发送消息到队列 + // 3. 发送消息到队列 deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); } catch (Exception e) { log.error("[handle][topic({}) payload({}) 处理异常]", topic, new String(payload), e); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java index 153da2eec1..18a5a413d2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/downstream/IotMqttDownstreamHandler.java @@ -43,7 +43,7 @@ public class IotMqttDownstreamHandler { } // 2.1 序列化消息 - byte[] payload = deviceMessageService.encodeDeviceMessage(message, connectionInfo.getProductKey(), + byte[] payload = deviceMessageService.serializeDeviceMessage(message, connectionInfo.getProductKey(), connectionInfo.getDeviceName()); Assert.isTrue(payload != null && payload.length > 0, "消息编码结果不能为空"); // 2.2 构建主题 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java index 12445cb85b..443dc1069c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttAbstractHandler.java @@ -21,12 +21,6 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor public abstract class IotMqttAbstractHandler { - // TODO @AI:当前使用 Alink 序列化类型,后续可考虑支持更多序列化方式 - /** - * 默认编解码类型(MQTT 使用 Alink 协议) - */ - protected static final String DEFAULT_CODEC_TYPE = "Alink"; - protected final IotMqttConnectionManager connectionManager; protected final IotDeviceMessageService deviceMessageService; @@ -40,8 +34,9 @@ public abstract class IotMqttAbstractHandler { * @param method 方法名 * @param data 响应数据 */ + @SuppressWarnings("SameParameterValue") protected void sendSuccessResponse(MqttEndpoint endpoint, String productKey, String deviceName, - String requestId, String method, Object data) { + String requestId, String method, Object data) { IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, 0, null); writeResponse(endpoint, productKey, deviceName, method, responseMessage); } @@ -75,11 +70,12 @@ public abstract class IotMqttAbstractHandler { private void writeResponse(MqttEndpoint endpoint, String productKey, String deviceName, String method, IotDeviceMessage responseMessage) { try { - // 1. 编码消息(使用默认编解码器) - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, DEFAULT_CODEC_TYPE); - - // 2. 构建响应主题,并发送 + // 1.1 序列化消息(根据设备配置的序列化类型) + byte[] encodedData = deviceMessageService.serializeDeviceMessage(responseMessage, productKey, deviceName); + // 1.2 构建响应主题 String replyTopic = IotMqttTopicUtils.buildTopicByMethod(method, productKey, deviceName, true); + + // 2. 发送响应消息 endpoint.publish(replyTopic, Buffer.buffer(encodedData), MqttQoS.AT_LEAST_ONCE, false, false); log.debug("[writeResponse][发送响应,主题: {},code: {}]", replyTopic, responseMessage.getCode()); } catch (Exception e) { diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java index ec1dce2061..9640fa20b0 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttRegisterHandler.java @@ -14,6 +14,8 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessa import io.vertx.mqtt.MqttEndpoint; import lombok.extern.slf4j.Slf4j; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; + /** * IoT 网关 MQTT 设备注册处理器:处理设备动态注册消息(一型一密) * @@ -76,7 +78,8 @@ public class IotMqttRegisterHandler extends IotMqttAbstractHandler { log.warn("[handleRegister][注册失败,客户端 ID: {},错误: {}]", clientId, e.getMessage()); // 接受连接,并发送错误响应 endpoint.accept(false); - sendErrorResponse(endpoint, productKey, deviceName, null, method, 500, e.getMessage()); + sendErrorResponse(endpoint, productKey, deviceName, null, method, + INTERNAL_SERVER_ERROR.getCode(), e.getMessage()); } finally { // 注册完成后关闭连接(一型一密只用于获取 deviceSecret,不保持连接) endpoint.close(); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java index 4014ccdf03..00a0c4b849 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/handler/upstream/IotMqttUpstreamHandler.java @@ -60,7 +60,7 @@ public class IotMqttUpstreamHandler extends IotMqttAbstractHandler { Assert.equals(deviceName, connectionInfo.getDeviceName(), "设备名称不匹配"); // 2. 反序列化消息 - message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + message = deviceMessageService.deserializeDeviceMessage(payload, productKey, deviceName); if (message == null) { log.warn("[handleBusinessRequest][消息解码失败,客户端 ID: {},主题: {}]", clientId, topic); sendErrorResponse(endpoint, productKey, deviceName, null, null, diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/package-info.java index 6eb414ee9f..9c8e827879 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/package-info.java @@ -1,4 +1,4 @@ /** - * 提供设备接入的各种协议的实现 + * 设备接入协议:MQTT、EMQX、HTTP、TCP 等协议的实现 */ -package cn.iocoder.yudao.module.iot.gateway.protocol; \ No newline at end of file +package cn.iocoder.yudao.module.iot.gateway.protocol; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/serialize/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/serialize/package-info.java new file mode 100644 index 0000000000..cfdda5ac02 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/serialize/package-info.java @@ -0,0 +1,6 @@ +/** + * 消息序列化:将设备消息转换为字节数组(JSON、二进制等格式) + * + * @see cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer + */ +package cn.iocoder.yudao.module.iot.gateway.serialize; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageService.java index c86fc0983d..7d16a655c2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageService.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageService.java @@ -1,5 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.service.device.message; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; /** @@ -10,45 +11,45 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; public interface IotDeviceMessageService { /** - * 编码消息 + * 序列化消息 * * @param message 消息 * @param productKey 产品 Key * @param deviceName 设备名称 - * @return 编码后的消息内容 + * @return 序列化后的消息内容 */ - byte[] encodeDeviceMessage(IotDeviceMessage message, - String productKey, String deviceName); + byte[] serializeDeviceMessage(IotDeviceMessage message, + String productKey, String deviceName); /** - * 编码消息 + * 序列化消息 * - * @param message 消息 - * @param codecType 编解码器类型 - * @return 编码后的消息内容 + * @param message 消息 + * @param serializeType 序列化类型 + * @return 序列化后的消息内容 */ - byte[] encodeDeviceMessage(IotDeviceMessage message, - String codecType); + byte[] serializeDeviceMessage(IotDeviceMessage message, + IotSerializeTypeEnum serializeType); /** - * 解码消息 + * 反序列化消息 * * @param bytes 消息内容 * @param productKey 产品 Key * @param deviceName 设备名称 - * @return 解码后的消息内容 + * @return 反序列化后的消息内容 */ - IotDeviceMessage decodeDeviceMessage(byte[] bytes, - String productKey, String deviceName); + IotDeviceMessage deserializeDeviceMessage(byte[] bytes, + String productKey, String deviceName); /** - * 解码消息 + * 反序列化消息 * - * @param bytes 消息内容 - * @param codecType 编解码器类型 - * @return 解码后的消息内容 + * @param bytes 消息内容 + * @param serializeType 序列化类型 + * @return 反序列化后的消息内容 */ - IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType); + IotDeviceMessage deserializeDeviceMessage(byte[] bytes, IotSerializeTypeEnum serializeType); /** * 发送消息 diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java index 014da9a5df..ee0c4aea4a 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/message/IotDeviceMessageServiceImpl.java @@ -1,20 +1,20 @@ package cn.iocoder.yudao.module.iot.gateway.service.device.message; +import cn.hutool.core.lang.Assert; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils; import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.util.List; -import java.util.Map; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_NOT_EXISTS; @@ -28,80 +28,70 @@ import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVIC @Slf4j public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { - /** - * 编解码器 - */ - private final Map codes; - @Resource private IotDeviceService deviceService; @Resource private IotDeviceMessageProducer deviceMessageProducer; - public IotDeviceMessageServiceImpl(List codes) { - this.codes = CollectionUtils.convertMap(codes, IotDeviceMessageCodec::type); - } + @Resource + private IotMessageSerializerManager messageSerializerManager; @Override - public byte[] encodeDeviceMessage(IotDeviceMessage message, - String productKey, String deviceName) { + public byte[] serializeDeviceMessage(IotDeviceMessage message, + String productKey, String deviceName) { // 1.1 获取设备信息 IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName); if (device == null) { throw exception(DEVICE_NOT_EXISTS, productKey, deviceName); } - // 1.2 获取编解码器 - IotDeviceMessageCodec codec = codes.get(device.getCodecType()); - if (codec == null) { - throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType())); - } + // 1.2 获取序列化器 + IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(device.getSerializeType()); + Assert.notNull(serializeType, "设备序列化类型不能为空"); - // 2. 编码消息 - return codec.encode(message); + // 2. 序列化消息 + return serializeDeviceMessage(message, serializeType); } @Override - public byte[] encodeDeviceMessage(IotDeviceMessage message, - String codecType) { - // 1. 获取编解码器 - IotDeviceMessageCodec codec = codes.get(codecType); - if (codec == null) { - throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType)); + public byte[] serializeDeviceMessage(IotDeviceMessage message, + IotSerializeTypeEnum serializeType) { + // 1. 获取序列化器 + IotMessageSerializer serializer = messageSerializerManager.get(serializeType); + if (serializer == null) { + throw new IllegalArgumentException(StrUtil.format("序列化器({}) 不存在", serializeType)); } - // 2. 编码消息 - return codec.encode(message); + // 2. 序列化消息 + return serializer.serialize(message); } @Override - public IotDeviceMessage decodeDeviceMessage(byte[] bytes, - String productKey, String deviceName) { + public IotDeviceMessage deserializeDeviceMessage(byte[] bytes, + String productKey, String deviceName) { // 1.1 获取设备信息 IotDeviceRespDTO device = deviceService.getDeviceFromCache(productKey, deviceName); if (device == null) { throw exception(DEVICE_NOT_EXISTS, productKey, deviceName); } - // 1.2 获取编解码器 - IotDeviceMessageCodec codec = codes.get(device.getCodecType()); - if (codec == null) { - throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", device.getCodecType())); - } + // 1.2 获取序列化器 + IotSerializeTypeEnum serializeType = IotSerializeTypeEnum.of(device.getSerializeType()); + Assert.notNull(serializeType, "设备序列化类型不能为空"); - // 2. 解码消息 - return codec.decode(bytes); + // 2. 反序列化消息 + return deserializeDeviceMessage(bytes, serializeType); } @Override - public IotDeviceMessage decodeDeviceMessage(byte[] bytes, String codecType) { - // 1. 获取编解码器 - IotDeviceMessageCodec codec = codes.get(codecType); - if (codec == null) { - throw new IllegalArgumentException(StrUtil.format("编解码器({}) 不存在", codecType)); + public IotDeviceMessage deserializeDeviceMessage(byte[] bytes, IotSerializeTypeEnum serializeType) { + // 1. 获取序列化器 + IotMessageSerializer serializer = messageSerializerManager.get(serializeType); + if (serializer == null) { + throw new IllegalArgumentException(StrUtil.format("序列化器({}) 不存在", serializeType)); } - // 2. 解码消息 - return codec.decode(bytes); + // 2. 反序列化消息 + return serializer.deserialize(bytes); } @Override diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java index 22ac321817..45cb7ca450 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotDirectDeviceMqttProtocolIntegrationTest.java @@ -8,8 +8,8 @@ import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.core.util.IotProductAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -57,9 +57,9 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { private static Vertx vertx; - // ===================== 编解码器(MQTT 使用 Alink 协议) ===================== + // ===================== 序列化器 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== @@ -211,7 +211,7 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { client.publishHandler(message -> { log.info("[testDeviceRegister][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); - IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes()); responseFuture.complete(response); }); // 3.2 订阅 _reply 主题 @@ -314,14 +314,14 @@ public class IotDirectDeviceMqttProtocolIntegrationTest { client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); - IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes()); responseFuture.complete(response); }); - // 2. 编码并发布消息 - byte[] payload = CODEC.encode(request); - log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", - CODEC.type(), topic, new String(payload)); + // 2. 序列化并发布消息 + byte[] payload = SERIALIZER.serialize(request); + log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]", + SERIALIZER.getType(), topic, new String(payload)); client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); log.info("[publishAndWaitReply][消息发布成功]"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java index 02949c758c..8e099749c8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewayDeviceMqttProtocolIntegrationTest.java @@ -12,8 +12,8 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO; import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -65,9 +65,9 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { private static Vertx vertx; - // ===================== 编解码器(MQTT 使用 Alink 协议) ===================== + // ===================== 序列化器 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) ===================== @@ -399,14 +399,14 @@ public class IotGatewayDeviceMqttProtocolIntegrationTest { client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); - IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes()); responseFuture.complete(response); }); - // 2. 编码并发布消息 - byte[] payload = CODEC.encode(request); - log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", - CODEC.type(), topic, new String(payload)); + // 2. 序列化并发布消息 + byte[] payload = SERIALIZER.serialize(request); + log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]", + SERIALIZER.getType(), topic, new String(payload)); client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); log.info("[publishAndWaitReply][消息发布成功]"); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java index 5173858923..ca01b85035 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/IotGatewaySubDeviceMqttProtocolIntegrationTest.java @@ -7,8 +7,8 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; -import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer; +import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -59,9 +59,9 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { private static Vertx vertx; - // ===================== 编解码器(MQTT 使用 Alink 协议) ===================== + // ===================== 序列化器 ===================== - private static final IotDeviceMessageCodec CODEC = new IotAlinkDeviceMessageCodec(); + private static final IotMessageSerializer SERIALIZER = new IotJsonSerializer(); // ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== @@ -236,14 +236,14 @@ public class IotGatewaySubDeviceMqttProtocolIntegrationTest { client.publishHandler(message -> { log.info("[publishAndWaitReply][收到响应: topic={}, payload={}]", message.topicName(), message.payload().toString()); - IotDeviceMessage response = CODEC.decode(message.payload().getBytes()); + IotDeviceMessage response = SERIALIZER.deserialize(message.payload().getBytes()); responseFuture.complete(response); }); - // 2. 编码并发布消息 - byte[] payload = CODEC.encode(request); - log.info("[publishAndWaitReply][Codec: {}, 发送消息: topic={}, payload={}]", - CODEC.type(), topic, new String(payload)); + // 2. 序列化并发布消息 + byte[] payload = SERIALIZER.serialize(request); + log.info("[publishAndWaitReply][Serializer: {}, 发送消息: topic={}, payload={}]", + SERIALIZER.getType(), topic, new String(payload)); client.publish(topic, Buffer.buffer(payload), MqttQoS.AT_LEAST_ONCE, false, false) .toCompletionStage().toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); log.info("[publishAndWaitReply][消息发布成功]");