Przeglądaj źródła

!1255 【功能新增】IoT: 新增 Kafka、RabbitMQ、RedisStreamMQ 数据桥梁实现
Merge pull request !1255 from puhui999/iot

芋道源码 5 miesięcy temu
rodzic
commit
6a1798bf6a
11 zmienionych plików z 622 dodań i 56 usunięć
  1. 12 1
      yudao-dependencies/pom.xml
  2. 1 1
      yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java
  3. 10 0
      yudao-module-iot/yudao-module-iot-biz/pom.xml
  4. 100 0
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java
  5. 80 0
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java
  6. 1 11
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
  7. 1 1
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java
  8. 125 0
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
  9. 126 0
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java
  10. 147 0
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java
  11. 19 42
      yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java

+ 12 - 1
yudao-dependencies/pom.xml

@@ -35,6 +35,8 @@
         <taos.version>3.3.3</taos.version>
         <!-- 消息队列 -->
         <rocketmq-spring.version>2.3.2</rocketmq-spring.version>
+        <kafka-spring.version>3.3.3</kafka-spring.version>
+        <rabbitmq-spring.version>3.4.3</rabbitmq-spring.version>
         <!-- 服务保障相关 -->
         <lock4j.version>2.2.7</lock4j.version>
         <!-- 监控相关 -->
@@ -285,12 +287,21 @@
                 <artifactId>yudao-spring-boot-starter-mq</artifactId>
                 <version>${revision}</version>
             </dependency>
