소스 검색

【功能完善】IoT: 添加 MQTT 主题非法错误码,重构设备服务调用和属性设置逻辑,优化 MQTT 消息处理流程

安浩浩 5 달 전
부모
커밋
006ef40c4b

+ 3 - 0
yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java

@@ -67,4 +67,7 @@ public interface ErrorCodeConstants {
     ErrorCode OTA_UPGRADE_RECORD_DUPLICATE = new ErrorCode(1_050_008_007, "升级记录重复");
     ErrorCode OTA_UPGRADE_RECORD_CANNOT_RETRY = new ErrorCode(1_050_008_008, "升级记录不能重试");
 
+    // ========== MQTT 通信相关 1-050-009-000 ==========
+    ErrorCode MQTT_TOPIC_ILLEGAL = new ErrorCode(1_050_009_000, "topic illegal");
+
 }

+ 28 - 5
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java

@@ -1,9 +1,14 @@
 package cn.iocoder.yudao.module.iot.plugin.emqx.config;
 
+import cn.hutool.core.util.IdUtil;
 import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
 import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
 import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer;
+import io.vertx.core.Vertx;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -13,19 +18,37 @@ import org.springframework.context.annotation.Configuration;
  *
  * @author haohao
  */
+@Slf4j
 @Configuration
 @EnableConfigurationProperties(IotPluginEmqxProperties.class)
 public class IotPluginEmqxAutoConfiguration {
 
+    @Bean
+    public Vertx vertx() {
+        return Vertx.vertx();
+    }
+
+    @Bean
+    public MqttClient mqttClient(Vertx vertx, IotPluginEmqxProperties emqxProperties) {
+        MqttClientOptions options = new MqttClientOptions()
+                .setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID())
+                .setUsername(emqxProperties.getMqttUsername())
+                .setPassword(emqxProperties.getMqttPassword())
+                .setSsl(emqxProperties.isMqttSsl());
+        return MqttClient.create(vertx, options);
+    }
+
     @Bean(initMethod = "start", destroyMethod = "stop")
     public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
-                                                        IotPluginEmqxProperties emqxProperties) {
-        return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi);
+                                                        IotPluginEmqxProperties emqxProperties,
+                                                        Vertx vertx,
+                                                        MqttClient mqttClient) {
+        return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient);
     }
 
     @Bean
-    public IotDeviceDownstreamHandler deviceDownstreamHandler() {
-        return new IotDeviceDownstreamHandlerImpl();
+    public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) {
+        return new IotDeviceDownstreamHandlerImpl(mqttClient);
     }
 
-}
+}

+ 147 - 10
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java

@@ -1,8 +1,19 @@
 package cn.iocoder.yudao.module.iot.plugin.emqx.downstream;
 
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
 import cn.iocoder.yudao.framework.common.pojo.CommonResult;
 import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
 import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.MqttClient;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+
+import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL;
 
 /**
  * EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
@@ -10,14 +21,61 @@ import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamH
  *
  * @author 芋道源码
  */
+@Slf4j
 public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
 
+    private static final String SYS_TOPIC_PREFIX = "/sys/";
+
+    // 设备服务调用 标准 JSON
+    // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
+    // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
+    private static final String SERVICE_TOPIC_PREFIX = "/thing/service/";
+
+    // 设置设备属性 标准 JSON
+    // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set
+    // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply
+    private static final String PROPERTY_SET_TOPIC = "/thing/service/property/set";
+
+    private final MqttClient mqttClient;
+
+    /**
+     * 构造函数
+     *
+     * @param mqttClient MQTT客户端
+     */
+    public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) {
+        this.mqttClient = mqttClient;
+    }
+
     @Override
-    public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
-        // 设备服务调用
-        // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
-        // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
-        return CommonResult.success(true);
+    public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) {
+        log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+
+        // 验证参数
+        if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) {
+            log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+            return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+        }
+
+        try {
+            // 构建请求主题
+            String topic = buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), reqDTO.getIdentifier());
+
+            // 生成请求ID(如果没有提供)
+            String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
+
+            // 构建请求消息
+            JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams());
+
+            // 发送消息
+            publishMessage(topic, request);
+
+            log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic);
+            return CommonResult.success(true);
+        } catch (Exception e) {
+            log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
+            return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+        }
     }
 
     @Override
