Parcourir la source

【功能新增】IoT: 数据桥梁增加 RocketMQConfig 配置,实现executeRocketMQ 发送消息

puhui999 il y a 5 mois
Parent
commit
add90365df

+ 1 - 1
yudao-dependencies/pom.xml

@@ -34,7 +34,7 @@
         <opengauss.jdbc.version>5.1.0</opengauss.jdbc.version>
         <taos.version>3.3.3</taos.version>
         <!-- 消息队列 -->
-        <rocketmq-spring.version>2.3.1</rocketmq-spring.version>
+        <rocketmq-spring.version>2.3.2</rocketmq-spring.version>
         <!-- 服务保障相关 -->
         <lock4j.version>2.2.7</lock4j.version>
         <!-- 监控相关 -->

+ 0 - 1
yudao-framework/yudao-spring-boot-starter-mq/pom.xml

@@ -36,7 +36,6 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-spring-boot-starter</artifactId>
-            <optional>true</optional>
         </dependency>
     </dependencies>
 

+ 5 - 0
yudao-module-iot/yudao-module-iot-biz/pom.xml

@@ -75,6 +75,11 @@
 <!--            <groupId>org.eclipse.paho</groupId> &lt;!&ndash; MQTT &ndash;&gt;-->
 <!--            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>-->
 <!--        </dependency>-->
+        <!-- 工具类相关 -->
+        <dependency>
+            <groupId>cn.iocoder.boot</groupId>
+            <artifactId>yudao-spring-boot-starter-mq</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.pf4j</groupId>  <!-- PF4J:内置插件机制 -->

+ 37 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java

@@ -136,5 +136,42 @@ public class IotDataBridgeDO extends BaseDO {
 
     }
 
+    /**
+     * RocketMQ 配置
+     */
+    @Data
+    public static class RocketMQConfig implements Config {
+
+        /**
+         * RocketMQ 名称服务器地址
+         */
+        private String nameServer;
+
+        /**
+         * 生产者/消费者组
+         */
+        private String group;
+
+        /**
+         * 主题
+         */
+        private String topic;
+
+        /**
+         * 标签
+         */
+        private String tags;
+
+        /**
+         * 访问密钥
+         */
+        private String accessKey;
+
+        /**
+         * 秘密钥匙
+         */
+        private String secretKey;
+
+    }
 
 }

+ 73 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java

@@ -13,11 +13,17 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
 import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.springframework.http.*;
 import org.springframework.stereotype.Component;
 import org.springframework.web.client.RestTemplate;
 import org.springframework.web.util.UriComponentsBuilder;
 
+import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -64,6 +70,11 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
             executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig());
             return;
         }
+        // 2.2 执行 RocketMQ 发送消息
+        if (IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
+            executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig());
+            return;
+        }
 
         // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
         // TODO @芋艿:mq-redis
@@ -131,4 +142,66 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction {
         }
     }
 
+    private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
+        // 1. 创建生产者实例,指定生产者组名
+        DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
+        try {
+            // 2. 设置 NameServer 地址
+            producer.setNamesrvAddr(config.getNameServer());
+            // 3. 启动生产者
+            producer.start();
+            // 4. 创建消息对象,指定Topic、Tag和消息体
+            Message msg = new Message(
+                    config.getTopic(),
+                    config.getTags(),
+                    message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)
+            );
+
+            // 5. 发送同步消息并处理结果
+            SendResult sendResult = producer.send(msg);
+            // 6. 处理发送结果
+            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
+                log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]",
+                        message, config, sendResult);
+            } else {
+                log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]",
+                        message, config, sendResult);
+            }
+        } catch (Exception e) {
+            log.error("[executeRocketMQ][message({}) config({}) 发送异常]",
+                    message, config, e);
+        } finally {
+            // 7. 关闭生产者
+            producer.shutdown();
+        }
+    }
+
+    public static void main(String[] args) {
+        // 1. 创建 IotRuleSceneDataBridgeAction 实例
+        IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction();
+
+        // 2. 创建测试消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 3. 创建 RocketMQ 配置
+        IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig();
+        config.setNameServer("127.0.0.1:9876");
+        config.setGroup("test-group");
+        config.setTopic("test-topic");
+        config.setTags("test-tag");
+
+        // 4. 执行测试
+        action.executeRocketMQ(message, config);
+    }
+
 }