Quellcode durchsuchen

【功能完善】IoT: 更新 EMQX 插件配置,添加 MQTT 连接参数,重构相关逻辑

安浩浩 vor 5 Monaten
Ursprung
Commit
53697b55c2
11 geänderte Dateien mit 161 neuen und 92 gelöschten Zeilen
  1. 1 1
      yudao-dependencies/pom.xml
  2. 1 1
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java
  3. 0 12
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java
  4. 1 1
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java
  5. 9 11
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java
  6. 17 52
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java
  7. 4 0
      yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml
  8. 12 5
      yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java
  9. 0 2
      yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
  10. 110 3
      yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java
  11. 6 4
      yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml

+ 1 - 1
yudao-dependencies/pom.xml

@@ -67,7 +67,7 @@
         <netty.version>4.1.116.Final</netty.version>
         <mqtt.version>1.2.5</mqtt.version>
         <pf4j-spring.version>0.9.0</pf4j-spring.version>
-        <vertx.version>4.5.11</vertx.version>
+        <vertx.version>4.5.13</vertx.version>
         <!-- 三方云服务相关 -->
         <commons-io.version>2.17.0</commons-io.version>
         <commons-compress.version>1.27.1</commons-compress.version>

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java

@@ -62,7 +62,7 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
 
     @Override
     public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
-        Boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);
+        boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);
         return success(result);
     }
 

+ 0 - 12
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDeviceRespVO.java

@@ -79,18 +79,6 @@ public class IotDeviceRespVO {
     @ExcelProperty("设备密钥")
     private String deviceSecret;
 
-    @Schema(description = "MQTT 客户端 ID", example = "24602")
-    @ExcelProperty("MQTT 客户端 ID")
-    private String mqttClientId;
-
-    @Schema(description = "MQTT 用户名", example = "芋艿")
-    @ExcelProperty("MQTT 用户名")
-    private String mqttUsername;
-
-    @Schema(description = "MQTT 密码")
-    @ExcelProperty("MQTT 密码")
-    private String mqttPassword;
-
     @Schema(description = "认证类型(如一机一密、动态注册)", example = "2")
     @ExcelProperty("认证类型(如一机一密、动态注册)")
     private String authType;

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java

@@ -67,6 +67,6 @@ public interface IotDeviceUpstreamService {
      *
      * @param authReqDTO Emqx 连接认证 DTO
      */
-    Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO);
+    boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO);
 
 }

+ 9 - 11
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java

@@ -174,7 +174,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
     }
 
     private void registerDevice0(String productKey, String deviceName, Long gatewayId,
-                                 IotDeviceUpstreamAbstractReqDTO registerReqDTO) {
+            IotDeviceUpstreamAbstractReqDTO registerReqDTO) {
         // 1.1 注册设备
         IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(productKey, deviceName);
         boolean registerNew = device == null;
@@ -280,16 +280,15 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
         sendDeviceMessage(message, device);
     }
 
-    // TODO @haohao:建议返回 boolean;
     @Override
-    public Boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
+    public boolean authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
         log.info("[authenticateEmqxConnection][认证 Emqx 连接: {}]", authReqDTO);
         // 1. 校验设备是否存在
         // username 格式:${DeviceName}&${ProductKey}
         String[] usernameParts = authReqDTO.getUsername().split("&");
         if (usernameParts.length != 2) {
             log.error("[authenticateEmqxConnection][认证失败,username 格式不正确]");
-            return Boolean.FALSE;
+            return false;
         }
         String deviceName = usernameParts[0];
         String productKey = usernameParts[1];
@@ -298,19 +297,18 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
         if (device == null) {
             log.error("[authenticateEmqxConnection][设备({}/{}) 不存在]",
                     productKey, deviceName);
-            return Boolean.FALSE;
+            return false;
         }
         // 2. 校验密码
         String deviceSecret = device.getDeviceSecret();
         String clientId = authReqDTO.getClientId();
         MqttSignResult sign = MqttSignUtils.calculate(productKey, deviceName, deviceSecret, clientId);