@@ -26,11 +84,34 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
     }
 
     @Override
-    public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
-        // 设置设备属性 标准 JSON
-        // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set
-        // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply
-        return CommonResult.success(true);
+    public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) {
+        log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+
+        // 验证参数
+        if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) {
+            log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+            return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+        }
+
+        try {
+            // 构建请求主题
+            String topic = buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
+
+            // 生成请求ID(如果没有提供)
+            String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
+
+            // 构建请求消息
+            JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties());
+
+            // 发送消息
+            publishMessage(topic, request);
+
+            log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic);
+            return CommonResult.success(true);
+        } catch (Exception e) {
+            log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
+            return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+        }
     }
 
     @Override
@@ -43,4 +124,60 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
         return CommonResult.success(true);
     }
 
+    /**
+     * 构建服务调用主题
+     */
+    private String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
+        return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + SERVICE_TOPIC_PREFIX + serviceIdentifier;
+    }
+
+    /**
+     * 构建属性设置主题
+     */
+    private String buildPropertySetTopic(String productKey, String deviceName) {
+        return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + PROPERTY_SET_TOPIC;
+    }
+
+    /**
+     * 构建服务调用请求
+     */
+    private JSONObject buildServiceRequest(String requestId, String serviceIdentifier, Map<String, Object> params) {
+        return new JSONObject()
+                .set("id", requestId)
+                .set("version", "1.0")
+                .set("method", "thing.service." + serviceIdentifier)
+                .set("params", params != null ? params : new JSONObject());
+    }
+
+    /**
+     * 构建属性设置请求
+     */
+    private JSONObject buildPropertySetRequest(String requestId, Map<String, Object> properties) {
+        return new JSONObject()
+                .set("id", requestId)
+                .set("version", "1.0")
+                .set("method", "thing.service.property.set")
+                .set("params", properties);
+    }
+
+    /**
+     * 发布MQTT消息
+     */
+    private void publishMessage(String topic, JSONObject payload) {
+        mqttClient.publish(
+                topic,
+                Buffer.buffer(payload.toString()),
+                MqttQoS.AT_LEAST_ONCE,
+                false,
+                false);
+        log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
+    }
+
+    /**
+     * 生成请求ID
+     */
+    private String generateRequestId() {
+        return IdUtil.fastSimpleUUID();
+    }
+
 }

+ 128 - 69
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -1,19 +1,21 @@
 package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;
 
-import cn.hutool.core.util.IdUtil;
 import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
 import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
 import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.BodyHandler;
 import io.vertx.mqtt.MqttClient;
-import io.vertx.mqtt.MqttClientOptions;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
  * <p>
@@ -24,20 +26,26 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class IotDeviceUpstreamServer {
 
-    private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒)
+    private static final int RECONNECT_DELAY_MS = 5000; // 重连延迟时间(毫秒)
+    private static final int CONNECTION_TIMEOUT_MS = 10000; // 连接超时时间(毫秒)
+    private static final String TOPIC_SEPARATOR = ","; // 主题分隔符
+    private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; // 默认QoS级别
 
     private final Vertx vertx;
     private final HttpServer server;
     private final MqttClient client;
     private final IotPluginEmqxProperties emqxProperties;
     private final IotDeviceMqttMessageHandler mqttMessageHandler;
+    private volatile boolean isRunning = false; // 服务运行状态标志
 
     public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties,
-                                   IotDeviceUpstreamApi deviceUpstreamApi) {
+                                   IotDeviceUpstreamApi deviceUpstreamApi,
+                                   Vertx vertx,
+                                   MqttClient client) {
+        this.vertx = vertx;
         this.emqxProperties = emqxProperties;
+        this.client = client;
 
-        // 创建 Vertx 实例
-        this.vertx = Vertx.vertx();
         // 创建 Router 实例
         Router router = Router.router(vertx);
         router.route().handler(BodyHandler.create()); // 处理 Body
@@ -45,14 +53,6 @@ public class IotDeviceUpstreamServer {
                 .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
         // 创建 HttpServer 实例
         this.server = vertx.createHttpServer().requestHandler(router);
-
-        // 创建 MQTT 客户端
-        MqttClientOptions options = new MqttClientOptions()
-                .setClientId("yudao-iot-server-" + IdUtil.fastSimpleUUID())
-                .setUsername(emqxProperties.getMqttUsername())
-                .setPassword(emqxProperties.getMqttPassword())
-                .setSsl(emqxProperties.isMqttSsl());
-        client = MqttClient.create(vertx, options);
         this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);
     }
 
