Browse Source

【代码优化】IoT: 数据桥梁的执行器抽离

puhui999 5 months ago
parent
commit
0400932260

+ 1 - 0
yudao-framework/yudao-spring-boot-starter-mq/pom.xml

@@ -36,6 +36,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
         </dependency>
     </dependencies>
 

+ 4 - 3
yudao-module-iot/yudao-module-iot-biz/pom.xml

@@ -75,10 +75,11 @@
 <!--            <groupId>org.eclipse.paho</groupId> &lt;!&ndash; MQTT &ndash;&gt;-->
 <!--            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>-->
 <!--        </dependency>-->
-        <!-- 工具类相关 -->
+        <!-- 消息队列相关 -->
         <dependency>
-            <groupId>cn.iocoder.boot</groupId>
-            <artifactId>yudao-spring-boot-starter-mq</artifactId>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <optional>true</optional>
         </dependency>
 
         <dependency>

+ 6 - 151
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java

@@ -1,33 +1,18 @@
 package cn.iocoder.yudao.module.iot.service.rule.action;
 
-import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.lang.Assert;
 import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
-import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
-import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
 import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
 import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
-import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
 import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
 import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
 import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService;
+import cn.iocoder.yudao.module.iot.service.rule.execute.IotDataBridgeExecute;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.springframework.http.*;
 import org.springframework.stereotype.Component;
-import org.springframework.web.client.RestTemplate;
-import org.springframework.web.util.UriComponentsBuilder;
 
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Map;
-
-import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+import java.util.List;
 
 /**
  * IoT 数据桥梁的 {@link IotRuleSceneAction} 实现类
@@ -41,11 +26,10 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_
 @Slf4j
 public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
 
-    @Resource
-    private RestTemplate restTemplate;
-
     @Resource
     private IotDataBridgeService dataBridgeService;
+    @Resource
+    private List<IotDataBridgeExecute> dataBridgeExecutes;
 
     @Override
     public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) {
@@ -65,26 +49,8 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
             return;
         }
 
-        // 2.1 执行 HTTP 请求
-        if (IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
-            executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig());
-            return;
-        }
-        // 2.2 执行 RocketMQ 发送消息
-        if (IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
-            executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig());
-            return;
-        }
-
-        // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
-        // TODO @芋艿:mq-redis
-        // TODO @芋艿:mq-数据库
-        // TODO @芋艿:kafka
-        // TODO @芋艿:rocketmq
-        // TODO @芋艿:rabbitmq
-        // TODO @芋艿:mqtt
-        // TODO @芋艿:tcp
-        // TODO @芋艿:websocket
+        // 2.1 执行数据桥接操作
+        dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge));
     }
 
     @Override
@@ -92,115 +58,4 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
         return IotRuleSceneActionTypeEnum.DATA_BRIDGE;
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
-    private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
-        String url = null;
-        HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
-        HttpEntity<String> requestEntity = null;
-        ResponseEntity<String> responseEntity = null;
-        try {
-            // 1.1 构建 Header
-            HttpHeaders headers = new HttpHeaders();
-            if (CollUtil.isNotEmpty(config.getHeaders())) {
-                config.getHeaders().putAll(config.getHeaders());
-            }
-            headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
-            // 1.2 构建 URL
-            UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl());
-            if (CollUtil.isNotEmpty(config.getQuery())) {
-                config.getQuery().forEach(uriBuilder::queryParam);
-            }
-            // 1.3 构建请求体
-            if (method == HttpMethod.GET) {
-                uriBuilder.queryParam("message", HttpUtils.encodeUtf8(JsonUtils.toJsonString(message)));
-                url = uriBuilder.build().toUriString();
-                requestEntity = new HttpEntity<>(headers);
-            } else {
-                url = uriBuilder.build().toUriString();
-                Map<String, Object> requestBody = JsonUtils.parseObject(config.getBody(), Map.class);
-                if (requestBody == null) {
-                    requestBody = new HashMap<>();
-                }
-                requestBody.put("message", message);
-                headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
-                requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers);
-            }
-
-            // 2.1 发送请求
-            responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
-            // 2.2 记录日志
-            if (responseEntity.getStatusCode().is2xxSuccessful()) {
-                log.info("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]",
-                        message, config, url, method, requestEntity, responseEntity);
-            } else {
-                log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]",
-                        message, config, url, method, requestEntity, responseEntity);
-            }
-        } catch (Exception e) {
-            log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]",
-                    message, config, url, method, requestEntity, responseEntity, e);
-        }
-    }
-
-    private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
-        // 1.1 创建生产者实例,指定生产者组名
-        DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
-        // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer
-        try {
-            // 1.2 设置 NameServer 地址
-            producer.setNamesrvAddr(config.getNameServer());
-            // 1.3 启动生产者
-            producer.start();
-
-            // 2.1 创建消息对象,指定Topic、Tag和消息体
-            Message msg = new Message(
-                    config.getTopic(),
-                    config.getTags(),
-                    message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
-            );
-            // 2.2 发送同步消息并处理结果
-            SendResult sendResult = producer.send(msg);
-            // 2.3 处理发送结果
-            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
-                log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
-            } else {
-                log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
-            }
-        } catch (Exception e) {
-            log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
-        } finally {
-            // 3. 关闭生产者
-            producer.shutdown();
-        }
-    }
-
-    // TODO @芋艿:测试代码,后续清理
-    public static void main(String[] args) {
-        // 1. 创建 IotRuleSceneDataBridgeAction 实例
-        IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction();
-
-        // 2. 创建测试消息
-        IotDeviceMessage message = IotDeviceMessage.builder()
-                .requestId("TEST-001")
-                .productKey("testProduct")
-                .deviceName("testDevice")
-                .deviceKey("testDeviceKey")
-                .type("property")
-                .identifier("temperature")
-                .data("{\"value\": 60}")
-                .reportTime(LocalDateTime.now())
-                .tenantId(1L)
-                .build();
-
-        // 3. 创建 RocketMQ 配置
-        IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
-        config.setNameServer("127.0.0.1:9876");
-        config.setGroup("test-group");
-        config.setTopic("test-topic");
-        config.setTags("test-tag");
-
-        // 4. 执行测试
-        action.executeRocketMQ(message, config);
-    }
-
 }

+ 32 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java

@@ -0,0 +1,32 @@
+package cn.iocoder.yudao.module.iot.service.rule.execute;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+
+
+/**
+ * IoT 数据桥梁的执行器 execute 接口
+ *
+ * @author HUIHUI
+ */
+public interface IotDataBridgeExecute {
+
+    /**
+     * 执行数据桥接操作
+     *
+     * @param message    设备消息
+     * @param dataBridge 数据桥梁
+     */
+    void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge);
+
+    // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
+    // TODO @芋艿:mq-redis
+    // TODO @芋艿:mq-数据库
+    // TODO @芋艿:kafka
+    // TODO @芋艿:rocketmq
+    // TODO @芋艿:rabbitmq
+    // TODO @芋艿:mqtt
+    // TODO @芋艿:tcp
+    // TODO @芋艿:websocket
+
+}

