Bladeren bron

【功能完善】IoT: 更新 MQTT 主题配置,重构设备属性和事件上报处理逻辑,优化消息处理流程

安浩浩 5 maanden geleden
bovenliggende
commit
4746281df9

+ 71 - 11
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java

@@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
 import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
 import cn.iocoder.yudao.module.iot.plugin.common.config.IotPluginCommonProperties;
 import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamServer;
@@ -24,13 +25,26 @@ import java.time.LocalDateTime;
  * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
  * <p>
  * 协议:HTTP、MQTT
+ * 参考:<a href=
+ * "https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services?spm=a2c4g.11186623.0.0.97a72915vRck44#section-g4j-5zg-12b">...</a>
  *
  * @author haohao
  */
 @Slf4j
 public class IotDeviceUpstreamServer {
 
-    private static final String PROPERTY_POST_TOPIC = "/event/property/post";
+    // 设备上报属性 标准 JSON
+    // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/property/post
+    // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply
+    // 设备上报事件 标准 JSON
+    // 请求Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
+    // 响应Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
+
+    private static final String SYS_TOPIC_PREFIX = "/sys/";
+    private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
+    private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
+    private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
+
     private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒)
     private static final int QOS_LEVEL = 1;
 
@@ -111,26 +125,41 @@ public class IotDeviceUpstreamServer {
      * 处理 MQTT 消息
      */
     private void handleMessage(String topic, String payload) {
+        // 校验前缀
+        if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
+            log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
+            return;
+        }
+
         // 处理设备属性上报消息
-        if (topic.contains(PROPERTY_POST_TOPIC)) {
+        if (topic.endsWith(PROPERTY_POST_TOPIC)) {
+            log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic);
             handlePropertyPost(topic, payload);
+            return;
         }
+
+        // 处理设备事件上报消息
+        if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) {
+            log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic);
+            handleEventPost(topic, payload);
+            return;
+        }
+
+        // 未知消息类型
+        log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
     }
 
     /**
      * 处理设备属性上报
      */
     private void handlePropertyPost(String topic, String payload) {
+        // /sys/${productKey}/${deviceName}/thing/event/property/post
         // 解析消息内容
         JSONObject jsonObject = JSONUtil.parseObj(payload);
         String[] topicParts = topic.split("/");
 
         // 构建设备属性上报请求对象
-        IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(
-                jsonObject,
-                topicParts[1], // productKey
-                topicParts[2] // deviceName
-        );
+        IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
 
         // 调用上游 API 处理设备上报数据
         deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
@@ -138,21 +167,52 @@ public class IotDeviceUpstreamServer {
                 topic, JSONUtil.toJsonStr(reportReqDTO));
     }
 
+    /**
+     * 处理设备事件上报
+     */
+    private void handleEventPost(String topic, String payload) {
+        // /sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
+        // 解析消息内容
+        JSONObject jsonObject = JSONUtil.parseObj(payload);
+        String[] topicParts = topic.split("/");
+
+        // 构建设备事件上报请求对象
+        IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
+
+        // 调用上游 API 处理设备上报数据
+        deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
+        log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]",
+                topic, JSONUtil.toJsonStr(reportReqDTO));
+    }
+
     /**
      * 构建设备属性上报请求对象
      */
     private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject,
-                                                                 String productKey,
-                                                                 String deviceName) {
+                                                                 String[] topicParts) {
         return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
                 .setRequestId(jsonObject.getStr("id"))
                 .setProcessId(IotPluginCommonUtils.getProcessId())
                 .setReportTime(LocalDateTime.now())
-                .setProductKey(productKey)
-                .setDeviceName(deviceName))
+                .setProductKey(topicParts[2])
+                .setDeviceName(topicParts[3]))
                 .setProperties(jsonObject.getJSONObject("params"));
     }
 
+    /**
+     * 构建设备事件上报请求对象
+     */
+    private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
+        return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO()
+                .setRequestId(jsonObject.getStr("id"))
+                .setProcessId(IotPluginCommonUtils.getProcessId())
+                .setReportTime(LocalDateTime.now())
+                .setProductKey(topicParts[2])
+                .setDeviceName(topicParts[3]))
+                .setIdentifier(topicParts[4])
+                .setParams(jsonObject.getJSONObject("params"));
+    }
+
     /**
      * 重连 MQTT 客户端
      */

+ 1 - 1
yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml

@@ -15,5 +15,5 @@ yudao:
         mqtt-ssl: false
         mqtt-username: yudao
         mqtt-password: 123456
-        mqtt-topics: "/+/#"
+        mqtt-topics: "/sys/#"
         auth-port: 8101