Browse Source

【功能新增】IoT: 新增 RabbitMQ 数据桥梁实现

puhui999 5 months ago
parent
commit
7b449b81e7

+ 7 - 0
yudao-dependencies/pom.xml

@@ -36,6 +36,7 @@
         <!-- 消息队列 -->
         <rocketmq-spring.version>2.3.2</rocketmq-spring.version>
         <kafka-spring.version>3.3.3</kafka-spring.version>
+        <rabbitmq-spring.version>3.4.3</rabbitmq-spring.version>
         <!-- 服务保障相关 -->
         <lock4j.version>2.2.7</lock4j.version>
         <!-- 监控相关 -->
@@ -296,6 +297,12 @@
                 <artifactId>spring-kafka</artifactId>
                 <version>${kafka-spring.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-amqp</artifactId>
+                <version>${rabbitmq-spring.version}</version>
+            </dependency>
+
             <!-- 服务保障相关 -->
             <dependency>
                 <groupId>cn.iocoder.boot</groupId>

+ 5 - 0
yudao-module-iot/yudao-module-iot-biz/pom.xml

@@ -86,6 +86,11 @@
             <artifactId>spring-kafka</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+            <optional>true</optional>
+        </dependency>
 
         <dependency>
             <groupId>org.pf4j</groupId>  <!-- PF4J:内置插件机制 -->

+ 41 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java

@@ -204,4 +204,45 @@ public class IotDataBridgeDO extends BaseDO {
 
     }
 
+    /**
+     * RabbitMQ 配置
+     */
+    @Data
+    public static class RabbitMQConfig implements Config {
+
+        /**
+         * RabbitMQ 服务器地址
+         */
+        private String host;
+        /**
+         * 端口
+         */
+        private Integer port;
+        /**
+         * 虚拟主机
+         */
+        private String virtualHost;
+        /**
+         * 用户名
+         */
+        private String username;
+        /**
+         * 密码
+         */
+        private String password;
+
+        /**
+         * 交换机名称
+         */
+        private String exchange;
+        /**
+         * 路由键
+         */
+        private String routingKey;
+        /**
+         * 队列名称
+         */
+        private String queue;
+    }
+
 }

+ 126 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java

@@ -0,0 +1,126 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+
+/**
+ * RabbitMQ 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁的类型 == RABBITMQ
+        if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 RabbitMQ 发送消息
+        executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig());
+    }
+
+    private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
+        try {
+            // 1. 获取或创建 Channel
+            Channel channel = (Channel) getProducer(config);
+
+            // 2.1 声明交换机、队列和绑定关系
+            channel.exchangeDeclare(config.getExchange(), "direct", true);
+            channel.queueDeclare(config.getQueue(), true, false, false, null);
+            channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
+
+            // 2.2 发送消息
+            channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
+                    message.toString().getBytes(StandardCharsets.UTF_8));
+            log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
+        } catch (Exception e) {
+            log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) throws Exception {
+        IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config;
+
+        // 1. 创建连接工厂
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(rabbitConfig.getHost());
+        factory.setPort(rabbitConfig.getPort());
+        factory.setVirtualHost(rabbitConfig.getVirtualHost());
+        factory.setUsername(rabbitConfig.getUsername());
+        factory.setPassword(rabbitConfig.getPassword());
+
+        // 2. 创建连接
+        Connection connection = factory.newConnection();
+
+        // 3. 创建信道
+        return connection.createChannel();
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof Channel) {
+            try {
+                Channel channel = (Channel) producer;
+                if (channel.isOpen()) {
+                    channel.close();
+                }
+                Connection connection = channel.getConnection();
+                if (connection.isOpen()) {
+                    connection.close();
+                }
+            } catch (Exception e) {
+                log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e);
+            }
+        }
+    }
+
+    // TODO @芋道源码:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig();
+        config.setHost("localhost");
+        config.setPort(5672);
+        config.setVirtualHost("/");
+        config.setUsername("admin");
+        config.setPassword("123456");
+        config.setExchange("test-exchange");
+        config.setRoutingKey("test-key");
+        config.setQueue("test-queue");
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 channel]");
+        action.executeRabbitMQ(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 channel]");
+        action.executeRabbitMQ(message, config);
+    }
+}