@@ -60,25 +60,45 @@ public class IotDeviceUpstreamServer {
      * 启动 HTTP 服务器、MQTT 客户端
      */
     public void start() {
+        if (isRunning) {
+            log.warn("服务已经在运行中,请勿重复启动");
+            return;
+        }
+
+        log.info("[start] 开始启动服务");
+
         // 1. 启动 HTTP 服务器
-        log.info("[start][开始启动]");
-        server.listen(emqxProperties.getAuthPort())
+        CompletableFuture<Void> httpFuture = server.listen(emqxProperties.getAuthPort())
                 .toCompletionStage()
                 .toCompletableFuture()
-                .join();
-        log.info("[start][HTTP服务器启动完成,端口({})]", this.server.actualPort());
+                .thenAccept(v -> log.info("[start] HTTP服务器启动完成,端口: {}", server.actualPort()));
 
         // 2. 连接 MQTT Broker
-        connectMqtt();
-
-        // 3. 添加 MQTT 断开重连监听器
-        client.closeHandler(v -> {
-            log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
-            reconnectWithDelay();
-        });
+        CompletableFuture<Void> mqttFuture = connectMqtt()
+                .toCompletionStage()
+                .toCompletableFuture()
+                .thenAccept(v -> {
+                    // 3. 添加 MQTT 断开重连监听器
+                    client.closeHandler(closeEvent -> {
+                        log.warn("[closeHandler] MQTT连接已断开,准备重连");
+                        reconnectWithDelay();
+                    });
+
+                    // 4. 设置 MQTT 消息处理器
+                    setupMessageHandler();
+                });
 
-        // 4. 设置 MQTT 消息处理器
-        setupMessageHandler();
+        // 等待所有服务启动完成
+        CompletableFuture.allOf(httpFuture, mqttFuture)
+                .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.error("[start] 服务启动失败", error);
+                    } else {
+                        isRunning = true;
+                        log.info("[start] 所有服务启动完成");
+                    }
+                });
     }
 
     /**
@@ -86,79 +106,118 @@ public class IotDeviceUpstreamServer {
      */
     private void setupMessageHandler() {
         client.publishHandler(mqttMessageHandler::handle);
+        log.debug("[setupMessageHandler] MQTT消息处理器设置完成");
     }
 
     /**
      * 重连 MQTT 客户端
      */
     private void reconnectWithDelay() {
-        vertx.setTimer(RECONNECT_DELAY, id -> {
-            log.info("[reconnectWithDelay][开始重新连接 MQTT]");
+        if (!isRunning) {
+            log.info("[reconnectWithDelay] 服务已停止,不再尝试重连");
+            return;
+        }
+
+        vertx.setTimer(RECONNECT_DELAY_MS, id -> {
+            log.info("[reconnectWithDelay] 开始重新连接MQTT");
             connectMqtt();
         });
     }
 
     /**
      * 连接 MQTT Broker 并订阅主题
+     *
+     * @return 连接结果的Future
      */
-    private void connectMqtt() {
-        client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
-                .onSuccess(connAck -> {
-                    log.info("[connectMqtt][MQTT客户端连接成功]");
-                    subscribeToTopics();
+    private Future<Void> connectMqtt() {
+        return client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost())
+                .compose(connAck -> {
+                    log.info("[connectMqtt] MQTT客户端连接成功");
+                    return subscribeToTopics();
                 })
-                .onFailure(err -> {
-                    log.error("[connectMqtt][连接 MQTT Broker 失败]", err);
+                .recover(err -> {
+                    log.error("[connectMqtt] 连接MQTT Broker失败: {}", err.getMessage());
                     reconnectWithDelay();
+                    return Future.failedFuture(err);
                 });
     }
 
     /**
      * 订阅设备上行消息主题
+     *
+     * @return 订阅结果的Future
      */
-    private void subscribeToTopics() {
-        String[] topics = emqxProperties.getMqttTopics().split(",");
+    private Future<Void> subscribeToTopics() {
+        String topicsStr = emqxProperties.getMqttTopics();
+        if (topicsStr == null || topicsStr.trim().isEmpty()) {
+            log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅");
+            return Future.succeededFuture();
+        }
+
+        log.info("[subscribeToTopics] 开始订阅设备上行消息主题");
+
+        String[] topics = topicsStr.split(TOPIC_SEPARATOR);
+        Future<Void> compositeFuture = Future.succeededFuture();
+
         for (String topic : topics) {
-            client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value())
-                    .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic))
-                    .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err));
+            String trimmedTopic = topic.trim();
+            if (trimmedTopic.isEmpty()) {
+                continue;
+            }
+
+            compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
+                    .<Void>map(ack -> {
+                        log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic);
+                        return null;
+                    })
+                    .recover(err -> {
+                        log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}",
+                                trimmedTopic, err.getMessage());
+                        return Future.<Void>succeededFuture(); // 继续订阅其他主题
+                    }));
         }