-        // TODO @haohao:notEquals,尽量不走取反逻辑哈
-        if (!StrUtil.equals(sign.getPassword(), authReqDTO.getPassword())) {
-            log.error("[authenticateEmqxConnection][认证失败,密码不正确]");
-            return Boolean.FALSE;
+        if (StrUtil.equals(sign.getPassword(), authReqDTO.getPassword())) {
+            log.info("[authenticateEmqxConnection][认证成功]");
+            return true;
         }
-        log.info("[authenticateEmqxConnection][认证成功]");
-        return Boolean.TRUE;
+        log.error("[authenticateEmqxConnection][认证失败,密码不正确]");
+        return false;
     }
 
     private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {

+ 17 - 52
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/util/MqttSignUtils.java

@@ -1,9 +1,10 @@
 package cn.iocoder.yudao.module.iot.util;
 
+import cn.hutool.crypto.digest.HMac;
+import cn.hutool.crypto.digest.HmacAlgorithm;
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
 import java.nio.charset.StandardCharsets;
 
 /**
@@ -13,10 +14,6 @@ import java.nio.charset.StandardCharsets;
  */
 public class MqttSignUtils {
 
-    private static final String SIGN_METHOD = "hmacsha256";
-
-    // TODO @haohao:calculate 方法,可以融合么?
-
     /**
      * 计算 MQTT 连接参数
      *
@@ -26,14 +23,7 @@ public class MqttSignUtils {
      * @return 包含 clientId, username, password 的结果对象
      */
     public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret) {
-        String clientId = productKey + "." + deviceName;
-        String username = deviceName + "&" + productKey;
-        // 生成 password
-        // TODO @haohao:signContent 和 signContentBuilder 风格保持统一的实现哈
-        String signContent = String.format("clientId%sdeviceName%sdeviceSecret%sproductKey%s",
-                clientId, deviceName, deviceSecret, productKey);
-        String password = sign(signContent, deviceSecret);
-        return new MqttSignResult(clientId, username, password);
+        return calculate(productKey, deviceName, deviceSecret, productKey + "." + deviceName);
     }
 
     /**
@@ -47,56 +37,31 @@ public class MqttSignUtils {
      */
     public static MqttSignResult calculate(String productKey, String deviceName, String deviceSecret, String clientId) {
         String username = deviceName + "&" + productKey;
-        String signContentBuilder = "clientId" + clientId +
-                "deviceName" + deviceName +
-                "deviceSecret" + deviceSecret +
-                "productKey" + productKey;
+        // 构建签名内容
+        StringBuilder signContentBuilder = new StringBuilder()
+                .append("clientId").append(clientId)
+                .append("deviceName").append(deviceName)
+                .append("deviceSecret").append(deviceSecret)
+                .append("productKey").append(productKey);
 
-        String password = sign(signContentBuilder, deviceSecret);
+        // 使用 HMac 计算签名
+        byte[] key = deviceSecret.getBytes(StandardCharsets.UTF_8);
+        String signContent = signContentBuilder.toString();
+        HMac mac = new HMac(HmacAlgorithm.HmacSHA256, key);
+        String password = mac.digestHex(signContent);
 
         return new MqttSignResult(clientId, username, password);
     }
 
-    // TODO @haohao:hutool 貌似有工具类可以用哈。
-    private static String sign(String content, String key) {
-        try {
-            Mac mac = Mac.getInstance(SIGN_METHOD);
-            mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), SIGN_METHOD));
-            byte[] signData = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
-            return bytesToHex(signData);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to sign content with HmacSHA256", e);
-        }
-    }
-
-    private static String bytesToHex(byte[] bytes) {
-        StringBuilder hexString = new StringBuilder(bytes.length * 2);
-        for (byte b : bytes) {
-            String hex = Integer.toHexString(0xFF & b);
-            if (hex.length() == 1) {
-                hexString.append('0');
-            }
-            hexString.append(hex);
-        }
-        return hexString.toString();
-    }
-
     /**
      * MQTT 签名结果类
      */
     @Getter