-
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-spring-boot-starter</artifactId>
                 <version>${rocketmq-spring.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>${kafka-spring.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-amqp</artifactId>
+                <version>${rabbitmq-spring.version}</version>
+            </dependency>
 
             <!-- 服务保障相关 -->
             <dependency>

+ 1 - 1
yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java

@@ -22,7 +22,7 @@ public enum IotDataBridgTypeEnum implements ArrayValuable<Integer> {
     MQTT(10),
 
     DATABASE(20),
-    REDIS(21),
+    REDIS_STREAM(21),
 
     ROCKETMQ(30),
     RABBITMQ(31),

+ 10 - 0
yudao-module-iot/yudao-module-iot-biz/pom.xml

@@ -81,6 +81,16 @@
             <artifactId>rocketmq-spring-boot-starter</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+            <optional>true</optional>
+        </dependency>
 
         <dependency>
             <groupId>org.pf4j</groupId>  <!-- PF4J:内置插件机制 -->

+ 100 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java

@@ -170,4 +170,104 @@ public class IotDataBridgeDO extends BaseDO {
 
     }
 
+    /**
+     * Kafka 配置
+     */
+    @Data
+    public static class KafkaMQConfig implements Config {
+
+        /**
+         * Kafka 服务器地址
+         */
+        private String bootstrapServers;
+        /**
+         * 用户名
+         */
+        private String username;
+        /**
+         * 密码
+         */
+        private String password;
+        /**
+         * 是否启用 SSL
+         */
+        private Boolean ssl;
+
+        /**
+         * 主题
+         */
+        private String topic;
+
+    }
+
+    /**
+     * RabbitMQ 配置
+     */
+    @Data
+    public static class RabbitMQConfig implements Config {
+
+        /**
+         * RabbitMQ 服务器地址
+         */
+        private String host;
+        /**
+         * 端口
+         */
+        private Integer port;
+        /**
+         * 虚拟主机
+         */
+        private String virtualHost;
+        /**
+         * 用户名
+         */
+        private String username;
+        /**
+         * 密码
+         */
+        private String password;
+
+        /**
+         * 交换机名称
+         */
+        private String exchange;
+        /**
+         * 路由键
+         */
+        private String routingKey;
+        /**
+         * 队列名称
+         */
+        private String queue;
+    }
+
+    /**
+     * Redis Stream MQ 配置
+     */
+    @Data
+    public static class RedisStreamMQConfig implements Config {
+
+        /**
+         * Redis 服务器地址
+         */
+        private String host;
+        /**
+         * 端口
+         */
+        private Integer port;
+        /**
+         * 密码
+         */
+        private String password;
+        /**
+         * 数据库索引
+         */
+        private Integer database;
+
+        /**
+         * 主题
+         */
+        private String topic;
+    }
+
 }

+ 80 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java

@@ -0,0 +1,80 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
+// TODO @芋艿:mq-redis
+// TODO @芋艿:mq-数据库
+// TODO @芋艿:kafka
+// TODO @芋艿:rocketmq
+// TODO @芋艿:rabbitmq
+// TODO @芋艿:mqtt
+// TODO @芋艿:tcp
+// TODO @芋艿:websocket
+
+/**
+ * 带缓存功能的数据桥梁执行器抽象类
+ *
+ * @author HUIHUI
+ */
+@Slf4j
+public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute {
+
+    /**
+     * Producer 缓存
+     */
+    private final LoadingCache<Object, Object> PRODUCER_CACHE = CacheBuilder.newBuilder()
+            .expireAfterAccess(Duration.ofMinutes(30))
+            .removalListener(notification -> {
+                Object producer = notification.getValue();
+                if (producer == null) {
+                    return;
+                }
+                try {
+                    closeProducer(producer);
+                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
+                } catch (Exception e) {
+                    log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
+                }
+            })
+            .build(new CacheLoader<Object, Object>() {
+                @Override
+                public Object load(Object config) throws Exception {
+                    Object producer = initProducer(config);
+                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
+                    return producer;
+                }
+            });
+
+    /**
+     * 获取生产者
+     *
+     * @param config 配置信息
+     * @return 生产者对象
+     */
+    protected Object getProducer(Object config) throws Exception {
+        return PRODUCER_CACHE.get(config);
+    }
+
+    /**
+     * 初始化生产者
+     *
+     * @param config 配置信息
+     * @return 生产者对象
+     * @throws Exception 如果初始化失败
+     */
+    protected abstract Object initProducer(Object config) throws Exception;
+
+    /**
+     * 关闭生产者
+     *
+     * @param producer 生产者对象
+     */
+    protected abstract void closeProducer(Object producer);
+
+} 

+ 1 - 11
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java

@@ -12,21 +12,11 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
 public interface IotDataBridgeExecute {
 
     /**
-     * 执行数据桥操作
+     * 执行数据桥操作
      *
      * @param message    设备消息
      * @param dataBridge 数据桥梁
      */
     void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge);
 
-    // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
-    // TODO @芋艿:mq-redis
-    // TODO @芋艿:mq-数据库
-    // TODO @芋艿:kafka
-    // TODO @芋艿:rocketmq
-    // TODO @芋艿:rabbitmq
-    // TODO @芋艿:mqtt
-    // TODO @芋艿:tcp
-    // TODO @芋艿:websocket
-
 }

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java

@@ -32,7 +32,7 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
 
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
-        // 1.1 校验数据桥的类型 == HTTP
+        // 1.1 校验数据桥的类型 == HTTP
         if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
             return;
         }

+ 125 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java

@@ -0,0 +1,125 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Kafka 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁的类型 == KAFKA
+        if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 Kafka 发送消息
+        executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
+        try {
+            // 1. 获取或创建 KafkaTemplate
+            KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) getProducer(config);
+
+            // 2. 发送消息并等待结果
+            kafkaTemplate.send(config.getTopic(), message.toString())
+                    .get(10, TimeUnit.SECONDS); // 添加超时等待
+            log.info("[executeKafka][message({}) 发送成功]", message);
+        } catch (TimeoutException e) {
+            log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
+        } catch (Exception e) {
+            log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) {
+        IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
+
+        // 1.1 构建生产者配置
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        // 1.2 如果配置了认证信息
+        if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
+            props.put("security.protocol", "SASL_PLAINTEXT");
+            props.put("sasl.mechanism", "PLAIN");
+            props.put("sasl.jaas.config",
+                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+                            + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
+        }
+
+        // 1.3 如果启用 SSL
+        if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
+            props.put("security.protocol", "SSL");
+        }
+
+        // 2. 创建 KafkaTemplate
+        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
+        return new KafkaTemplate<>(producerFactory);
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof KafkaTemplate) {
+            ((KafkaTemplate<?, ?>) producer).destroy();
+        }
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig();
+        config.setBootstrapServers("127.0.0.1:9092");
+        config.setTopic("test-topic");
+        config.setSsl(false);
+        config.setUsername(null);
+        config.setPassword(null);
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 producer]");
+        action.executeKafka(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 producer]");
+        action.executeKafka(message, config);
+    }
+
+}