-        log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
+
+        return compositeFuture;
     }
 
     /**
-     * 停止所有
+     * 停止所有服务
      */
     public void stop() {
-        log.info("[stop][开始关闭]");
-        try {
-            // 关闭 HTTP 服务器
-            if (server != null) {
-                server.close()
-                        .toCompletionStage()
-                        .toCompletableFuture()
-                        .join();
-            }
+        if (!isRunning) {
+            log.warn("[stop] 服务未运行,无需停止");
+            return;
+        }
 
-            // 关闭 MQTT 客户端
-            if (client != null) {
-                client.disconnect()
-                        .toCompletionStage()
-                        .toCompletableFuture()
-                        .join();
-            }
+        log.info("[stop] 开始关闭服务");
+        isRunning = false;
 
-            // 关闭 Vertx 实例
-            if (vertx != null) {
-                vertx.close()
-                        .toCompletionStage()
-                        .toCompletableFuture()
-                        .join();
-            }
-            log.info("[stop][关闭完成]");
+        try {
+            CompletableFuture<Void> serverFuture = server != null
+                    ? server.close().toCompletionStage().toCompletableFuture()
+                    : CompletableFuture.completedFuture(null);
+
+            CompletableFuture<Void> clientFuture = client != null
+                    ? client.disconnect().toCompletionStage().toCompletableFuture()
+                    : CompletableFuture.completedFuture(null);
+
+            CompletableFuture<Void> vertxFuture = vertx != null
+                    ? vertx.close().toCompletionStage().toCompletableFuture()
+                    : CompletableFuture.completedFuture(null);
+
+            // 等待所有资源关闭
+            CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
+                    .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                    .whenComplete((result, error) -> {
+                        if (error != null) {
+                            log.error("[stop] 服务关闭过程中发生异常", error);
+                        } else {
+                            log.info("[stop] 所有服务关闭完成");
+                        }
+                    });
         } catch (Exception e) {
-            log.error("[stop][关闭异常]", e);
-            throw new RuntimeException(e);
+            log.error("[stop] 关闭服务异常", e);
+            throw new RuntimeException("关闭IoT设备上行服务失败", e);
         }
     }
 }

+ 144 - 66
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java

@@ -13,13 +13,15 @@ import io.vertx.mqtt.messages.MqttPublishMessage;
 import lombok.extern.slf4j.Slf4j;
 
 import java.time.LocalDateTime;
