Forráskód Böngészése

【功能完善】IoT: 更新 MQTT 客户端逻辑,重构消息处理和重连机制,优化配置文件

安浩浩 5 hónapja
szülő
commit
4015e7905f

+ 87 - 48
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -1,6 +1,6 @@
 package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
 
-import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.IdUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
@@ -19,7 +19,6 @@ import io.vertx.mqtt.MqttClientOptions;
 import lombok.extern.slf4j.Slf4j;
 
 import java.time.LocalDateTime;
-import java.util.UUID;
 
 /**
  * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
@@ -31,6 +30,10 @@ import java.util.UUID;
 @Slf4j
 public class IotDeviceUpstreamServer {
 
+    private static final String PROPERTY_POST_TOPIC = "/event/property/post";
+    private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒)
+    private static final int QOS_LEVEL = 1;
+
     private final Vertx vertx;
     private final HttpServer server;
     private final MqttClient client;
@@ -55,7 +58,7 @@ public class IotDeviceUpstreamServer {
 
         // 创建 MQTT 客户端
         MqttClientOptions options = new MqttClientOptions()
-                .setClientId("yudao-iot-server-" + UUID.randomUUID())
+                .setClientId("yudao-iot-server-" + IdUtil.fastSimpleUUID())
                 .setUsername(emqxProperties.getMqttUsername())
                 .setPassword(emqxProperties.getMqttPassword())
                 .setSsl(emqxProperties.isMqttSsl());
@@ -80,78 +83,114 @@ public class IotDeviceUpstreamServer {
         // 3. 添加 MQTT 断开重连监听器
         client.closeHandler(v -> {
             log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
-            // 等待 5 秒后重连,避免频繁重连
-            vertx.setTimer(5000, id -> {
-                log.info("[closeHandler][开始重新连接 MQTT]");
-                connectMqtt();
-            });
+            reconnectWithDelay();
         });
 
         // 4. 设置 MQTT 消息处理器
+        setupMessageHandler();
+    }
+
+    /**
+     * 设置 MQTT 消息处理器
+     */
+    private void setupMessageHandler() {
         client.publishHandler(message -> {
             String topic = message.topicName();
             String payload = message.payload().toString();
             log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
 
             try {
-                // 4.1 处理设备属性上报消息: /{productKey}/{deviceName}/event/property/post
-                if (topic.contains("/event/property/post")) {
-                    // 4.2 解析消息内容
-                    JSONObject jsonObject = JSONUtil.parseObj(payload);
-                    String requestId = jsonObject.getStr("id");
-                    Long timestamp = jsonObject.getLong("timestamp");
-
-                    // 4.3 从 topic 中解析设备标识
-                    String[] topicParts = topic.split("/");
-                    String productKey = topicParts[1];
-                    String deviceName = topicParts[2];
-
-                    // 4.4 构建设备属性上报请求对象
-                    IotDevicePropertyReportReqDTO devicePropertyReportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
-                            .setRequestId(requestId)
-                            .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
-                            .setProductKey(productKey).setDeviceName(deviceName))
-                            .setProperties(jsonObject.getJSONObject("params"));
-
-                    // 4.5 调用上游 API 处理设备上报数据
-                    deviceUpstreamApi.reportDeviceProperty(devicePropertyReportReqDTO);
-                    log.info("[messageHandler][处理设备上行消息成功][topic: {}][devicePropertyReportReqDTO: {}]",
-                            topic, JSONUtil.toJsonStr(devicePropertyReportReqDTO));
-                }
+                handleMessage(topic, payload);
             } catch (Exception e) {
                 log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
             }
         });
     }
 
