diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 47c7a79720..1b475e9fce 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -115,11 +115,9 @@ public class IotGatewayConfiguration { @Bean public IotTcpDownstreamSubscriber iotTcpDownstreamSubscriber(IotTcpUpstreamProtocol protocolHandler, IotDeviceMessageService messageService, - IotDeviceService deviceService, IotTcpConnectionManager connectionManager, IotMessageBus messageBus) { - return new IotTcpDownstreamSubscriber(protocolHandler, messageService, deviceService, connectionManager, - messageBus); + return new IotTcpDownstreamSubscriber(protocolHandler, messageService, connectionManager, messageBus); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java index e4d46b3af6..aec671811c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotTcpDownstreamSubscriber.java @@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.router.IotTcpDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; @@ -25,8 +24,6 @@ public class IotTcpDownstreamSubscriber implements IotMessageSubscriber { private String getMessageCodecType(Buffer buffer, NetSocket socket) { // 1. 如果已认证,优先使用缓存的编解码类型 IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); - if (connectionInfo != null && connectionInfo.isAuthenticated() && - StrUtil.isNotBlank(connectionInfo.getCodecType())) { + if (connectionInfo != null + && StrUtil.isNotBlank(connectionInfo.getCodecType())) { return connectionInfo.getCodecType(); } @@ -255,8 +255,7 @@ public class IotTcpUpstreamHandler implements Handler { .setProductKey(device.getProductKey()) .setDeviceName(device.getDeviceName()) .setClientId(clientId) - .setCodecType(codecType) - .setAuthenticated(true); + .setCodecType(codecType); // 注册连接 connectionManager.registerConnection(socket, device.getId(), connectionInfo); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java new file mode 100644 index 0000000000..ec939521af --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/IotDirectDeviceTcpProtocolIntegrationTest.java @@ -0,0 +1,238 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.HexUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.codec.IotDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import cn.iocoder.yudao.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +/** + * IoT 直连设备 TCP 协议集成测试(手动测试) + * + *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 协议直接连接平台 + * + *

支持两种编解码格式: + *

+ * + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
  2. + *
  3. 修改 {@link #CODEC} 选择测试的编解码格式
  4. + *
  5. 运行以下测试方法: + *
      + *
    • {@link #testAuth()} - 设备认证
    • + *
    • {@link #testPropertyPost()} - 设备属性上报
    • + *
    • {@link #testEventPost()} - 设备事件上报
    • + *
    + *
  6. + *
+ * + *

注意:TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 + * + * @author 芋道源码 + */ +@Slf4j +public class IotDirectDeviceTcpProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 8091; + private static final int TIMEOUT_MS = 5000; + + // ===================== 编解码器选择(修改此处切换 JSON / Binary) ===================== +// private static final IotDeviceMessageCodec CODEC = new IotTcpJsonDeviceMessageCodec(); + private static final IotDeviceMessageCodec CODEC = new IotTcpBinaryDeviceMessageCodec(); + + // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询) ===================== + private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; + private static final String DEVICE_NAME = "small"; + private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; + + // ===================== 认证测试 ===================== + + /** + * 认证测试:设备认证 + */ + @Test + public void testAuth() throws Exception { + // 1.1 构建认证消息 + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + // 1.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testAuth][Codec: {}, 请求消息: {}, 数据包长度: {} 字节]", CODEC.type(), request, payload.length); + if (CODEC instanceof IotTcpBinaryDeviceMessageCodec) { + log.info("[testAuth][二进制数据包(HEX): {}]", HexUtil.encodeHexStr(payload)); + } + + // 2.1 发送请求 + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + byte[] responseBytes = sendAndReceive(socket, payload); + // 2.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testAuth][响应消息: {}]", response); + } else { + log.warn("[testAuth][未收到响应]"); + } + } + } + + // ===================== 直连设备属性上报测试 ===================== + + /** + * 属性上报测试 + */ + @Test + public void testPropertyPost() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testPropertyPost][认证响应: {}]", authResponse); + + // 2.1 构建属性上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(), + IotDevicePropertyPostReqDTO.of(MapUtil.builder() + .put("width", 1) + .put("height", "2") + .build()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testPropertyPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testPropertyPost][响应消息: {}]", response); + } else { + log.warn("[testPropertyPost][未收到响应]"); + } + } + } + + // ===================== 直连设备事件上报测试 ===================== + + /** + * 事件上报测试 + */ + @Test + public void testEventPost() throws Exception { + try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { + socket.setSoTimeout(TIMEOUT_MS); + + // 1. 先进行认证 + IotDeviceMessage authResponse = authenticate(socket); + log.info("[testEventPost][认证响应: {}]", authResponse); + + // 2.1 构建事件上报消息 + IotDeviceMessage request = IotDeviceMessage.of( + IdUtil.fastSimpleUUID(), + IotDeviceMessageMethodEnum.EVENT_POST.getMethod(), + IotDeviceEventPostReqDTO.of( + "eat", + MapUtil.builder().put("rice", 3).build(), + System.currentTimeMillis()), + null, null, null); + // 2.2 编码 + byte[] payload = CODEC.encode(request); + log.info("[testEventPost][Codec: {}, 请求消息: {}]", CODEC.type(), request); + + // 3.1 发送请求 + byte[] responseBytes = sendAndReceive(socket, payload); + // 3.2 解码响应 + if (responseBytes != null) { + IotDeviceMessage response = CODEC.decode(responseBytes); + log.info("[testEventPost][响应消息: {}]", response); + } else { + log.warn("[testEventPost][未收到响应]"); + } + } + } + + // ===================== 辅助方法 ===================== + + /** + * 执行设备认证 + * + * @param socket TCP 连接 + * @return 认证响应消息 + */ + private IotDeviceMessage authenticate(Socket socket) throws Exception { + IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); + IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() + .setClientId(authInfo.getClientId()) + .setUsername(authInfo.getUsername()) + .setPassword(authInfo.getPassword()); + IotDeviceMessage request = IotDeviceMessage.of(IdUtil.fastSimpleUUID(), "auth", authReqDTO, null, null, null); + byte[] payload = CODEC.encode(request); + byte[] responseBytes = sendAndReceive(socket, payload); + if (responseBytes != null) { + log.info("[authenticate][响应数据长度: {} 字节,首字节: 0x{}, HEX: {}]", + responseBytes.length, + String.format("%02X", responseBytes[0]), + HexUtil.encodeHexStr(responseBytes)); + return CODEC.decode(responseBytes); + } + return null; + } + + /** + * 发送 TCP 请求并接收响应 + * + * @param socket TCP Socket + * @param payload 请求数据 + * @return 响应数据 + */ + private byte[] sendAndReceive(Socket socket, byte[] payload) throws Exception { + // 1. 发送请求 + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + out.write(payload); + out.flush(); + + // 2.1 等待一小段时间让服务器处理 + Thread.sleep(100); + // 2.2 接收响应 + byte[] buffer = new byte[4096]; + try { + int length = in.read(buffer); + if (length > 0) { + byte[] response = new byte[length]; + System.arraycopy(buffer, 0, response, 0, length); + return response; + } + return null; + } catch (java.net.SocketTimeoutException e) { + log.warn("[sendAndReceive][接收响应超时]"); + return null; + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/binary/IotDirectDeviceTcpBinaryProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/binary/IotDirectDeviceTcpBinaryProtocolIntegrationTest.java deleted file mode 100644 index 2efd77f680..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/binary/IotDirectDeviceTcpBinaryProtocolIntegrationTest.java +++ /dev/null @@ -1,385 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.binary; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import io.vertx.core.buffer.Buffer; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.charset.StandardCharsets; - -/** - * IoT 直连设备 TCP 二进制协议集成测试(手动测试) - * - *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP 二进制协议直接连接平台 - * - *

使用步骤: - *

    - *
  1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
  2. - *
  3. 运行 {@link #testAuth()} 获取设备认证,认证成功后连接保持
  4. - *
  5. 运行以下测试方法: - *
      - *
    • {@link #testPropertyPost()} - 设备属性上报
    • - *
    • {@link #testEventPost()} - 设备事件上报
    • - *
    - *
  6. - *
- * - *

注意:TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 - * - *

二进制协议格式说明请参考:{@code tcp-binary-packet-examples.md} - * - * @author 芋道源码 - */ -@Slf4j -public class IotDirectDeviceTcpBinaryProtocolIntegrationTest { - - private static final String SERVER_HOST = "127.0.0.1"; - private static final int SERVER_PORT = 8091; - private static final int TIMEOUT_MS = 5000; - - // ===================== 二进制协议常量 ===================== - /** - * 协议魔术字,用于协议识别 - */ - private static final byte MAGIC_NUMBER = (byte) 0x7E; - - /** - * 协议版本号 - */ - private static final byte PROTOCOL_VERSION = (byte) 0x01; - - /** - * 请求消息类型 - */ - private static final byte REQUEST = (byte) 0x01; - - // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== - private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; - private static final String DEVICE_NAME = "small"; - private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; - - // ===================== 认证测试 ===================== - - /** - * 认证测试:设备认证(二进制格式) - */ - @Test - public void testAuth() throws Exception { - // 1.1 构建认证参数 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - // 1.2 构建二进制请求 - String messageId = IdUtil.fastSimpleUUID(); - String method = "auth"; - byte[] payload = buildBinaryRequest(messageId, method, authReqDTO); - // 1.3 输出请求 - log.info("[testAuth][消息ID: {}, 方法: {}, 参数: {}]", messageId, method, JsonUtils.toJsonString(authReqDTO)); - log.info("[testAuth][二进制数据包长度: {} 字节]", payload.length); - log.info("[testAuth][二进制数据包(HEX): {}]", bytesToHex(payload)); - - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - byte[] response = sendAndReceiveBinary(socket, payload); - // 2.2 输出结果 - if (response != null) { - log.info("[testAuth][响应数据包长度: {} 字节]", response.length); - log.info("[testAuth][响应数据包(HEX): {}]", bytesToHex(response)); - parseBinaryResponse(response); - } else { - log.warn("[testAuth][未收到响应]"); - } - } - } - - // ===================== 直连设备属性上报测试 ===================== - - /** - * 属性上报测试(二进制格式) - * - * 注意:TCP 协议需要先认证,这里为了简化测试,在同一连接上先认证再上报 - */ - @Test - public void testPropertyPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - - // 1. 先进行认证 - byte[] authResponse = authenticateBinary(socket); - log.info("[testPropertyPost][认证响应长度: {} 字节]", authResponse != null ? authResponse.length : 0); - if (authResponse != null) { - parseBinaryResponse(authResponse); - } - - // 2.1 构建属性上报请求 - String messageId = IdUtil.fastSimpleUUID(); - String method = IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(); - Object params = IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("width", 1) - .put("height", "2") - .build()); - byte[] payload = buildBinaryRequest(messageId, method, params); - // 2.2 输出请求 - log.info("[testPropertyPost][消息ID: {}, 方法: {}, 参数: {}]", messageId, method, JsonUtils.toJsonString(params)); - log.info("[testPropertyPost][二进制数据包长度: {} 字节]", payload.length); - - // 3.1 发送请求 - byte[] response = sendAndReceiveBinary(socket, payload); - // 3.2 输出结果 - if (response != null) { - log.info("[testPropertyPost][响应数据包长度: {} 字节]", response.length); - parseBinaryResponse(response); - } else { - log.warn("[testPropertyPost][未收到响应]"); - } - } - } - - // ===================== 直连设备事件上报测试 ===================== - - /** - * 事件上报测试(二进制格式) - * - * 注意:TCP 协议需要先认证,这里为了简化测试,在同一连接上先认证再上报 - */ - @Test - public void testEventPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - - // 1. 先进行认证 - byte[] authResponse = authenticateBinary(socket); - log.info("[testEventPost][认证响应长度: {} 字节]", authResponse != null ? authResponse.length : 0); - if (authResponse != null) { - parseBinaryResponse(authResponse); - } - - // 2.1 构建事件上报请求 - String messageId = IdUtil.fastSimpleUUID(); - String method = IotDeviceMessageMethodEnum.EVENT_POST.getMethod(); - Object params = IotDeviceEventPostReqDTO.of( - "eat", - MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis()); - byte[] payload = buildBinaryRequest(messageId, method, params); - // 2.2 输出请求 - log.info("[testEventPost][消息ID: {}, 方法: {}, 参数: {}]", messageId, method, JsonUtils.toJsonString(params)); - log.info("[testEventPost][二进制数据包长度: {} 字节]", payload.length); - - // 3.1 发送请求 - byte[] response = sendAndReceiveBinary(socket, payload); - // 3.2 输出结果 - if (response != null) { - log.info("[testEventPost][响应数据包长度: {} 字节]", response.length); - parseBinaryResponse(response); - } else { - log.warn("[testEventPost][未收到响应]"); - } - } - } - - // ===================== 辅助方法 ===================== - - /** - * 执行设备认证(二进制格式) - * - * @param socket TCP 连接 - * @return 认证响应 - */ - private byte[] authenticateBinary(Socket socket) throws Exception { - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - String messageId = IdUtil.fastSimpleUUID(); - byte[] payload = buildBinaryRequest(messageId, "auth", authReqDTO); - return sendAndReceiveBinary(socket, payload); - } - - /** - * 构建二进制请求数据包 - * - *

协议格式: - *

-     * +--------+--------+--------+---------------------------+
-     * | 魔术字 | 版本号 | 消息类型|      消息长度(4字节)      |
-     * +--------+--------+--------+---------------------------+
-     * |    消息ID长度(2字节)    |    消息ID(变长字符串)      |
-     * +--------+--------+--------+--------+--------+--------+
-     * |    方法名长度(2字节)    |    方法名(变长字符串)      |
-     * +--------+--------+--------+--------+--------+--------+
-     * |                 消息体数据(变长)                    |
-     * +--------+--------+--------+--------+--------+--------+
-     * 
- * - * @param messageId 消息 ID - * @param method 方法名 - * @param params 请求参数 - * @return 二进制数据包 - */ - private byte[] buildBinaryRequest(String messageId, String method, Object params) { - Buffer buffer = Buffer.buffer(); - // 1. 写入协议头部 - buffer.appendByte(MAGIC_NUMBER); - buffer.appendByte(PROTOCOL_VERSION); - buffer.appendByte(REQUEST); - // 2. 预留消息长度位置 - int lengthPosition = buffer.length(); - buffer.appendInt(0); - // 3. 写入消息 ID - byte[] messageIdBytes = StrUtil.utf8Bytes(messageId); - buffer.appendShort((short) messageIdBytes.length); - buffer.appendBytes(messageIdBytes); - // 4. 写入方法名 - byte[] methodBytes = StrUtil.utf8Bytes(method); - buffer.appendShort((short) methodBytes.length); - buffer.appendBytes(methodBytes); - // 5. 写入消息体(params 序列化为 JSON) - if (params != null) { - buffer.appendBytes(JsonUtils.toJsonByte(params)); - } - // 6. 更新消息长度 - buffer.setInt(lengthPosition, buffer.length()); - return buffer.getBytes(); - } - - /** - * 解析二进制响应数据包 - * - * @param data 响应数据 - */ - private void parseBinaryResponse(byte[] data) { - if (data == null || data.length < 11) { - log.warn("[parseBinaryResponse][数据包过短]"); - return; - } - - Buffer buffer = Buffer.buffer(data); - int index = 0; - - // 1. 解析魔术字 - byte magic = buffer.getByte(index++); - log.info("[parseBinaryResponse][魔术字: 0x{} (预期: 0x7E)]", String.format("%02X", magic)); - - // 2. 解析版本号 - byte version = buffer.getByte(index++); - log.info("[parseBinaryResponse][版本号: 0x{}]", String.format("%02X", version)); - - // 3. 解析消息类型 - byte messageType = buffer.getByte(index++); - log.info("[parseBinaryResponse][消息类型: 0x{} (0x01=请求, 0x02=响应)]", String.format("%02X", messageType)); - - // 4. 解析消息长度 - int messageLength = buffer.getInt(index); - index += 4; - log.info("[parseBinaryResponse][消息长度: {}]", messageLength); - - // 5. 解析消息 ID - short messageIdLength = buffer.getShort(index); - index += 2; - String messageId = buffer.getString(index, index + messageIdLength, StandardCharsets.UTF_8.name()); - index += messageIdLength; - log.info("[parseBinaryResponse][消息ID: {}]", messageId); - - // 6. 解析方法名 - short methodLength = buffer.getShort(index); - index += 2; - String method = buffer.getString(index, index + methodLength, StandardCharsets.UTF_8.name()); - index += methodLength; - log.info("[parseBinaryResponse][方法名: {}]", method); - - // 7. 解析消息体 - if (messageType == 0x02) { // RESPONSE - // 响应消息:code(4) + msgLen(2) + msg + data(JSON) - if (index + 4 <= buffer.length()) { - int code = buffer.getInt(index); - index += 4; - log.info("[parseBinaryResponse][响应码: {}]", code); - - if (index + 2 <= buffer.length()) { - short msgLength = buffer.getShort(index); - index += 2; - if (msgLength > 0 && index + msgLength <= buffer.length()) { - String msg = buffer.getString(index, index + msgLength, StandardCharsets.UTF_8.name()); - index += msgLength; - log.info("[parseBinaryResponse][响应消息: {}]", msg); - } - - if (index < buffer.length()) { - String dataJson = buffer.getString(index, buffer.length(), StandardCharsets.UTF_8.name()); - log.info("[parseBinaryResponse][响应数据: {}]", dataJson); - } - } - } - } else { - // 请求消息:params(JSON) - if (index < buffer.length()) { - String paramsJson = buffer.getString(index, buffer.length(), StandardCharsets.UTF_8.name()); - log.info("[parseBinaryResponse][请求参数: {}]", paramsJson); - } - } - } - - /** - * 发送二进制请求并接收响应 - * - * @param socket TCP Socket - * @param payload 二进制数据包 - * @return 响应数据 - */ - private byte[] sendAndReceiveBinary(Socket socket, byte[] payload) throws Exception { - // 1. 发送请求 - OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - out.write(payload); - out.flush(); - - // 2.1 等待一小段时间让服务器处理 - Thread.sleep(100); - // 2.2 接收响应 - byte[] buffer = new byte[4096]; - try { - int length = in.read(buffer); - if (length > 0) { - byte[] response = new byte[length]; - System.arraycopy(buffer, 0, response, 0, length); - return response; - } - return null; - } catch (java.net.SocketTimeoutException e) { - log.warn("[sendAndReceiveBinary][接收响应超时]"); - return null; - } - } - - // TODO @AI:hutool 简化下; - /** - * 字节数组转十六进制字符串 - * - * @param bytes 字节数组 - * @return 十六进制字符串 - */ - private String bytesToHex(byte[] bytes) { - StringBuilder sb = new StringBuilder(); - for (byte b : bytes) { - sb.append(String.format("%02X ", b)); - } - return sb.toString().trim(); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/json/IotDirectDeviceTcpJsonProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/json/IotDirectDeviceTcpJsonProtocolIntegrationTest.java deleted file mode 100644 index 62940ca2e0..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/json/IotDirectDeviceTcpJsonProtocolIntegrationTest.java +++ /dev/null @@ -1,205 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.json; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.IdUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; -import cn.iocoder.yudao.module.iot.core.topic.event.IotDeviceEventPostReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.property.IotDevicePropertyPostReqDTO; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.charset.StandardCharsets; - -/** - * IoT 直连设备 TCP JSON 协议集成测试(手动测试) - * - *

测试场景:直连设备(IotProductDeviceTypeEnum 的 DIRECT 类型)通过 TCP JSON 协议直接连接平台 - * - *

使用步骤: - *

    - *
  1. 启动 yudao-module-iot-gateway 服务(TCP 端口 8091)
  2. - *
  3. 运行 {@link #testAuth()} 获取设备认证,认证成功后连接保持
  4. - *
  5. 运行以下测试方法: - *
      - *
    • {@link #testPropertyPost()} - 设备属性上报
    • - *
    • {@link #testEventPost()} - 设备事件上报
    • - *
    - *
  6. - *
- * - *

注意:TCP 协议是有状态的长连接,认证成功后同一连接上的后续请求无需再携带认证信息 - * - * @author 芋道源码 - */ -@Slf4j -public class IotDirectDeviceTcpJsonProtocolIntegrationTest { - - private static final String SERVER_HOST = "127.0.0.1"; - private static final int SERVER_PORT = 8091; - private static final int TIMEOUT_MS = 5000; - - // ===================== 直连设备信息(根据实际情况修改,从 iot_device 表查询子设备) ===================== - private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; - private static final String DEVICE_NAME = "small"; - private static final String DEVICE_SECRET = "0baa4c2ecc104ae1a26b4070c218bdf3"; - - // ===================== 认证测试 ===================== - - /** - * 认证测试:设备认证 - */ - @Test - public void testAuth() throws Exception { - // 1.1 构建请求 - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", "auth") - .put("params", authReqDTO) - .put("timestamp", System.currentTimeMillis()) - .build()); - // 1.2 输出请求 - log.info("[testAuth][请求体: {}]", payload); - - // 2.1 发送请求 - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - String response = sendAndReceive(socket, payload); - // 2.2 输出结果 - log.info("[testAuth][响应体: {}]", response); - } - } - - // ===================== 直连设备属性上报测试 ===================== - - /** - * 属性上报测试 - */ - @Test - public void testPropertyPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - - // 1. 先进行认证 - String authResponse = authenticate(socket); - log.info("[testPropertyPost][认证响应: {}]", authResponse); - - // 2.1 构建属性上报请求 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) - .put("params", IotDevicePropertyPostReqDTO.of(MapUtil.builder() - .put("width", 1) - .put("height", "2") - .build())) - .put("timestamp", System.currentTimeMillis()) - .build()); - // 2.2 输出请求 - log.info("[testPropertyPost][请求体: {}]", payload); - - // 3.1 发送请求 - String response = sendAndReceive(socket, payload); - // 3.2 输出结果 - log.info("[testPropertyPost][响应体: {}]", response); - } - } - - // ===================== 直连设备事件上报测试 ===================== - - /** - * 事件上报测试 - */ - @Test - public void testEventPost() throws Exception { - try (Socket socket = new Socket(SERVER_HOST, SERVER_PORT)) { - socket.setSoTimeout(TIMEOUT_MS); - - // 1. 先进行认证 - String authResponse = authenticate(socket); - log.info("[testEventPost][认证响应: {}]", authResponse); - - // 2.1 构建事件上报请求 - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) - .put("params", IotDeviceEventPostReqDTO.of( - "eat", - MapUtil.builder().put("rice", 3).build(), - System.currentTimeMillis())) - .put("timestamp", System.currentTimeMillis()) - .build()); - // 2.2 输出请求 - log.info("[testEventPost][请求体: {}]", payload); - - // 3.1 发送请求 - String response = sendAndReceive(socket, payload); - // 3.2 输出结果 - log.info("[testEventPost][响应体: {}]", response); - } - } - - // ===================== 辅助方法 ===================== - - /** - * 执行设备认证 - * - * @param socket TCP 连接 - * @return 认证响应 - */ - private String authenticate(Socket socket) throws Exception { - IotDeviceAuthReqDTO authInfo = IotDeviceAuthUtils.getAuthInfo(PRODUCT_KEY, DEVICE_NAME, DEVICE_SECRET); - IotDeviceAuthReqDTO authReqDTO = new IotDeviceAuthReqDTO() - .setClientId(authInfo.getClientId()) - .setUsername(authInfo.getUsername()) - .setPassword(authInfo.getPassword()); - String payload = JsonUtils.toJsonString(MapUtil.builder() - .put("id", IdUtil.fastSimpleUUID()) - .put("method", "auth") - .put("params", authReqDTO) - .put("timestamp", System.currentTimeMillis()) - .build()); - return sendAndReceive(socket, payload); - } - - /** - * 发送 TCP 请求并接收响应 - * - * @param socket TCP Socket - * @param payload 请求体 - * @return 响应内容 - */ - private String sendAndReceive(Socket socket, String payload) throws Exception { - // 1. 发送请求 - OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - byte[] sendData = payload.getBytes(StandardCharsets.UTF_8); - out.write(sendData); - out.flush(); - - // 2.1 等待一小段时间让服务器处理 - Thread.sleep(100); - // 2.2 接收响应 - byte[] buffer = new byte[4096]; - try { - int length = in.read(buffer); - if (length > 0) { - return new String(buffer, 0, length, StandardCharsets.UTF_8); - } - return null; - } catch (java.net.SocketTimeoutException e) { - log.warn("[sendAndReceive][接收响应超时]"); - return null; - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/binary/tcp-binary-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md similarity index 100% rename from yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/binary/tcp-binary-packet-examples.md rename to yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-binary-packet-examples.md diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/json/tcp-json-packet-examples.md b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md similarity index 100% rename from yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/json/tcp-json-packet-examples.md rename to yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/tcp-json-packet-examples.md