+import java.util.Arrays;
 
 /**
  * IoT 设备 MQTT 消息处理器
  * <p>
  * 参考:
  * <p>
- * "<a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>">
+ * "<a href=
+ * "https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>">
  */
 @Slf4j
 public class IotDeviceMqttMessageHandler {
@@ -35,6 +37,12 @@ public class IotDeviceMqttMessageHandler {
     private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
     private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
     private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
+    private static final String REPLY_SUFFIX = "_reply";
+    private static final int SUCCESS_CODE = 200;
+    private static final String SUCCESS_MESSAGE = "success";
+    private static final String PROPERTY_METHOD = "thing.event.property.post";
+    private static final String EVENT_METHOD_PREFIX = "thing.event.";
+    private static final String EVENT_METHOD_SUFFIX = ".post";
 
     private final IotDeviceUpstreamApi deviceUpstreamApi;
     private final MqttClient mqttClient;
@@ -44,18 +52,33 @@ public class IotDeviceMqttMessageHandler {
         this.mqttClient = mqttClient;
     }
 
+    /**
+     * 处理MQTT消息
+     *
+     * @param message MQTT发布消息
+     */
     public void handle(MqttPublishMessage message) {
         String topic = message.topicName();
         String payload = message.payload().toString();
         log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
 
         try {
+            if (payload == null || payload.isEmpty()) {
+                log.warn("[messageHandler][消息内容为空][topic: {}]", topic);
+                return;
+            }
             handleMessage(topic, payload);
         } catch (Exception e) {
             log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
         }
     }
 
+    /**
+     * 根据主题类型处理消息
+     *
+     * @param topic   主题
+     * @param payload 消息内容
+     */
     private void handleMessage(String topic, String payload) {
         // 校验前缀
         if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
@@ -88,34 +111,26 @@ public class IotDeviceMqttMessageHandler {
      * @param payload 消息内容
      */
     private void handlePropertyPost(String topic, String payload) {
-        // 解析消息内容
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
-        String[] topicParts = topic.split("/");
+        try {
+            // 解析消息内容
+            JSONObject jsonObject = JSONUtil.parseObj(payload);
+            String[] topicParts = parseTopic(topic);
+            if (topicParts == null) {
+                return;
+            }
 
-        // 构建设备属性上报请求对象
-        IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
+            // 构建设备属性上报请求对象
+            IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
 
-        // 调用上游 API 处理设备上报数据
-        deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
-        log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
-                topic, JSONUtil.toJsonStr(reportReqDTO));
+            // 调用上游 API 处理设备上报数据
+            deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
+            log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
 
-        // 发送响应消息
-        String replyTopic = topic + "_reply";
-        JSONObject response = new JSONObject()
-                .set("id", jsonObject.getStr("id"))
-                .set("code", 200)
-                .set("data", new JSONObject())
-                .set("message", "success")
-                .set("method", "thing.event.property.post");
-
-        mqttClient.publish(replyTopic,
-                Buffer.buffer(response.toString()),
-                MqttQoS.AT_LEAST_ONCE,
-                false,
-                false);
-        log.info("[handlePropertyPost][发送响应消息成功][topic: {}][response: {}]",
-                replyTopic, response.toString());
+            // 发送响应消息
+            sendResponse(topic, jsonObject, PROPERTY_METHOD, null);
+        } catch (Exception e) {
+            log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
+        }
     }
 
     /**
@@ -125,35 +140,97 @@ public class IotDeviceMqttMessageHandler {
      * @param payload 消息内容
      */
     private void handleEventPost(String topic, String payload) {
-        // 解析消息内容
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
+        try {
+            // 解析消息内容
+            JSONObject jsonObject = JSONUtil.parseObj(payload);
+            String[] topicParts = parseTopic(topic);
+            if (topicParts == null) {
+                return;
+            }
+
+            // 构建设备事件上报请求对象
+            IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
+
+            // 调用上游 API 处理设备上报数据
+            deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
+            log.info("[handleEventPost][处理设备事件上报成功][topic: {}]", topic);
+
+            // 从 topic 中获取事件标识符
+            String eventIdentifier = getEventIdentifier(topicParts, topic);
+            if (eventIdentifier == null) {
+                return;
+            }
+
+            // 发送响应消息
+            String method = EVENT_METHOD_PREFIX + eventIdentifier + EVENT_METHOD_SUFFIX;
+            sendResponse(topic, jsonObject, method, null);
+        } catch (Exception e) {
+            log.error("[handleEventPost][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
+        }
+    }
+
+    /**
+     * 解析主题,获取主题各部分
+     *
+     * @param topic 主题
+     * @return 主题各部分数组,如果解析失败返回null
+     */
+    private String[] parseTopic(String topic) {
         String[] topicParts = topic.split("/");
+        if (topicParts.length < 7) {
+            log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
+            return null;
+        }
+        return topicParts;
+    }
 
-        // 构建设备事件上报请求对象
-        IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
+    /**
+     * 从主题部分中获取事件标识符
+     *
+     * @param topicParts 主题各部分
+     * @param topic      原始主题,用于日志
+     * @return 事件标识符,如果获取失败返回null
+     */
+    private String getEventIdentifier(String[] topicParts, String topic) {
+        try {
+            return topicParts[6];
+        } catch (ArrayIndexOutOfBoundsException e) {
+            log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}][topicParts: {}]",
+                    topic, Arrays.toString(topicParts));
+            return null;
+        }
+    }
 
-        // 调用上游 API 处理设备上报数据
-        deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
-        log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
-                topic, JSONUtil.toJsonStr(reportReqDTO));
+    /**
+     * 发送响应消息
+     *
+     * @param topic      原始主题
+     * @param jsonObject 原始消息JSON对象
+     * @param method     响应方法
+     * @param customData 自定义数据,可为null
+     */
+    private void sendResponse(String topic, JSONObject jsonObject, String method, JSONObject customData) {
+        String replyTopic = topic + REPLY_SUFFIX;
+        JSONObject data = customData != null ? customData : new JSONObject();
 
-        // 发送响应消息
-        String replyTopic = topic + "_reply";
-        String eventIdentifier = topicParts[6]; // 从 topic 中获取事件标识符
         JSONObject response = new JSONObject()
                 .set("id", jsonObject.getStr("id"))
-                .set("code", 200)
-                .set("data", new JSONObject())
-                .set("message", "success")
-                .set("method", "thing.event." + eventIdentifier + ".post");
-
-        mqttClient.publish(replyTopic,
-                Buffer.buffer(response.toString()),
-                MqttQoS.AT_LEAST_ONCE,
-                false,
-                false);
-        log.info("[handleEventPost][发送响应消息成功][topic: {}][response: {}]",
-                replyTopic, response.toString());
+                .set("code", SUCCESS_CODE)
+                .set("data", data)
+                .set("message", SUCCESS_MESSAGE)
+                .set("method", method);
+
+        try {
+            mqttClient.publish(replyTopic,
+                    Buffer.buffer(response.toString()),
+                    MqttQoS.AT_LEAST_ONCE,
+                    false,
+                    false);
+            log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
+        } catch (Exception e) {
+            log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]",
+                    replyTopic, response.toString(), e);
+        }
     }
 
     /**
@@ -163,15 +240,15 @@ public class IotDeviceMqttMessageHandler {
      * @param topicParts 主题部分
      * @return 设备属性上报请求对象
      */
