mirror of
https://github.com/YunaiV/ruoyi-vue-pro.git
synced 2026-04-19 06:47:26 +00:00
feat(iot):【协议改造】tcp 初步改造(90%):优化 codec 代码
This commit is contained in:
@@ -64,13 +64,13 @@ public class IotTcpConfig {
|
||||
/**
|
||||
* LENGTH_FIELD: 长度字段偏移量
|
||||
* <p>
|
||||
* 表示长度字段在消息中的起始位置(从0开始)
|
||||
* 表示长度字段在消息中的起始位置(从 0 开始)
|
||||
*/
|
||||
private Integer lengthFieldOffset;
|
||||
/**
|
||||
* LENGTH_FIELD: 长度字段长度(字节数)
|
||||
* <p>
|
||||
* 常见值:1(最大255)、2(最大65535)、4(最大2GB)
|
||||
* 常见值:1(最大 255)、2(最大 65535)、4(最大 2GB)
|
||||
*/
|
||||
private Integer lengthFieldLength;
|
||||
/**
|
||||
@@ -100,15 +100,6 @@ public class IotTcpConfig {
|
||||
*/
|
||||
private Integer fixedLength;
|
||||
|
||||
/**
|
||||
* 最大帧长度(字节)
|
||||
* <p>
|
||||
* 防止内存溢出,默认 1MB
|
||||
*/
|
||||
@NotNull(message = "最大帧长度不能为空")
|
||||
@Min(value = 1, message = "最大帧长度必须大于 0")
|
||||
private Integer maxFrameLength = 1048576;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodecFactory;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.downstream.IotTcpDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream.IotTcpUpstreamHandler;
|
||||
@@ -89,7 +90,7 @@ public class IotTcpProtocol implements IotProtocol {
|
||||
// 初始化帧编解码器
|
||||
IotTcpConfig tcpConfig = properties.getTcp();
|
||||
IotTcpConfig.CodecConfig codecConfig = tcpConfig != null ? tcpConfig.getCodec() : null;
|
||||
this.frameCodec = IotTcpFrameCodec.create(codecConfig);
|
||||
this.frameCodec = IotTcpFrameCodecFactory.create(codecConfig);
|
||||
|
||||
// 初始化连接管理器
|
||||
this.connectionManager = new IotTcpConnectionManager();
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec;
|
||||
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter.IotTcpDelimiterFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpFixedLengthFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.length.IotTcpLengthFieldFrameCodec;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* IoT TCP 拆包类型枚举
|
||||
*
|
||||
@@ -21,38 +18,28 @@ public enum IotTcpCodecTypeEnum {
|
||||
|
||||
/**
|
||||
* 基于固定长度的拆包
|
||||
* <p>
|
||||
* 消息格式:固定长度的消息体
|
||||
* 需要配置:fixedLength(固定长度)
|
||||
*/
|
||||
FIXED_LENGTH("fixed_length", IotTcpFixedLengthFrameCodec::new),
|
||||
FIXED_LENGTH("fixed_length", IotTcpFixedLengthFrameCodec.class),
|
||||
|
||||
/**
|
||||
* 基于分隔符的拆包
|
||||
* <p>
|
||||
* 消息格式:消息内容 + 分隔符
|
||||
* 需要配置:delimiter(分隔符)
|
||||
*/
|
||||
DELIMITER("delimiter", IotTcpDelimiterFrameCodec::new),
|
||||
DELIMITER("delimiter", IotTcpDelimiterFrameCodec.class),
|
||||
|
||||
/**
|
||||
* 基于长度字段的拆包
|
||||
* <p>
|
||||
* 消息格式:[长度字段][消息体]
|
||||
* 需要配置:lengthFieldOffset(长度字段偏移量)、lengthFieldLength(长度字段长度)
|
||||
*/
|
||||
LENGTH_FIELD("length_field", IotTcpLengthFieldFrameCodec::new),
|
||||
LENGTH_FIELD("length_field", IotTcpLengthFieldFrameCodec.class),
|
||||
;
|
||||
|
||||
/**
|
||||
* 类型标识
|
||||
*/
|
||||
private final String type;
|
||||
|
||||
/**
|
||||
* Codec 创建工厂
|
||||
* 编解码器类
|
||||
*/
|
||||
private final Function<IotTcpConfig.CodecConfig, IotTcpFrameCodec> codecFactory;
|
||||
private final Class<? extends IotTcpFrameCodec> codecClass;
|
||||
|
||||
/**
|
||||
* 根据类型获取枚举
|
||||
@@ -64,14 +51,4 @@ public enum IotTcpCodecTypeEnum {
|
||||
return ArrayUtil.firstMatch(e -> e.getType().equalsIgnoreCase(type), values());
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Codec 实例
|
||||
*
|
||||
* @param config 拆包配置
|
||||
* @return Codec 实例
|
||||
*/
|
||||
public IotTcpFrameCodec createCodec(IotTcpConfig.CodecConfig config) {
|
||||
return codecFactory.apply(config);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
@@ -41,24 +40,4 @@ public interface IotTcpFrameCodec {
|
||||
*/
|
||||
Buffer encode(byte[] data);
|
||||
|
||||
// TODO @AI:还是搞个 facory 类 ,更好理解;
|
||||
// ========== 静态工厂方法 ==========
|
||||
|
||||
/**
|
||||
* 根据配置创建编解码器
|
||||
*
|
||||
* @param config 拆包配置
|
||||
* @return 编解码器实例,如果配置为空则返回 null
|
||||
*/
|
||||
static IotTcpFrameCodec create(IotTcpConfig.CodecConfig config) {
|
||||
if (config == null) {
|
||||
return null;
|
||||
}
|
||||
IotTcpCodecTypeEnum type = IotTcpCodecTypeEnum.of(config.getType());
|
||||
if (type == null) {
|
||||
return null;
|
||||
}
|
||||
return type.createCodec(config);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
|
||||
|
||||
/**
|
||||
* IoT TCP 帧编解码器工厂
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class IotTcpFrameCodecFactory {
|
||||
|
||||
/**
|
||||
* 根据配置创建编解码器
|
||||
*
|
||||
* @param config 拆包配置
|
||||
* @return 编解码器实例,如果配置为空则返回 null
|
||||
*/
|
||||
public static IotTcpFrameCodec create(IotTcpConfig.CodecConfig config) {
|
||||
Assert.notNull(config, "CodecConfig 不能为空");
|
||||
IotTcpCodecTypeEnum type = IotTcpCodecTypeEnum.of(config.getType());
|
||||
Assert.notNull(type, "不支持的 CodecType 类型:" + config.getType());
|
||||
return ReflectUtil.newInstance(type.getCodecClass(), config);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.delimiter;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
|
||||
@@ -8,6 +7,7 @@ import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* IoT TCP 分隔符帧编解码器
|
||||
@@ -27,24 +27,14 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
public class IotTcpDelimiterFrameCodec implements IotTcpFrameCodec {
|
||||
|
||||
private final IotTcpConfig.CodecConfig config;
|
||||
|
||||
/**
|
||||
* 解析后的分隔符字节数组
|
||||
*/
|
||||
private final byte[] delimiterBytes;
|
||||
|
||||
/**
|
||||
* 最大帧长度
|
||||
*/
|
||||
// TODO @AI:最大帧数要不去掉;简洁一点;包括其他地方的配置项;
|
||||
private final int maxFrameLength;
|
||||
|
||||
public IotTcpDelimiterFrameCodec(IotTcpConfig.CodecConfig config) {
|
||||
this.config = config;
|
||||
// TODO @AI:禁止为空;
|
||||
Assert.hasText(config.getDelimiter(), "delimiter 不能为空");
|
||||
this.delimiterBytes = parseDelimiter(config.getDelimiter());
|
||||
this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -55,20 +45,11 @@ public class IotTcpDelimiterFrameCodec implements IotTcpFrameCodec {
|
||||
@Override
|
||||
public RecordParser createDecodeParser(Handler<Buffer> handler) {
|
||||
RecordParser parser = RecordParser.newDelimited(Buffer.buffer(delimiterBytes));
|
||||
|
||||
parser.handler(buffer -> {
|
||||
// 检查帧长度是否超过限制
|
||||
if (buffer.length() > maxFrameLength) {
|
||||
log.warn("[createDecodeParser][帧长度超过限制,length: {}, maxFrameLength: {}]",
|
||||
buffer.length(), maxFrameLength);
|
||||
return;
|
||||
}
|
||||
// 处理完整消息(不包含分隔符)
|
||||
handler.handle(buffer);
|
||||
// 处理完整消息(不包含分隔符)
|
||||
parser.handler(handler);
|
||||
parser.exceptionHandler(ex -> {
|
||||
throw new RuntimeException("[createDecodeParser][解析异常]", ex);
|
||||
});
|
||||
|
||||
// TODO @AI:异常处理;
|
||||
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
|
||||
return parser;
|
||||
}
|
||||
|
||||
@@ -89,13 +70,7 @@ public class IotTcpDelimiterFrameCodec implements IotTcpFrameCodec {
|
||||
* @return 分隔符字节数组
|
||||
*/
|
||||
private byte[] parseDelimiter(String delimiter) {
|
||||
if (StrUtil.isBlank(delimiter)) {
|
||||
// 默认使用换行符
|
||||
return new byte[]{'\n'};
|
||||
}
|
||||
|
||||
// 处理转义字符
|
||||
// TODO @AI:是否必要?不调整感觉也没问题?用户自己写对就 ok 了是哇?
|
||||
String parsed = delimiter
|
||||
.replace("\\r\\n", "\r\n")
|
||||
.replace("\\r", "\r")
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* IoT TCP 定长帧编解码器
|
||||
@@ -24,8 +25,8 @@ public class IotTcpFixedLengthFrameCodec implements IotTcpFrameCodec {
|
||||
private final int fixedLength;
|
||||
|
||||
public IotTcpFixedLengthFrameCodec(IotTcpConfig.CodecConfig config) {
|
||||
// TODO @AI:config.getFixedLength() 禁止为空;
|
||||
this.fixedLength = config.getFixedLength() != null ? config.getFixedLength() : 1024;
|
||||
Assert.notNull(config.getFixedLength(), "fixedLength 不能为空");
|
||||
this.fixedLength = config.getFixedLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -37,8 +38,9 @@ public class IotTcpFixedLengthFrameCodec implements IotTcpFrameCodec {
|
||||
public RecordParser createDecodeParser(Handler<Buffer> handler) {
|
||||
RecordParser parser = RecordParser.newFixed(fixedLength);
|
||||
parser.handler(handler);
|
||||
// TODO @AI:解析失败,是不是要抛出异常?因为要 close 掉连接;
|
||||
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
|
||||
parser.exceptionHandler(ex -> {
|
||||
throw new RuntimeException("[createDecodeParser][解析异常]", ex);
|
||||
});
|
||||
return parser;
|
||||
}
|
||||
|
||||
@@ -46,8 +48,7 @@ public class IotTcpFixedLengthFrameCodec implements IotTcpFrameCodec {
|
||||
public Buffer encode(byte[] data) {
|
||||
Buffer buffer = Buffer.buffer(fixedLength);
|
||||
buffer.appendBytes(data);
|
||||
// 如果数据不足固定长度,填充 0
|
||||
// TODO @AI:这里的填充是合理的么?RecordParser.newFixed(fixedLength) 有填充的逻辑么?
|
||||
// 如果数据不足固定长度,填充 0(RecordParser.newFixed 解码时按固定长度读取,所以发送端需要填充)
|
||||
if (data.length < fixedLength) {
|
||||
byte[] padding = new byte[fixedLength - data.length];
|
||||
buffer.appendBytes(padding);
|
||||
|
||||
@@ -7,6 +7,9 @@ import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.parsetools.RecordParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* IoT TCP 长度字段帧编解码器
|
||||
@@ -30,8 +33,6 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
|
||||
private final int lengthFieldLength;
|
||||
private final int lengthAdjustment;
|
||||
private final int initialBytesToStrip;
|
||||
// TODO @AI:去掉 maxFrameLength 相关字段;
|
||||
private final int maxFrameLength;
|
||||
|
||||
/**
|
||||
* 头部长度 = 长度字段偏移量 + 长度字段长度
|
||||
@@ -39,12 +40,14 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
|
||||
private final int headerLength;
|
||||
|
||||
public IotTcpLengthFieldFrameCodec(IotTcpConfig.CodecConfig config) {
|
||||
// TODO @AI: 增加参数校验;不要 default 逻辑;
|
||||
this.lengthFieldOffset = config.getLengthFieldOffset() != null ? config.getLengthFieldOffset() : 0;
|
||||
this.lengthFieldLength = config.getLengthFieldLength() != null ? config.getLengthFieldLength() : 4;
|
||||
this.lengthAdjustment = config.getLengthAdjustment() != null ? config.getLengthAdjustment() : 0;
|
||||
this.initialBytesToStrip = config.getInitialBytesToStrip() != null ? config.getInitialBytesToStrip() : 0;
|
||||
this.maxFrameLength = config.getMaxFrameLength() != null ? config.getMaxFrameLength() : 1048576;
|
||||
Assert.notNull(config.getLengthFieldOffset(), "lengthFieldOffset 不能为空");
|
||||
Assert.notNull(config.getLengthFieldLength(), "lengthFieldLength 不能为空");
|
||||
Assert.notNull(config.getLengthAdjustment(), "lengthAdjustment 不能为空");
|
||||
Assert.notNull(config.getInitialBytesToStrip(), "initialBytesToStrip 不能为空");
|
||||
this.lengthFieldOffset = config.getLengthFieldOffset();
|
||||
this.lengthFieldLength = config.getLengthFieldLength();
|
||||
this.lengthAdjustment = config.getLengthAdjustment();
|
||||
this.initialBytesToStrip = config.getInitialBytesToStrip();
|
||||
this.headerLength = lengthFieldOffset + lengthFieldLength;
|
||||
}
|
||||
|
||||
@@ -57,49 +60,45 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
|
||||
public RecordParser createDecodeParser(Handler<Buffer> handler) {
|
||||
// 创建状态机:先读取头部,再读取消息体
|
||||
RecordParser parser = RecordParser.newFixed(headerLength);
|
||||
// 使用数组保存状态和头部数据
|
||||
// TODO @AI:bodyLength 只使用第 0 位,是不是 atomicInteger 更合适?
|
||||
final int[] bodyLength = {-1};
|
||||
final Buffer[] headerBuffer = {null};
|
||||
final AtomicReference<Integer> bodyLength = new AtomicReference<>(null); // 消息体长度,null 表示读取头部阶段
|
||||
final AtomicReference<Buffer> headerBuffer = new AtomicReference<>(null); // 头部消息
|
||||
|
||||
// 处理读取到的数据
|
||||
parser.handler(buffer -> {
|
||||
if (bodyLength[0] == -1) {
|
||||
if (bodyLength.get() == null) {
|
||||
// 阶段 1: 读取头部,解析长度字段
|
||||
headerBuffer[0] = buffer.copy();
|
||||
headerBuffer.set(buffer.copy());
|
||||
int length = readLength(buffer, lengthFieldOffset, lengthFieldLength);
|
||||
int frameBodyLength = length + lengthAdjustment;
|
||||
// 检查帧长度是否超过限制
|
||||
if (frameBodyLength < 0 || frameBodyLength > maxFrameLength - headerLength) {
|
||||
log.warn("[createDecodeParser][帧长度异常,length: {}, frameBodyLength: {}, maxFrameLength: {}]",
|
||||
length, frameBodyLength, maxFrameLength);
|
||||
return;
|
||||
// 检查帧长度是否合法
|
||||
if (frameBodyLength < 0) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"[createDecodeParser][帧长度异常,length: %d, frameBodyLength: %d]",
|
||||
length, frameBodyLength));
|
||||
}
|
||||
// 消息体为空,抛出异常
|
||||
if (frameBodyLength == 0) {
|
||||
throw new IllegalStateException("[createDecodeParser][消息体不能为空]");
|
||||
}
|
||||
|
||||
if (frameBodyLength == 0) {
|
||||
// 消息体为空,直接处理
|
||||
// TODO @AI:消息体为空,是不是不合理哈?应该抛出异常?
|
||||
Buffer frame = processFrame(headerBuffer[0], null);
|
||||
handler.handle(frame);
|
||||
} else {
|
||||
// 切换到读取消息体模式
|
||||
bodyLength[0] = frameBodyLength;
|
||||
parser.fixedSizeMode(frameBodyLength);
|
||||
}
|
||||
// 【重要】切换到读取消息体模式
|
||||
bodyLength.set(frameBodyLength);
|
||||
parser.fixedSizeMode(frameBodyLength);
|
||||
} else {
|
||||
// 阶段 2: 读取消息体,组装完整帧
|
||||
Buffer frame = processFrame(headerBuffer[0], buffer);
|
||||
Buffer frame = processFrame(headerBuffer.get(), buffer);
|
||||
// 重置状态,准备读取下一帧
|
||||
bodyLength[0] = -1;
|
||||
headerBuffer[0] = null;
|
||||
bodyLength.set(null);
|
||||
headerBuffer.set(null);
|
||||
parser.fixedSizeMode(headerLength);
|
||||
|
||||
// 处理完整消息
|
||||
// 【重要】处理完整消息
|
||||
handler.handle(frame);
|
||||
}
|
||||
});
|
||||
|
||||
parser.exceptionHandler(ex -> log.error("[createDecodeParser][解析异常]", ex));
|
||||
parser.exceptionHandler(ex -> {
|
||||
throw new RuntimeException("[createDecodeParser][解析异常]", ex);
|
||||
});
|
||||
return parser;
|
||||
}
|
||||
|
||||
@@ -122,26 +121,36 @@ public class IotTcpLengthFieldFrameCodec implements IotTcpFrameCodec {
|
||||
/**
|
||||
* 从 Buffer 中读取长度字段
|
||||
*/
|
||||
// TODO @AI:兼容 JDK8
|
||||
@SuppressWarnings("EnhancedSwitchMigration")
|
||||
private int readLength(Buffer buffer, int offset, int length) {
|
||||
return switch (length) {
|
||||
case 1 -> buffer.getUnsignedByte(offset);
|
||||
case 2 -> buffer.getUnsignedShort(offset);
|
||||
case 4 -> buffer.getInt(offset);
|
||||
default -> throw new IllegalArgumentException("不支持的长度字段长度: " + length);
|
||||
};
|
||||
switch (length) {
|
||||
case 1:
|
||||
return buffer.getUnsignedByte(offset);
|
||||
case 2:
|
||||
return buffer.getUnsignedShort(offset);
|
||||
case 4:
|
||||
return buffer.getInt(offset);
|
||||
default:
|
||||
throw new IllegalArgumentException("不支持的长度字段长度: " + length);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 向 Buffer 中写入长度字段
|
||||
*/
|
||||
// TODO @AI:兼容 JDK8
|
||||
private void writeLength(Buffer buffer, int length, int fieldLength) {
|
||||
switch (fieldLength) {
|
||||
case 1 -> buffer.appendByte((byte) length);
|
||||
case 2 -> buffer.appendShort((short) length);
|
||||
case 4 -> buffer.appendInt(length);
|
||||
default -> throw new IllegalArgumentException("不支持的长度字段长度: " + fieldLength);
|
||||
case 1:
|
||||
buffer.appendByte((byte) length);
|
||||
break;
|
||||
case 2:
|
||||
buffer.appendShort((short) length);
|
||||
break;
|
||||
case 4:
|
||||
buffer.appendInt(length);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("不支持的长度字段长度: " + fieldLength);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,8 +39,8 @@ public class IotTcpDownstreamHandler {
|
||||
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfoByDeviceId(
|
||||
message.getDeviceId());
|
||||
if (connectionInfo == null) {
|
||||
// TODO @AI:是不是把消息 id 也打印进去?类似上面的日志
|
||||
log.warn("[handle][连接信息不存在,设备 ID: {}]", message.getDeviceId());
|
||||
log.warn("[handle][连接信息不存在,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -50,14 +50,11 @@ public class IotTcpDownstreamHandler {
|
||||
|
||||
// 3. 发送到设备
|
||||
boolean success = connectionManager.sendToDevice(message.getDeviceId(), frameData.getBytes());
|
||||
// TODO @AI:不成功,直接抛出异常;反正下面的日志也会打印失败的
|
||||
if (success) {
|
||||
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), frameData.length());
|
||||
} else {
|
||||
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId());
|
||||
if (!success) {
|
||||
throw new RuntimeException("下行消息发送失败");
|
||||
}
|
||||
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
|
||||
message.getDeviceId(), message.getMethod(), message.getId(), frameData.length());
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
|
||||
message.getDeviceId(), message.getMethod(), message, e);
|
||||
|
||||
@@ -68,7 +68,13 @@ yudao:
|
||||
codec:
|
||||
type: delimiter # 拆包类型:length_field / delimiter / fixed_length
|
||||
delimiter: "\\n" # 分隔符(支持转义:\\n=换行, \\r=回车, \\t=制表符)
|
||||
max-frame-length: 1048576 # 最大帧长度(字节)
|
||||
# type: length_field # 拆包类型:length_field / delimiter / fixed_length
|
||||
# length-field-offset: 0 # 长度字段偏移量
|
||||
# length-field-length: 4 # 长度字段长度
|
||||
# length-adjustment: 0 # 长度调整值
|
||||
# initial-bytes-to-strip: 4 # 初始跳过的字节数
|
||||
# type: fixed_length # 拆包类型:length_field / delimiter / fixed_length
|
||||
# fixed-length: 256 # 固定长度
|
||||
|
||||
# 协议配置(旧版,保持兼容)
|
||||
protocol:
|
||||
|
||||
@@ -8,7 +8,9 @@ import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodecFactory;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -69,11 +71,17 @@ public class IotDirectDeviceTcpProtocolIntegrationTest {
|
||||
/**
|
||||
* TCP 帧编解码器
|
||||
*/
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
|
||||
new IotTcpConfig.CodecConfig() {{
|
||||
setType("delimiter");
|
||||
setDelimiter("\\n");
|
||||
}}
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodecFactory.create(
|
||||
new IotTcpConfig.CodecConfig()
|
||||
.setType(IotTcpCodecTypeEnum.DELIMITER.getType())
|
||||
.setDelimiter("\\n")
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setLengthFieldOffset(0)
|
||||
// .setLengthFieldLength(4)
|
||||
// .setLengthAdjustment(0)
|
||||
// .setInitialBytesToStrip(4)
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setFixedLength(256)
|
||||
);
|
||||
|
||||
// ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) =====================
|
||||
|
||||
@@ -13,7 +13,9 @@ import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoAddReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoDeleteReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.topo.IotDeviceTopoGetReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodecFactory;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -78,11 +80,17 @@ public class IotGatewayDeviceTcpProtocolIntegrationTest {
|
||||
/**
|
||||
* TCP 帧编解码器
|
||||
*/
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
|
||||
new IotTcpConfig.CodecConfig() {{
|
||||
setType("delimiter");
|
||||
setDelimiter("\\n");
|
||||
}}
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodecFactory.create(
|
||||
new IotTcpConfig.CodecConfig()
|
||||
.setType(IotTcpCodecTypeEnum.DELIMITER.getType())
|
||||
.setDelimiter("\\n")
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setLengthFieldOffset(0)
|
||||
// .setLengthFieldLength(4)
|
||||
// .setLengthAdjustment(0)
|
||||
// .setInitialBytesToStrip(4)
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setFixedLength(256)
|
||||
);
|
||||
|
||||
// ===================== 网关设备信息(根据实际情况修改,从 iot_device 表查询网关设备) =====================
|
||||
|
||||
@@ -7,7 +7,9 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpCodecTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.codec.IotTcpFrameCodecFactory;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.json.IotJsonSerializer;
|
||||
import io.vertx.core.Vertx;
|
||||
@@ -70,11 +72,17 @@ public class IotGatewaySubDeviceTcpProtocolIntegrationTest {
|
||||
/**
|
||||
* TCP 帧编解码器
|
||||
*/
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodec.create(
|
||||
new IotTcpConfig.CodecConfig() {{
|
||||
setType("delimiter");
|
||||
setDelimiter("\\n");
|
||||
}}
|
||||
private static final IotTcpFrameCodec FRAME_CODEC = IotTcpFrameCodecFactory.create(
|
||||
new IotTcpConfig.CodecConfig()
|
||||
.setType(IotTcpCodecTypeEnum.DELIMITER.getType())
|
||||
.setDelimiter("\\n")
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setLengthFieldOffset(0)
|
||||
// .setLengthFieldLength(4)
|
||||
// .setLengthAdjustment(0)
|
||||
// .setInitialBytesToStrip(4)
|
||||
// .setType(IotTcpCodecTypeEnum.LENGTH_FIELD.getType())
|
||||
// .setFixedLength(256)
|
||||
);
|
||||
|
||||
// ===================== 网关子设备信息(根据实际情况修改,从 iot_device 表查询子设备) =====================
|
||||
|
||||
Reference in New Issue
Block a user