feat(iot):【协议改造】websocket 初步改造(100%)

This commit is contained in:
YunaiV
2026-02-01 19:26:05 +08:00
parent d9a08094d9
commit c105ecea96
6 changed files with 151 additions and 424 deletions

View File

@@ -291,7 +291,7 @@ public class IotGatewayProperties {
*/
private Integer keepAliveTimeoutSeconds = 300;
// TODO @AI所有跟 ssl 相关参数,是不是可以统一?放到 protocol 层级ProtocolInstanceProperties【优先级低】暂时不用规划
// NOTESSL 相关参数后续统一到 protocol 层级(优先级低)
/**
* 是否启用 SSL
*/
@@ -336,7 +336,7 @@ public class IotGatewayProperties {
}
// TODO @AI【暂时忽略】改成 ProtocolProperties
// NOTE暂未统一为 ProtocolProperties,待协议改造完成再调整
/**
* 协议实例配置
*/

View File

@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
@@ -79,35 +78,32 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
@Override
@SuppressWarnings("DuplicatedCode")
public void handle(NetSocket socket) {
// TODO @AIclientId 去掉;其它模块也看看,怎么去掉下看看;
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
String remoteAddress = String.valueOf(socket.remoteAddress());
log.debug("[handle][设备连接,地址: {}]", remoteAddress);
// 1. 设置异常和关闭处理器
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {}地址: {}]", clientId, socket.remoteAddress());
log.warn("[handle][连接异常,地址: {}]", remoteAddress, ex);
socket.close();
});
socket.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {}地址: {}]", clientId, socket.remoteAddress());
log.debug("[handle][连接关闭,地址: {}]", remoteAddress);
cleanupConnection(socket);
});
// 2.1 设置消息处理器
// TODO @AI去掉 clientId
Handler<Buffer> messageHandler = buffer -> {
try {
processMessage(buffer, socket);
} catch (Exception e) {
log.error("[handle][消息处理失败,客户端 ID: {}地址: {}]",
clientId, socket.remoteAddress(), e);
log.error("[handle][消息处理失败,地址: {}]", remoteAddress, e);
socket.close();
}
};
// 2.2 使用拆包器处理粘包/拆包
RecordParser parser = codec.createDecodeParser(messageHandler);
socket.handler(parser);
log.debug("[handle][启用 {} 拆包器,客户端 ID: {}]", codec.getType(), clientId);
log.debug("[handle][启用 {} 拆包器,地址: {}]", codec.getType(), remoteAddress);
}
/**
@@ -135,23 +131,23 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
handleRegisterRequest(message, socket);
} else {
// 业务消息
handleBusinessRequest(null, message, socket);
handleBusinessRequest(message, socket);
}
} catch (ServiceException e) {
// 业务异常,返回对应的错误码和错误信息
log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", null, e.getMessage());
log.warn("[processMessage][业务异常,地址: {},错误: {}]", socket.remoteAddress(), e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
// 参数校验失败,返回 400
log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", null, e.getMessage());
log.warn("[processMessage][参数校验失败,地址: {},错误: {}]", socket.remoteAddress(), e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// 其他异常,返回 500并重新抛出让上层关闭连接
log.error("[processMessage][处理消息失败,客户端 ID: {}]", null, e);
log.error("[processMessage][处理消息失败,地址: {}]", socket.remoteAddress(), e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method,
@@ -218,21 +214,21 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 3. 发送成功响应
sendSuccessResponse(socket, message.getRequestId(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getData());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]", null, params.getDeviceName());
log.info("[handleRegisterRequest][注册成功,地址: {},设备名: {}]",
socket.remoteAddress(), params.getDeviceName());
}
/**
* 处理业务请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket 网络连接
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, NetSocket socket) {
private void handleBusinessRequest(IotDeviceMessage message, NetSocket socket) {
// 1. 获取认证信息并处理业务消息
IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo == null) {
log.error("[handleBusinessRequest][无法获取连接信息,客户端 ID: {}]", clientId);
log.error("[handleBusinessRequest][无法获取连接信息,地址: {}]", socket.remoteAddress());
sendErrorResponse(socket, message.getRequestId(), message.getMethod(),
UNAUTHORIZED.getCode(), "设备未认证,无法处理业务消息");
return;
@@ -241,7 +237,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}", clientId, message);
log.info("[handleBusinessRequest][发送消息到消息总线,地址: {},消息: {}]", socket.remoteAddress(), message);
}
/**

View File

@@ -9,13 +9,11 @@ import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstream.IotWebSocketDownstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream.IotWebSocketUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
@@ -24,9 +22,10 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
// TODO @AI注释调整下参考 IotTcpProtocol
/**
* IoT 网关 WebSocket 协议:接收设备上行消息
* IoT WebSocket 协议实现
* <p>
* 基于 Vert.x 实现 WebSocket 服务器,接收设备上行消息
*
* @author 芋道源码
*/
@@ -62,12 +61,6 @@ public class IotWebSocketProtocol implements IotProtocol {
*/
private final IotWebSocketConnectionManager connectionManager;
// TODO @AI可以不用这个变量从 properties 里面获取
/**
* WebSocket 配置
*/
private final IotWebSocketConfig wsConfig;
/**
* 下行消息订阅者
*/
@@ -78,14 +71,10 @@ public class IotWebSocketProtocol implements IotProtocol {
*/
private final IotMessageSerializer serializer;
private final IotDeviceService deviceService;
private final IotDeviceMessageService messageService;
public IotWebSocketProtocol(ProtocolInstanceProperties properties) {
Assert.notNull(properties, "协议实例配置不能为空");
Assert.notNull(properties.getWebsocket(), "WebSocket 协议配置websocket不能为空");
this.properties = properties;
this.wsConfig = properties.getWebsocket();
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
// 初始化序列化器
@@ -94,9 +83,7 @@ public class IotWebSocketProtocol implements IotProtocol {
IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
this.serializer = serializerManager.get(serializeType);
// 初始化基础依赖
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.messageService = SpringUtil.getBean(IotDeviceMessageService.class);
// 初始化连接管理器
this.connectionManager = new IotWebSocketConnectionManager();
// 初始化下行消息订阅者
@@ -127,6 +114,7 @@ public class IotWebSocketProtocol implements IotProtocol {
this.vertx = Vertx.vertx();
// 1.2 创建服务器选项
IotWebSocketConfig wsConfig = properties.getWebsocket();
HttpServerOptions options = new HttpServerOptions()
.setPort(properties.getPort())
.setIdleTimeout(wsConfig.getIdleTimeoutSeconds())
@@ -150,8 +138,7 @@ public class IotWebSocketProtocol implements IotProtocol {
return;
}
// 创建上行处理器
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(this,
messageService, deviceService, connectionManager, serializer);
IotWebSocketUpstreamHandler handler = new IotWebSocketUpstreamHandler(serverId, serializer, connectionManager);
handler.handle(socket);
});

View File

@@ -2,7 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.downstrea
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import lombok.RequiredArgsConstructor;
@@ -37,23 +36,17 @@ public class IotWebSocketDownstreamHandler {
return;
}
// 2. 编码消息并发送到设备
// 2. 序列化
byte[] bytes = serializer.serialize(message);
// TODO @AI参考别的模块的做法直接发类似 tcp 这种;
boolean success;
if (serializer.getType() == IotSerializeTypeEnum.BINARY) {
success = connectionManager.sendToDevice(message.getDeviceId(), bytes);
} else {
String jsonMessage = StrUtil.utf8Str(bytes);
success = connectionManager.sendToDevice(message.getDeviceId(), jsonMessage);
}
if (success) {
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} else {
log.error("[handle][下行消息发送失败,设备 ID: {},方法: {},消息 ID: {}]",
message.getDeviceId(), message.getMethod(), message.getId());
String bytesContent = StrUtil.utf8Str(bytes);
// 3. 发送到设备
boolean success = connectionManager.sendToDevice(connectionInfo.getDeviceId(), bytesContent);
if (!success) {
throw new RuntimeException("下行消息发送失败");
}
log.info("[handle][下行消息发送成功,设备 ID: {},方法: {},消息 ID: {},数据长度: {} 字节]",
message.getDeviceId(), message.getMethod(), message.getId(), bytes.length);
} catch (Exception e) {
log.error("[handle][处理下行消息失败,设备 ID: {},方法: {},消息内容: {}]",
message.getDeviceId(), message.getMethod(), message, e);

View File

@@ -1,34 +1,33 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.handler.upstream;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializer;
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
/**
@@ -52,256 +51,175 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
*/
private final IotWebSocketConnectionManager connectionManager;
// TODO @AI是不是可以去掉
private final boolean binaryPayload;
private final IotDeviceMessageService deviceMessageService;
private final IotDeviceService deviceService;
private final IotDeviceCommonApi deviceApi;
// TODO @AI参数、顺序参考 IotTcpUpstreamHandler
public IotWebSocketUpstreamHandler(IotWebSocketProtocol protocol,
IotDeviceMessageService deviceMessageService,
IotDeviceService deviceService,
IotWebSocketConnectionManager connectionManager,
IotMessageSerializer serializer) {
this.deviceMessageService = deviceMessageService;
this.deviceService = deviceService;
this.connectionManager = connectionManager;
public IotWebSocketUpstreamHandler(String serverId,
IotMessageSerializer serializer,
IotWebSocketConnectionManager connectionManager) {
this.serverId = serverId;
this.serializer = serializer;
this.binaryPayload = serializer.getType() == IotSerializeTypeEnum.BINARY;
this.connectionManager = connectionManager;
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
this.deviceService = SpringUtil.getBean(IotDeviceService.class);
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
this.serverId = protocol.getServerId();
// TODO @AI通过 springutildeviceService、deviceMessageService
}
@Override
@SuppressWarnings("DuplicatedCode")
public void handle(ServerWebSocket socket) {
String clientId = IdUtil.simpleUUID();
log.debug("[handle][设备连接,客户端 ID: {}地址: {}]", clientId, socket.remoteAddress());
String remoteAddress = String.valueOf(socket.remoteAddress());
log.debug("[handle][设备连接,地址: {}]", remoteAddress);
// 1. 设置异常和关闭处理器
// TODO @AIclientId 去掉;
socket.exceptionHandler(ex -> {
log.warn("[handle][连接异常,客户端 ID: {}地址: {}]", clientId, socket.remoteAddress());
log.warn("[handle][连接异常,地址: {}]", remoteAddress, ex);
socket.close();
});
socket.closeHandler(v -> {
log.debug("[handle][连接关闭,客户端 ID: {}地址: {}]", clientId, socket.remoteAddress());
log.debug("[handle][连接关闭,地址: {}]", remoteAddress);
cleanupConnection(socket);
});
// 2. 设置消息处理器(JSON 使用文本BINARY 使用二进制
// TODO @AI是不是 text、binary 保持统一?用一个 mesagehandler
if (binaryPayload) {
socket.binaryMessageHandler(buffer -> {
try {
processMessage(clientId, buffer.getBytes(), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
socket.textMessageHandler(message -> {
log.warn("[handle][收到文本帧但当前序列化为 BINARY断开连接客户端 ID: {},地址: {}]",
clientId, socket.remoteAddress());
cleanupConnection(socket);
// 2. 设置消息处理器(仅支持文本帧
socket.textMessageHandler(message -> {
try {
processMessage(StrUtil.utf8Bytes(message), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,地址: {},错误: {}]", remoteAddress, e.getMessage());
socket.close();
});
} else {
socket.textMessageHandler(message -> {
try {
processMessage(clientId, StrUtil.utf8Bytes(message), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
// TODO @AI是不是不用 cleanupConnectionclosehandler 本身就吹了了;
cleanupConnection(socket);
socket.close();
}
});
socket.binaryMessageHandler(buffer -> {
try {
processMessage(clientId, buffer.getBytes(), socket);
} catch (Exception e) {
log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
clientId, socket.remoteAddress(), e.getMessage());
cleanupConnection(socket);
socket.close();
}
});
}
}
});
}
/**
* 处理消息
*
* @param clientId 客户端 ID
* @param payload 消息负载
* @param socket WebSocket 连接
* @throws Exception 消息解码失败时抛出异常
*/
private void processMessage(String clientId, byte[] payload, ServerWebSocket socket) throws Exception {
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
// 1.2 解码消息
IotDeviceMessage deviceMessage;
private void processMessage(byte[] payload, ServerWebSocket socket) {
IotDeviceMessage message = null;
try {
deviceMessage = serializer.deserialize(payload);
if (deviceMessage == null) {
throw new Exception("解码后消息为空");
// 1.1 基础检查
if (ArrayUtil.isEmpty(payload)) {
return;
}
} catch (Exception e) {
throw new Exception("消息解码失败: " + e.getMessage(), e);
}
// 1.2 解码消息
message = serializer.deserialize(payload);
Assert.notNull(message, "消息反序列化失败");
Assert.hasText(message.getMethod(), "method 不能为空");
// 2. 根据消息类型路由处理
try {
if (AUTH_METHOD.equals(deviceMessage.getMethod())) {
// 认证请求
handleAuthenticationRequest(clientId, deviceMessage, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(deviceMessage.getMethod())) {
// 设备动态注册请求
handleRegisterRequest(clientId, deviceMessage, socket);
// 2. 根据消息类型路由处理
if (AUTH_METHOD.equals(message.getMethod())) {
handleAuthenticationRequest(message, socket);
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
handleRegisterRequest(message, socket);
} else {
// 业务消息
handleBusinessRequest(clientId, deviceMessage, socket);
handleBusinessRequest(message, socket);
}
} catch (ServiceException e) {
log.warn("[processMessage][业务异常,错误: {}]", e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage());
} catch (IllegalArgumentException e) {
log.warn("[processMessage][参数校验失败,错误: {}]", e.getMessage());
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, BAD_REQUEST.getCode(), e.getMessage());
} catch (Exception e) {
// TODO @AI参考 IotTcpUpstreamHandler 处理;业务、参数、其它
log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]",
clientId, deviceMessage.getMethod(), e);
// 发送错误响应,避免客户端一直等待
try {
sendErrorResponse(socket, deviceMessage.getRequestId(), "消息处理失败");
} catch (Exception responseEx) {
log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx);
}
log.error("[processMessage][处理消息失败]", e);
String requestId = message != null ? message.getRequestId() : null;
String method = message != null ? message.getMethod() : null;
sendErrorResponse(socket, requestId, method, INTERNAL_SERVER_ERROR.getCode(),
INTERNAL_SERVER_ERROR.getMsg());
throw e;
}
}
/**
* 处理认证请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
*/
private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1.1 解析认证参数
// TODO @AI参数解析参考 tcp 对应的 handleAuthenticationRequest
IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams());
if (authParams == null) {
log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "认证参数不完整");
return;
}
// 1.2 执行认证
if (!validateDeviceAuth(authParams)) {
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {}username: {}]",
clientId, authParams.getUsername());
sendErrorResponse(socket, message.getRequestId(), "认证失败");
return;
}
@SuppressWarnings("DuplicatedCode")
private void handleAuthenticationRequest(IotDeviceMessage message, ServerWebSocket socket) {
// 1. 解析认证参数
IotDeviceAuthReqDTO authParams = JsonUtils.convertObject(message.getParams(), IotDeviceAuthReqDTO.class);
Assert.notNull(authParams, "认证参数不能为空");
Assert.hasText(authParams.getUsername(), "username 不能为空");
Assert.hasText(authParams.getPassword(), "password 不能为空");
// 2.1 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
if (deviceInfo == null) {
sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败");
return;
}
// 2.2 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(),
deviceInfo.getDeviceName());
if (device == null) {
sendErrorResponse(socket, message.getRequestId(), "设备不存在");
return;
}
// 3.1 注册连接
registerConnection(socket, device, clientId);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
sendSuccessResponse(socket, message.getRequestId(), "认证成功");
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]",
device.getId(), device.getDeviceName());
} catch (Exception e) {
log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "认证处理异常");
// 2.1 执行认证
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
authResult.checkError();
if (BooleanUtil.isFalse(authResult.getData())) {
throw exception(DEVICE_AUTH_FAIL);
}
// 2.2 解析设备信息
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
Assert.notNull(deviceInfo, "解析设备信息失败");
// 2.3 获取设备信息
IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notNull(device, "设备不存在");
// 3.1 注册连接
registerConnection(socket, device);
// 3.2 发送上线消息
sendOnlineMessage(device);
// 3.3 发送成功响应
sendSuccessResponse(socket, message.getRequestId(), AUTH_METHOD, "认证成功");
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]", device.getId(), device.getDeviceName());
}
/**
* 处理设备动态注册请求(一型一密,不需要认证)
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
*/
private void handleRegisterRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
// TODO @AI参数解析参考 tcp 对应的 handleRegisterRequest
try {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = parseRegisterParams(message.getParams());
if (params == null
|| StrUtil.hasEmpty(params.getProductKey(), params.getDeviceName(), params.getProductSecret())) {
log.warn("[handleRegisterRequest][注册参数解析失败,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "注册参数不完整");
return;
}
@SuppressWarnings("DuplicatedCode")
private void handleRegisterRequest(IotDeviceMessage message, ServerWebSocket socket) {
// 1. 解析注册参数
IotDeviceRegisterReqDTO params = JsonUtils.convertObject(message.getParams(), IotDeviceRegisterReqDTO.class);
Assert.notNull(params, "注册参数不能为空");
Assert.hasText(params.getProductKey(), "productKey 不能为空");
Assert.hasText(params.getDeviceName(), "deviceName 不能为空");
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
if (result.isError()) {
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
sendErrorResponse(socket, message.getRequestId(), result.getMsg());
return;
}
// 2. 调用动态注册
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
result.checkError();
// 3. 发送成功响应(包含 deviceSecret
sendRegisterSuccessResponse(socket, message.getRequestId(), result.getData());
log.info("[handleRegisterRequest][注册成功,客户端 ID: {},设备名: {}]",
clientId, params.getDeviceName());
} catch (Exception e) {
log.error("[handleRegisterRequest][注册处理异常,客户端 ID: {}]", clientId, e);
sendErrorResponse(socket, message.getRequestId(), "注册处理异常");
}
// 3. 发送成功响应(包含 deviceSecret
sendSuccessResponse(socket, message.getRequestId(),
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getData());
log.info("[handleRegisterRequest][注册成功,设备名: {}]", params.getDeviceName());
}
// TODO @AI参考对应的 tcp 的 handleBusinessRequest
/**
* 处理业务请求
*
* @param clientId 客户端 ID
* @param message 消息信息
* @param socket WebSocket 连接
*/
private void handleBusinessRequest(String clientId, IotDeviceMessage message, ServerWebSocket socket) {
try {
// 1. 获取认证信息并处理业务消息
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo == null) {
log.warn("[handleBusinessRequest][连接未认证,拒绝处理业务消息,客户端 ID: {}]", clientId);
sendErrorResponse(socket, message.getRequestId(), "连接未认证");
return;
}
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}",
clientId, message.toString());
} catch (Exception e) {
log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e);
private void handleBusinessRequest(IotDeviceMessage message, ServerWebSocket socket) {
// 1. 获取认证信息并处理业务消息
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
if (connectionInfo == null) {
log.warn("[handleBusinessRequest][连接未认证,拒绝处理业务消息]");
sendErrorResponse(socket, message.getRequestId(), message.getMethod(),
UNAUTHORIZED.getCode(), "设备未认证,无法处理业务消息");
return;
}
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(),
connectionInfo.getDeviceName(), serverId);
log.info("[handleBusinessRequest][发送消息到消息总线,消息: {}]", message);
}
/**
@@ -309,9 +227,8 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
*
* @param socket WebSocket 连接
* @param device 设备
* @param clientId 客户端 ID
*/
private void registerConnection(ServerWebSocket socket, IotDeviceRespDTO device, String clientId) {
private void registerConnection(ServerWebSocket socket, IotDeviceRespDTO device) {
IotWebSocketConnectionManager.ConnectionInfo connectionInfo = new IotWebSocketConnectionManager.ConnectionInfo()
.setDeviceId(device.getId())
.setProductKey(device.getProductKey())
@@ -362,152 +279,18 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
* 发送响应消息
*
* @param socket WebSocket 连接
* @param success 是否成功
* @param message 消息
* @param requestId 请求 ID
* @param method 请求方法
* @param data 响应数据
*/
private void sendResponse(ServerWebSocket socket, boolean success, String message, String requestId) {
try {
Object responseData = MapUtil.builder()
.put("success", success)
.put("message", message)
.build();
int code = success ? 0 : 401;
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, code, message);
writeResponse(socket, responseMessage);
} catch (Exception e) {
log.error("[sendResponse][发送响应失败requestId: {}]", requestId, e);
}
private void sendSuccessResponse(ServerWebSocket socket, String requestId, String method, Object data) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, data, SUCCESS.getCode(), null);
writeResponse(socket, responseMessage);
}
/**
* 验证设备认证信息
*
* @param authParams 认证参数
* @return 是否认证成功
*/
private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) {
try {
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(authParams.getClientId()).setUsername(authParams.getUsername())
.setPassword(authParams.getPassword()));
result.checkError();
return BooleanUtil.isTrue(result.getData());
} catch (Exception e) {
log.error("[validateDeviceAuth][设备认证异常username: {}]", authParams.getUsername(), e);
return false;
}
}
/**
* 发送错误响应
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param errorMessage 错误消息
*/
private void sendErrorResponse(ServerWebSocket socket, String requestId, String errorMessage) {
sendResponse(socket, false, errorMessage, requestId);
}
/**
* 发送成功响应
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param message 消息
*/
@SuppressWarnings("SameParameterValue")
private void sendSuccessResponse(ServerWebSocket socket, String requestId, String message) {
sendResponse(socket, true, message, requestId);
}
/**
* 解析认证参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 认证参数 DTO解析失败时返回 null
*/
@SuppressWarnings({"unchecked", "DuplicatedCode"})
private IotDeviceAuthReqDTO parseAuthParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceAuthReqDTO()
.setClientId(MapUtil.getStr(paramMap, "clientId"))
.setUsername(MapUtil.getStr(paramMap, "username"))
.setPassword(MapUtil.getStr(paramMap, "password"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceAuthReqDTO) {
return (IotDeviceAuthReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceAuthReqDTO.class);
} catch (Exception e) {
log.error("[parseAuthParams][解析认证参数({})失败]", params, e);
return null;
}
}
/**
* 解析注册参数
*
* @param params 参数对象(通常为 Map 类型)
* @return 注册参数 DTO解析失败时返回 null
*/
@SuppressWarnings("unchecked")
private IotDeviceRegisterReqDTO parseRegisterParams(Object params) {
if (params == null) {
return null;
}
try {
// 参数默认为 Map 类型,直接转换
if (params instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) params;
return new IotDeviceRegisterReqDTO()
.setProductKey(MapUtil.getStr(paramMap, "productKey"))
.setDeviceName(MapUtil.getStr(paramMap, "deviceName"))
.setProductSecret(MapUtil.getStr(paramMap, "productSecret"));
}
// 如果已经是目标类型,直接返回
if (params instanceof IotDeviceRegisterReqDTO) {
return (IotDeviceRegisterReqDTO) params;
}
// 其他情况尝试 JSON 转换
return JsonUtils.convertObject(params, IotDeviceRegisterReqDTO.class);
} catch (Exception e) {
log.error("[parseRegisterParams][解析注册参数({})失败]", params, e);
return null;
}
}
/**
* 发送注册成功响应(包含 deviceSecret
*
* @param socket WebSocket 连接
* @param requestId 请求 ID
* @param registerResp 注册响应
*/
private void sendRegisterSuccessResponse(ServerWebSocket socket, String requestId,
IotDeviceRegisterRespDTO registerResp) {
try {
// 1. 构建响应消息(参考 HTTP 返回格式,直接返回 IotDeviceRegisterRespDTO
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId,
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), registerResp, 0, null);
// 2. 发送响应
writeResponse(socket, responseMessage);
} catch (Exception e) {
log.error("[sendRegisterSuccessResponse][发送注册成功响应失败requestId: {}]", requestId, e);
}
private void sendErrorResponse(ServerWebSocket socket, String requestId, String method, Integer code, String msg) {
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, method, null, code, msg);
writeResponse(socket, responseMessage);
}
/**
@@ -515,11 +298,7 @@ public class IotWebSocketUpstreamHandler implements Handler<ServerWebSocket> {
*/
private void writeResponse(ServerWebSocket socket, IotDeviceMessage responseMessage) {
byte[] payload = serializer.serialize(responseMessage);
if (binaryPayload) {
socket.writeBinaryMessage(Buffer.buffer(payload));
} else {
socket.writeTextMessage(StrUtil.utf8Str(payload));
}
socket.writeTextMessage(StrUtil.utf8Str(payload));
}
}

View File

@@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import lombok.Data;
import lombok.experimental.Accessors;
@@ -115,33 +114,6 @@ public class IotWebSocketConnectionManager {
}
}
// TODO @AI没必要这里加一个
/**
* 发送消息到设备(二进制消息)
*
* @param deviceId 设备 ID
* @param payload 二进制消息
* @return 是否发送成功
*/
public boolean sendToDevice(Long deviceId, byte[] payload) {
ServerWebSocket socket = deviceSocketMap.get(deviceId);
if (socket == null) {
log.warn("[sendToDevice][设备未连接,设备 ID: {}]", deviceId);
return false;
}
try {
socket.writeBinaryMessage(Buffer.buffer(payload));
log.debug("[sendToDevice][发送消息成功,设备 ID: {},数据长度: {} 字节]", deviceId, payload.length);
return true;
} catch (Exception e) {
log.error("[sendToDevice][发送消息失败,设备 ID: {}]", deviceId, e);
// 发送失败时清理连接
unregisterConnection(socket);
return false;
}
}
/**
* 连接信息(包含认证信息)
*/