+    /**
+     * 处理 MQTT 消息
+     */
+    private void handleMessage(String topic, String payload) {
+        // 处理设备属性上报消息
+        if (topic.contains(PROPERTY_POST_TOPIC)) {
+            handlePropertyPost(topic, payload);
+        }
+    }
+
+    /**
+     * 处理设备属性上报
+     */
+    private void handlePropertyPost(String topic, String payload) {
+        // 解析消息内容
+        JSONObject jsonObject = JSONUtil.parseObj(payload);
+        String[] topicParts = topic.split("/");
+
+        // 构建设备属性上报请求对象
+        IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(
+                jsonObject,
+                topicParts[1], // productKey
+                topicParts[2] // deviceName
+        );
+
+        // 调用上游 API 处理设备上报数据
+        deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
+        log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
+                topic, JSONUtil.toJsonStr(reportReqDTO));
+    }
+
+    /**
+     * 构建设备属性上报请求对象
+     */
+    private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject,
+                                                                 String productKey,
+                                                                 String deviceName) {
+        return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
+                .setRequestId(jsonObject.getStr("id"))
+                .setProcessId(IotPluginCommonUtils.getProcessId())
+                .setReportTime(LocalDateTime.now())
+                .setProductKey(productKey)
+                .setDeviceName(deviceName))
+                .setProperties(jsonObject.getJSONObject("params"));
+    }
+
+    /**
+     * 重连 MQTT 客户端
+     */
+    private void reconnectWithDelay() {
+        vertx.setTimer(RECONNECT_DELAY, id -> {
+            log.info("[reconnectWithDelay][开始重新连接 MQTT]");
+            connectMqtt();
+        });
+    }
+
     /**
      * 连接 MQTT Broker 并订阅主题
      */
     private void connectMqtt() {
-        // 连接 MQTT Broker
         client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
                 .onSuccess(connAck -> {
                     log.info("[connectMqtt][MQTT客户端连接成功]");
-                    // 连接成功后订阅主题
-                    String mqttTopics = emqxProperties.getMqttTopics();
-                    String[] topics = mqttTopics.split(",");
-                    for (String topic : topics) {
-                        client.subscribe(topic, 1)
-                                .onSuccess(v -> log.info("[connectMqtt][成功订阅主题: {}]", topic))
-                                .onFailure(err -> log.error("[connectMqtt][订阅主题失败: {}]", topic, err));
-                    }
-                    log.info("[connectMqtt][开始订阅设备上行消息主题]");
+                    subscribeToTopics();
                 })
                 .onFailure(err -> {
                     log.error("[connectMqtt][连接 MQTT Broker 失败]", err);
-                    // 连接失败后,等待 5 秒重试
-                    vertx.setTimer(5000, id -> {
-                        log.info("[connectMqtt][准备重新连接 MQTT]");
-                        connectMqtt();
-                    });
+                    reconnectWithDelay();
                 });
     }
 
+    /**
+     * 订阅设备上行消息主题
+     */
+    private void subscribeToTopics() {
+        String[] topics = emqxProperties.getMqttTopics().split(",");
+        for (String topic : topics) {
+            client.subscribe(topic, QOS_LEVEL)
+                    .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic))
+                    .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err));
+        }
+        log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
+    }
+
     /**
      * 停止所有
      */
@@ -187,4 +226,4 @@ public class IotDeviceUpstreamServer {
             throw new RuntimeException(e);
         }
     }
-}
+}

+ 1 - 1
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml

@@ -14,6 +14,6 @@ yudao:
         mqtt-port: 1883
         mqtt-ssl: false
         mqtt-username: yudao
-        mqtt-password: yudao
+        mqtt-password: 123456
         mqtt-topics: "/+/#"
         auth-port: 8101

+ 0 - 17
yudao-server/src/main/resources/application-local.yaml

@@ -267,23 +267,6 @@ justauth:
     timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟
 
 --- #################### iot相关配置 TODO 芋艿:再瞅瞅 ####################
-iot:
-  emq:
-    # 账号
-    username: haohao
-    # 密码
-    password: ahh@123456
-    # 主机地址
-    hostUrl: tcp://chaojiniu.top:1883
-    # 客户端Id,不能相同,采用随机数 ${random.value}
-    client-id: ${random.int}
-    # 默认主题
-    default-topic: test
-    # 保持连接
-    keepalive: 60
-    # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
-    clearSession: true
-
 pf4j:
 #  pluginsDir: /tmp/
   pluginsDir: ../plugins