+ 93 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java

@@ -0,0 +1,93 @@
+package cn.iocoder.yudao.module.iot.service.rule.execute;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.*;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Http 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
+
+    @Resource
+    private RestTemplate restTemplate;
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥接的类型 == HTTP
+        if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 HTTP 请求
+        executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig());
+    }
+
+    @SuppressWarnings({"unchecked", "deprecation"})
+    private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) {
+        String url = null;
+        HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
+        HttpEntity<String> requestEntity = null;
+        ResponseEntity<String> responseEntity = null;
+        try {
+            // 1.1 构建 Header
+            HttpHeaders headers = new HttpHeaders();
+            if (CollUtil.isNotEmpty(config.getHeaders())) {
+                config.getHeaders().putAll(config.getHeaders());
+            }
+            headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
+            // 1.2 构建 URL
+            UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl());
+            if (CollUtil.isNotEmpty(config.getQuery())) {
+                config.getQuery().forEach(uriBuilder::queryParam);
+            }
+            // 1.3 构建请求体
+            if (method == HttpMethod.GET) {
+                uriBuilder.queryParam("message", HttpUtils.encodeUtf8(JsonUtils.toJsonString(message)));
+                url = uriBuilder.build().toUriString();
+                requestEntity = new HttpEntity<>(headers);
+            } else {
+                url = uriBuilder.build().toUriString();
+                Map<String, Object> requestBody = JsonUtils.parseObject(config.getBody(), Map.class);
+                if (requestBody == null) {
+                    requestBody = new HashMap<>();
+                }
+                requestBody.put("message", message);
+                headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
+                requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers);
+            }
+
+            // 2.1 发送请求
+            responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
+            // 2.2 记录日志
+            if (responseEntity.getStatusCode().is2xxSuccessful()) {
+                log.info("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]",
+                        message, config, url, method, requestEntity, responseEntity);
+            } else {
+                log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]",
+                        message, config, url, method, requestEntity, responseEntity);
+            }
+        } catch (Exception e) {
+            log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]",
+                    message, config, url, method, requestEntity, responseEntity, e);
+        }
+    }
+
+}

+ 96 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java

@@ -0,0 +1,96 @@
+package cn.iocoder.yudao.module.iot.service.rule.execute;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * RocketMQ 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥接的类型 == ROCKETMQ
+        if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 RocketMQ 发送消息
+        executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig());
+    }
+
+    private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
+        // 1.1 创建生产者实例,指定生产者组名
+        DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
+        // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer
+        try {
+            // 1.2 设置 NameServer 地址
+            producer.setNamesrvAddr(config.getNameServer());
+            // 1.3 启动生产者
+            producer.start();
+
+            // 2.1 创建消息对象,指定Topic、Tag和消息体
+            Message msg = new Message(
+                    config.getTopic(),
+                    config.getTags(),
+                    message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
+            );
+            // 2.2 发送同步消息并处理结果
+            SendResult sendResult = producer.send(msg);
+            // 2.3 处理发送结果
+            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult);
+            } else {
+                log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult);
+            }
+        } catch (Exception e) {
+            log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e);
+        } finally {
+            // 3. 关闭生产者
+            producer.shutdown();
+        }
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建 IotRocketMQDataBridgeExecute 实例
+        IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
+
+        // 2. 创建测试消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 3. 创建 RocketMQ 配置
+        IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
+        config.setNameServer("127.0.0.1:9876");
+        config.setGroup("test-group");
+        config.setTopic("test-topic");
+        config.setTags("test-tag");
+
+        // 4. 执行测试
+        action.executeRocketMQ(message, config);
+    }
+
+}