-    // TODO @haohao:可以用 lombok 哈
+    @AllArgsConstructor
     public static class MqttSignResult {
-
         private final String clientId;
         private final String username;
         private final String password;
-
-        public MqttSignResult(String clientId, String username, String password) {
-            this.clientId = clientId;
-            this.username = username;
-            this.password = password;
-        }
-
     }
+
 }

+ 4 - 0
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/pom.xml

@@ -160,5 +160,9 @@
             <groupId>io.vertx</groupId>
             <artifactId>vertx-web</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.vertx</groupId>
+            <artifactId>vertx-mqtt</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 12 - 5
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxProperties.java

@@ -17,22 +17,29 @@ public class IotPluginEmqxProperties {
     /**
      * 服务主机
      */
-    private String host;
-
+    private String mqttHost;
     /**
      * 服务端口
      */
-    private int port;
+    private int mqttPort;
+    /**
+     * 服务用户名
+     */
+    private String mqttUsername;
 
+    /**
+     * 服务密码
+     */
+    private String mqttPassword;
     /**
      * 是否启用 SSL
      */
-    private boolean ssl;
+    private boolean mqttSsl;
 
     /**
      * 订阅的主题
      */
-    private String topics;
+    private String mqttTopics;
 
     /**
      * 认证端口

+ 0 - 2
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java

@@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamH
 /**
  * EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
  * <p>
- * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!!
- * 类似 MQTT、WebSocket、TCP 插件,是可以实现下行指令的。
  *
  * @author 芋道源码
  */

+ 110 - 3
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -1,20 +1,30 @@
 package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
 
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
 import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
 import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
 import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
+import cn.iocoder.yudao.module.iot.plugin.common.util.IotPluginCommonUtils;
 import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.BodyHandler;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.LocalDateTime;
+import java.util.UUID;
+
 /**
  * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
  * <p>
- * 协议:HTTP
+ * 协议:HTTP、MQTT
  *
  * @author haohao
  */
@@ -23,13 +33,16 @@ public class IotDeviceUpstreamServer {
 
     private final Vertx vertx;
     private final HttpServer server;
+    private final MqttClient client;
     private final IotPluginEmqxProperties emqxProperties;
+    private final IotDeviceUpstreamApi deviceUpstreamApi;
 
     public IotDeviceUpstreamServer(IotPluginCommonProperties commonProperties,
                                    IotPluginEmqxProperties emqxProperties,
                                    IotDeviceUpstreamApi deviceUpstreamApi,
                                    IotDeviceDownstreamServer deviceDownstreamServer) {
         this.emqxProperties = emqxProperties;
+        this.deviceUpstreamApi = deviceUpstreamApi;
         // 创建 Vertx 实例
         this.vertx = Vertx.vertx();
         // 创建 Router 实例
@@ -39,18 +52,104 @@ public class IotDeviceUpstreamServer {
                 .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
         // 创建 HttpServer 实例
         this.server = vertx.createHttpServer().requestHandler(router);
+
+        // 创建 MQTT 客户端
+        MqttClientOptions options = new MqttClientOptions()
+                .setClientId("yudao-iot-server-" + UUID.randomUUID())
+                .setUsername(emqxProperties.getMqttUsername())
+                .setPassword(emqxProperties.getMqttPassword())
+                .setSsl(emqxProperties.isMqttSsl());
+        client = MqttClient.create(vertx, options);
     }
 
     /**
-     * 启动 HTTP 服务器
+     * 启动 HTTP 服务器、MQTT 客户端
      */
     public void start() {
+        // 1. 启动 HTTP 服务器
         log.info("[start][开始启动]");
         server.listen(emqxProperties.getAuthPort())
                 .toCompletionStage()
                 .toCompletableFuture()
                 .join();
-        log.info("[start][启动完成,端口({})]", this.server.actualPort());
+        log.info("[start][HTTP服务器启动完成,端口({})]", this.server.actualPort());
+
+        // 2. 连接 MQTT Broker
+        connectMqtt();
+
+        // 3. 添加 MQTT 断开重连监听器
+        client.closeHandler(v -> {
+            log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
+            // 等待 5 秒后重连,避免频繁重连
+            vertx.setTimer(5000, id -> {
+                log.info("[closeHandler][开始重新连接 MQTT]");
+                connectMqtt();
+            });
+        });
+
+        // 4. 设置 MQTT 消息处理器
+        client.publishHandler(message -> {
+            String topic = message.topicName();
+            String payload = message.payload().toString();
+            log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
+
+            try {
+                // 4.1 处理设备属性上报消息: /{productKey}/{deviceName}/event/property/post
+                if (topic.contains("/event/property/post")) {
+                    // 4.2 解析消息内容
+                    JSONObject jsonObject = JSONUtil.parseObj(payload);
+                    String requestId = jsonObject.getStr("id");
+                    Long timestamp = jsonObject.getLong("timestamp");
+
+                    // 4.3 从 topic 中解析设备标识
+                    String[] topicParts = topic.split("/");
+                    String productKey = topicParts[1];
+                    String deviceName = topicParts[2];
+
+                    // 4.4 构建设备属性上报请求对象
+                    IotDevicePropertyReportReqDTO devicePropertyReportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
+                            .setRequestId(requestId)
+                            .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
+                            .setProductKey(productKey).setDeviceName(deviceName))
+                            .setProperties(jsonObject.getJSONObject("params"));
+
+                    // 4.5 调用上游 API 处理设备上报数据
+                    deviceUpstreamApi.reportDeviceProperty(devicePropertyReportReqDTO);
+                    log.info("[messageHandler][处理设备上行消息成功][topic: {}][devicePropertyReportReqDTO: {}]",
+                            topic, JSONUtil.toJsonStr(devicePropertyReportReqDTO));
+                }
+            } catch (Exception e) {
+                log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
+            }
+        });
+    }
+
+    /**
+     * 连接 MQTT Broker 并订阅主题
+     */
+    private void connectMqtt() {
+        // 连接 MQTT Broker
+        client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
+                .onSuccess(connAck -> {
+                    log.info("[connectMqtt][MQTT客户端连接成功]");
+                    // 连接成功后订阅主题
+                    String mqttTopics = emqxProperties.getMqttTopics();
+                    String[] topics = mqttTopics.split(",");
+                    for (String topic : topics) {
+                        client.subscribe(topic, 1)
+                                .onSuccess(v -> log.info("[connectMqtt][成功订阅主题: {}]", topic))
+                                .onFailure(err -> log.error("[connectMqtt][订阅主题失败: {}]", topic, err));
+                    }
+                    log.info("[connectMqtt][开始订阅设备上行消息主题]");
+                })
+                .onFailure(err -> {
+                    log.error("[connectMqtt][连接 MQTT Broker 失败]", err);
+                    // 连接失败后,等待 5 秒重试
+                    vertx.setTimer(5000, id -> {
+                        log.info("[connectMqtt][准备重新连接 MQTT]");
+                        connectMqtt();
+                    });
+                });
     }
 
     /**
@@ -67,6 +166,14 @@ public class IotDeviceUpstreamServer {
                         .join();
             }
 
+            // 关闭 MQTT 客户端
+            if (client != null) {
+                client.disconnect()
+                        .toCompletionStage()
+                        .toCompletableFuture()
+                        .join();
+            }
+
             // 关闭 Vertx 实例
             if (vertx != null) {
                 vertx.close()

+ 6 - 4
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml

@@ -10,8 +10,10 @@ yudao:
         downstream-port: 8100
         plugin-key: yudao-module-iot-plugin-emqx
       emqx:
-        host: 127.0.0.1
-        port: 1883
-        ssl: false
-        topics: "/sys/#"
+        mqtt-host: 127.0.0.1
+        mqtt-port: 1883
+        mqtt-ssl: false
+        mqtt-username: yudao
+        mqtt-password: yudao
+        mqtt-topics: "/+/#"
         auth-port: 8101