+ 126 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java

@@ -0,0 +1,126 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+
+/**
+ * RabbitMQ 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁的类型 == RABBITMQ
+        if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 RabbitMQ 发送消息
+        executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig());
+    }
+
+    private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) {
+        try {
+            // 1. 获取或创建 Channel
+            Channel channel = (Channel) getProducer(config);
+
+            // 2.1 声明交换机、队列和绑定关系
+            channel.exchangeDeclare(config.getExchange(), "direct", true);
+            channel.queueDeclare(config.getQueue(), true, false, false, null);
+            channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey());
+
+            // 2.2 发送消息
+            channel.basicPublish(config.getExchange(), config.getRoutingKey(), null,
+                    message.toString().getBytes(StandardCharsets.UTF_8));
+            log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config);
+        } catch (Exception e) {
+            log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) throws Exception {
+        IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config;
+
+        // 1. 创建连接工厂
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost(rabbitConfig.getHost());
+        factory.setPort(rabbitConfig.getPort());
+        factory.setVirtualHost(rabbitConfig.getVirtualHost());
+        factory.setUsername(rabbitConfig.getUsername());
+        factory.setPassword(rabbitConfig.getPassword());
+
+        // 2. 创建连接
+        Connection connection = factory.newConnection();
+
+        // 3. 创建信道
+        return connection.createChannel();
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof Channel) {
+            try {
+                Channel channel = (Channel) producer;
+                if (channel.isOpen()) {
+                    channel.close();
+                }
+                Connection connection = channel.getConnection();
+                if (connection.isOpen()) {
+                    connection.close();
+                }
+            } catch (Exception e) {
+                log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e);
+            }
+        }
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig();
+        config.setHost("localhost");
+        config.setPort(5672);
+        config.setVirtualHost("/");
+        config.setUsername("admin");
+        config.setPassword("123456");
+        config.setExchange("test-exchange");
+        config.setRoutingKey("test-key");
+        config.setQueue("test-queue");
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 channel]");
+        action.executeRabbitMQ(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 channel]");
+        action.executeRabbitMQ(message, config);
+    }
+}

+ 147 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java

@@ -0,0 +1,147 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.hutool.core.util.ReflectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+import org.redisson.config.SingleServerConfig;
+import org.redisson.spring.data.connection.RedissonConnectionFactory;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * Redis Stream MQ 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁类型
+        if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行消息发送
+        executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
+        try {
+            // 1. 获取 RedisTemplate
+            RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) getProducer(config);
+
+            // 2. 创建并发送 Stream 记录
+            ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
+                    .ofObject(message).withStreamKey(config.getTopic());
+            String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
+            log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
+        } catch (Exception e) {
+            log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) {
+        IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config;
+
+        // 1.1 创建 Redisson 配置
+        Config redissonConfig = new Config();
+        SingleServerConfig serverConfig = redissonConfig.useSingleServer()
+                .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort())
+                .setDatabase(redisConfig.getDatabase());
+        // 1.2 设置密码(如果有)
+        if (StrUtil.isNotBlank(redisConfig.getPassword())) {
+            serverConfig.setPassword(redisConfig.getPassword());
+        }
+
+        // 2.1 创建 RedissonClient
+        RedissonClient redisson = Redisson.create(redissonConfig);
+        // 2.2 创建并配置 RedisTemplate
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        // 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。
+        template.setConnectionFactory(new RedissonConnectionFactory(redisson));
+        // 使用 String 序列化方式,序列化 KEY 。
+        template.setKeySerializer(RedisSerializer.string());
+        template.setHashKeySerializer(RedisSerializer.string());
+        // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
+        template.setValueSerializer(buildRedisSerializer());
+        template.setHashValueSerializer(buildRedisSerializer());
+        template.afterPropertiesSet();// 初始化
+        return template;
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof RedisTemplate) {
+            RedisConnectionFactory factory = ((RedisTemplate<?, ?>) producer).getConnectionFactory();
+            try {
+                if (factory != null) {
+                    ((RedissonConnectionFactory) factory).destroy();
+                }
+            } catch (Exception e) {
+                log.error("[closeProducer][关闭 redisson 连接异常]", e);
+            }
+        }
+    }
+
+
+    public static RedisSerializer<?> buildRedisSerializer() {
+        RedisSerializer<Object> json = RedisSerializer.json();
+        // 解决 LocalDateTime 的序列化
+        ObjectMapper objectMapper = (ObjectMapper) ReflectUtil.getFieldValue(json, "mapper");
+        objectMapper.registerModules(new JavaTimeModule());
+        return json;
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig();
+        config.setHost("127.0.0.1");
+        config.setPort(6379);
+        config.setDatabase(0);
+        config.setPassword("123456");
+        config.setTopic("test-stream");
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 RedisTemplate]");
+        action.executeRedisStream(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]");
+        action.executeRedisStream(message, config);
+    }
+
+}

+ 19 - 42
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java

@@ -3,9 +3,6 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
 import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
 import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
 import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -14,9 +11,7 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.springframework.stereotype.Component;
 
-import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.concurrent.Executors;
 
 /**
  * RocketMQ 的 {@link IotDataBridgeExecute} 实现类
@@ -25,45 +20,11 @@ import java.util.concurrent.Executors;
  */
 @Component
 @Slf4j
