Selaa lähdekoodia

【代码评审】IoT:整体实现

YunaiV 5 kuukautta sitten
vanhempi
sitoutus
6639d37132

+ 2 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/rule/IotRuleSceneJob.java

@@ -1,5 +1,6 @@
 package cn.iocoder.yudao.module.iot.job.rule;
 
+import cn.hutool.core.map.MapUtil;
 import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum;
 import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService;
 import jakarta.annotation.Resource;
@@ -41,7 +42,7 @@ public class IotRuleSceneJob extends QuartzJobBean {
      * @return JobData Map
      */
     public static Map<String, Object> buildJobDataMap(Long ruleSceneId) {
-        return Map.of(JOB_DATA_KEY_RULE_SCENE_ID, ruleSceneId);
+        return MapUtil.of(JOB_DATA_KEY_RULE_SCENE_ID, ruleSceneId);
     }
 
     /**

+ 2 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceGroupService.java

@@ -1,6 +1,7 @@
 package cn.iocoder.yudao.module.iot.service.device;
 
 import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.collection.ListUtil;
 import cn.iocoder.yudao.framework.common.pojo.PageResult;
 import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupPageReqVO;
 import cn.iocoder.yudao.module.iot.controller.admin.device.vo.group.IotDeviceGroupSaveReqVO;
@@ -48,7 +49,7 @@ public interface IotDeviceGroupService {
      */
     default List<IotDeviceGroupDO> validateDeviceGroupExists(Collection<Long> ids) {
         if (CollUtil.isEmpty(ids)) {
-            return List.of();
+            return ListUtil.empty();
         }
         return convertList(ids, this::validateDeviceGroupExists);
     }

+ 3 - 2
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDeviceLogServiceImpl.java

@@ -1,6 +1,7 @@
 package cn.iocoder.yudao.module.iot.service.device.data;
 
 import cn.hutool.core.date.LocalDateTimeUtil;
+import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.IdUtil;
 import cn.hutool.core.util.StrUtil;
 import cn.iocoder.yudao.framework.common.pojo.PageResult;
@@ -87,7 +88,7 @@ public class IotDeviceLogServiceImpl implements IotDeviceLogService {
                     Long timeMillis = timestamp.getTime();
                     // 消息数量转换
                     Integer count = ((Number) map.get("data")).intValue();
-                    return Map.of(timeMillis, count);
+                    return MapUtil.of(timeMillis, count);
                 })
                 .collect(Collectors.toList());
     }
@@ -103,7 +104,7 @@ public class IotDeviceLogServiceImpl implements IotDeviceLogService {
                     Long timeMillis = timestamp.getTime();
                     // 消息数量转换
                     Integer count = ((Number) map.get("data")).intValue();
-                    return Map.of(timeMillis, count);
+                    return MapUtil.of(timeMillis, count);
                 })
                 .collect(Collectors.toList());
     }

+ 30 - 25
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -85,11 +85,12 @@ public class IotDeviceUpstreamServer {
         }
         log.info("[start][开始启动服务]");
 
+        // TODO @haohao:建议先启动 MQTT Broker,再启动 HTTP Server。类似 jdbc 先连接了,在启动 tomcat 的味道
         // 1. 启动 HTTP 服务器
         CompletableFuture<Void> httpFuture = server.listen(emqxProperties.getAuthPort())
                 .toCompletionStage()
                 .toCompletableFuture()
-                .thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort()));
+                .thenAccept(v -> log.info("[start][HTTP 服务器启动完成,端口: {}]", server.actualPort()));
 
         // 2. 连接 MQTT Broker
         CompletableFuture<Void> mqttFuture = connectMqtt()
@@ -98,16 +99,16 @@ public class IotDeviceUpstreamServer {
                 .thenAccept(v -> {
                     // 2.1 添加 MQTT 断开重连监听器
                     client.closeHandler(closeEvent -> {
-                        log.warn("[closeHandler][MQTT连接已断开,准备重连]");
+                        log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
                         reconnectWithDelay();
                     });
-                    // 2. 设置 MQTT 消息处理器
+                    // 2.2 设置 MQTT 消息处理器
                     setupMessageHandler();
                 });
 
         // 3. 等待所有服务启动完成
         CompletableFuture.allOf(httpFuture, mqttFuture)
-                .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) // TODO @芋艿:JDK8 不兼容
                 .whenComplete((result, error) -> {
                     if (error != null) {
                         log.error("[start][服务启动失败]", error);
@@ -123,7 +124,7 @@ public class IotDeviceUpstreamServer {
      */
     private void setupMessageHandler() {
         client.publishHandler(mqttMessageHandler::handle);
-        log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
+        log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
     }
 
     /**
@@ -203,26 +204,30 @@ public class IotDeviceUpstreamServer {
         isRunning = false;
 
         try {
-            CompletableFuture<Void> serverFuture = server != null
-                    ? server.close().toCompletionStage().toCompletableFuture()
-                    : CompletableFuture.completedFuture(null);
-            CompletableFuture<Void> clientFuture = client != null
-                    ? client.disconnect().toCompletionStage().toCompletableFuture()
-                    : CompletableFuture.completedFuture(null);
-            CompletableFuture<Void> vertxFuture = vertx != null
-                    ? vertx.close().toCompletionStage().toCompletableFuture()
-                    : CompletableFuture.completedFuture(null);
-
-            // 等待所有资源关闭
-            CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
-                    .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
-                    .whenComplete((result, error) -> {
-                        if (error != null) {
-                            log.error("[stop][服务关闭过程中发生异常]", error);
-                        } else {
-                            log.info("[stop][所有服务关闭完成]");
-                        }
-                    });
+            // 关闭 HTTP 服务器
+            if (server != null) {
+                server.close()
+                        .toCompletionStage()
+                        .toCompletableFuture()
+                        .join();
+            }
+
+            // 关闭 MQTT 客户端
+            if (client != null) {
+                client.disconnect()
+                       .toCompletionStage()
+                       .toCompletableFuture()
+                       .join();
+            }
+
+            // 关闭 Vertx 实例
+            if (vertx!= null) {
+                vertx.close()
+                      .toCompletionStage()
+                      .toCompletableFuture()
+                      .join();
+            }
+            log.info("[stop][关闭完成]");
         } catch (Exception e) {
             log.error("[stop][关闭服务异常]", e);
             throw new RuntimeException("关闭 IoT 设备上行服务失败", e);