-    private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject,
-                                                                 String[] topicParts) {
-        return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
-                .setRequestId(jsonObject.getStr("id"))
-                .setProcessId(IotPluginCommonUtils.getProcessId())
-                .setReportTime(LocalDateTime.now())
-                .setProductKey(topicParts[2])
-                .setDeviceName(topicParts[3]))
-                .setProperties(jsonObject.getJSONObject("params"));
+    private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, String[] topicParts) {
+        IotDevicePropertyReportReqDTO reportReqDTO = new IotDevicePropertyReportReqDTO();
+        reportReqDTO.setRequestId(jsonObject.getStr("id"));
+        reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+        reportReqDTO.setReportTime(LocalDateTime.now());
+        reportReqDTO.setProductKey(topicParts[2]);
+        reportReqDTO.setDeviceName(topicParts[3]);
+        reportReqDTO.setProperties(jsonObject.getJSONObject("params"));
+        return reportReqDTO;
     }
 
     /**
@@ -182,13 +259,14 @@ public class IotDeviceMqttMessageHandler {
      * @return 设备事件上报请求对象
      */
     private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
-        return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
-                .setRequestId(jsonObject.getStr("id"))
-                .setProcessId(IotPluginCommonUtils.getProcessId())
-                .setReportTime(LocalDateTime.now())
-                .setProductKey(topicParts[2])
-                .setDeviceName(topicParts[3]))
-                .setIdentifier(topicParts[4])
-                .setParams(jsonObject.getJSONObject("params"));
+        IotDeviceEventReportReqDTO reportReqDTO = new IotDeviceEventReportReqDTO();
+        reportReqDTO.setRequestId(jsonObject.getStr("id"));
+        reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+        reportReqDTO.setReportTime(LocalDateTime.now());
+        reportReqDTO.setProductKey(topicParts[2]);
+        reportReqDTO.setDeviceName(topicParts[3]);
+        reportReqDTO.setIdentifier(topicParts[6]);
+        reportReqDTO.setParams(jsonObject.getJSONObject("params"));
+        return reportReqDTO;
     }
 }