-public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
-
-    /**
-     * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存
-     */
-    // TODO @puhui999:因为 kafka 之类也存在这个情况,是不是得搞个抽象类。提供一个 initProducer,和 closeProducer 方法
-    private final LoadingCache<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> PRODUCER_CACHE = CacheBuilder.newBuilder()
-            .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999:应该是 read 30 分钟哈
-            // 增加移除监听器,自动关闭 producer
-            .removalListener(notification -> {
-                DefaultMQProducer producer = (DefaultMQProducer) notification.getValue();
-                // TODO puhui999:if return,更简短哈
-                if (producer != null) {
-                    try {
-                        producer.shutdown();
-                        log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
-                    } catch (Exception e) {
-                        log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
-                    }
-                }
-            })
-            // TODO @puhui999:就同步哈,不用异步处理。
-            // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程
-            .build(CacheLoader.asyncReloading(new CacheLoader<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer>() {
-
-                @Override
-                public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception {
-                    DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
-                    producer.setNamesrvAddr(config.getNameServer());
-                    producer.start();
-                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
-                    return producer;
-                }
-
-            }, Executors.newCachedThreadPool()));
+public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
 
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
-        // 1.1 校验数据桥的类型 == ROCKETMQ
+        // 1.1 校验数据桥梁的类型 == ROCKETMQ
         if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
             return;
         }
@@ -74,7 +35,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
     private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
         try {
             // 1. 获取或创建 Producer
-            DefaultMQProducer producer = PRODUCER_CACHE.get(config);
+            DefaultMQProducer producer = (DefaultMQProducer) getProducer(config);
 
             // 2.1 创建消息对象,指定Topic、Tag和消息体
             Message msg = new Message(
@@ -95,6 +56,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
         }
     }
 
+    @Override
+    protected Object initProducer(Object config) throws Exception {
+        IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config;
+        DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup());
+        producer.setNamesrvAddr(rocketMQConfig.getNameServer());
+        producer.start();
+        return producer;
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof DefaultMQProducer) {
+            ((DefaultMQProducer) producer).shutdown();
+        }
+    }
+
     // TODO @芋艿:测试代码,后续清理
     public static void main(String[] args) {
         // 1. 创建一个共享的实例