Jelajahi Sumber

【功能完善】IoT: 添加 Webhook 处理器以处理设备连接和断开事件,更新设备状态管理逻辑

安浩浩 5 bulan lalu
induk
melakukan
824a801b39

+ 4 - 0
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
 import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler;
+import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceWebhookVertxHandler;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
@@ -70,6 +71,9 @@ public class IotDeviceUpstreamServer {
                 // TODO @haohao:疑问,mqtt 的认证,需要通过 http 呀?
                 // 回复:MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式
                 .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
+        // 添加 Webhook 处理器,用于处理设备连接和断开连接事件
+        router.post(IotDeviceWebhookVertxHandler.PATH)
+                .handler(new IotDeviceWebhookVertxHandler(deviceUpstreamApi));
         // 创建 HttpServer 实例
         this.server = vertx.createHttpServer().requestHandler(router);
         this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);

+ 154 - 0
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceWebhookVertxHandler.java

@@ -0,0 +1,154 @@
+package cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router;
+
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
+import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
+import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+
+/**
+ * IoT Emqx Webhook 事件处理的 Vert.x Handler
+ * <a href=
+ * "https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQX
+ * Webhook</a>
+ *
+ * @author haohao
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class IotDeviceWebhookVertxHandler implements Handler<RoutingContext> {
+
+    public static final String PATH = "/mqtt/webhook";
+
+    private final IotDeviceUpstreamApi deviceUpstreamApi;
+
+    @Override
+    public void handle(RoutingContext routingContext) {
+        try {
+            // 解析请求体
+            JsonObject json = routingContext.body().asJsonObject();
+            String event = json.getString("event");
+            String clientId = json.getString("clientid");
+            String username = json.getString("username");
+
+            // 处理不同的事件类型
+            switch (event) {
+                case "client.connected":
+                    handleClientConnected(clientId, username);
+                    break;
+                case "client.disconnected":
+                    handleClientDisconnected(clientId, username);
+                    break;
+                default:
+                    log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId,
+                            username);
+                    break;
+            }
+
+            // 返回成功响应
+            IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "success"));
+        } catch (Exception e) {
+            log.error("[handle][处理 Webhook 事件异常]", e);
+            IotPluginCommonUtils.writeJson(routingContext, Collections.singletonMap("result", "error"));
+        }
+    }
+
+    /**
+     * 处理客户端连接事件
+     *
+     * @param clientId 客户端ID
+     * @param username 用户名
+     */
+    private void handleClientConnected(String clientId, String username) {
+        if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+            log.warn("[handleClientConnected][客户端连接事件,但用户名为空] clientId={}", clientId);
+            return;
+        }
+
+        // 解析产品标识和设备名称
+        String[] parts = parseUsername(username);
+        if (parts == null) {
+            return;
+        }
+
+        // 更新设备状态为在线
+        IotDeviceStateUpdateReqDTO updateReqDTO = new IotDeviceStateUpdateReqDTO();
+        updateReqDTO.setProductKey(parts[1]);
+        updateReqDTO.setDeviceName(parts[0]);
+        updateReqDTO.setState(IotDeviceStateEnum.ONLINE.getState());
+        updateReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+        updateReqDTO.setReportTime(LocalDateTime.now());
+
+        CommonResult<Boolean> result = deviceUpstreamApi.updateDeviceState(updateReqDTO);
+        if (result.getCode() != 0 || !result.getData()) {
+            log.error("[handleClientConnected][更新设备状态为在线失败] clientId={}, username={}, code={}, msg={}",
+                    clientId, username, result.getCode(), result.getMsg());
+        } else {
+            log.info("[handleClientConnected][更新设备状态为在线成功] clientId={}, username={}", clientId, username);
+        }
+    }
+
+    /**
+     * 处理客户端断开连接事件
+     *
+     * @param clientId 客户端ID
+     * @param username 用户名
+     */
+    private void handleClientDisconnected(String clientId, String username) {
+        if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+            log.warn("[handleClientDisconnected][客户端断开连接事件,但用户名为空] clientId={}", clientId);
+            return;
+        }
+
+        // 解析产品标识和设备名称
+        String[] parts = parseUsername(username);
+        if (parts == null) {
+            return;
+        }
+
+        // 更新设备状态为离线
+        IotDeviceStateUpdateReqDTO offlineReqDTO = new IotDeviceStateUpdateReqDTO();
+        offlineReqDTO.setProductKey(parts[1]);
+        offlineReqDTO.setDeviceName(parts[0]);
+        offlineReqDTO.setState(IotDeviceStateEnum.OFFLINE.getState());
+        offlineReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+        offlineReqDTO.setReportTime(LocalDateTime.now());
+
+        CommonResult<Boolean> offlineResult = deviceUpstreamApi.updateDeviceState(offlineReqDTO);
+        if (offlineResult.getCode() != 0 || !offlineResult.getData()) {
+            log.error("[handleClientDisconnected][更新设备状态为离线失败] clientId={}, username={}, code={}, msg={}",
+                    clientId, username, offlineResult.getCode(), offlineResult.getMsg());
+        } else {
+            log.info("[handleClientDisconnected][更新设备状态为离线成功] clientId={}, username={}", clientId, username);
+        }
+    }
+
+    /**
+     * 解析用户名,格式为 deviceName&productKey
+     *
+     * @param username 用户名
+     * @return 解析结果,[0]为deviceName,[1]为productKey,解析失败返回null
+     */
+    private String[] parseUsername(String username) {
+        if (StrUtil.isEmpty(username)) {
+            return null;
+        }
+
+        String[] parts = username.split("&");
+        if (parts.length != 2) {
+            log.warn("[parseUsername][用户名格式不正确,无法解析产品标识和设备名称] username={}", username);
+            return null;
+        }
+
+        return parts;
+    }
+}