Parcourir la source

【功能新增】IoT: 新增 Redis Stream MQ 数据桥梁实现

puhui999 il y a 5 mois
Parent
commit
cb16539b66

+ 1 - 1
yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java

@@ -22,7 +22,7 @@ public enum IotDataBridgTypeEnum implements ArrayValuable<Integer> {
     MQTT(10),
 
     DATABASE(20),
-    REDIS(21),
+    REDIS_STREAM(21),
 
     ROCKETMQ(30),
     RABBITMQ(31),

+ 29 - 4
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java

@@ -193,10 +193,6 @@ public class IotDataBridgeDO extends BaseDO {
          */
         private Boolean ssl;
 
-        /**
-         * 生产者组 ID
-         */
-        private String groupId;
         /**
          * 主题
          */
@@ -245,4 +241,33 @@ public class IotDataBridgeDO extends BaseDO {
         private String queue;
     }
 
+    /**
+     * Redis Stream MQ 配置
+     */
+    @Data
+    public static class RedisStreamMQConfig implements Config {
+
+        /**
+         * Redis 服务器地址
+         */
+        private String host;
+        /**
+         * 端口
+         */
+        private Integer port;
+        /**
+         * 密码
+         */
+        private String password;
+        /**
+         * 数据库索引
+         */
+        private Integer database;
+
+        /**
+         * 主题
+         */
+        private String topic;
+    }
+
 }

+ 10 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java

@@ -7,6 +7,16 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.time.Duration;
 
+// TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
+// TODO @芋艿:mq-redis
+// TODO @芋艿:mq-数据库
+// TODO @芋艿:kafka
+// TODO @芋艿:rocketmq
+// TODO @芋艿:rabbitmq
+// TODO @芋艿:mqtt
+// TODO @芋艿:tcp
+// TODO @芋艿:websocket
+
 /**
  * 带缓存功能的数据桥梁执行器抽象类
  *

+ 0 - 10
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java

@@ -19,14 +19,4 @@ public interface IotDataBridgeExecute {
      */
     void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge);
 
-    // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟;
-    // TODO @芋艿:mq-redis
-    // TODO @芋艿:mq-数据库
-    // TODO @芋艿:kafka
-    // TODO @芋艿:rocketmq
-    // TODO @芋艿:rabbitmq
-    // TODO @芋艿:mqtt
-    // TODO @芋艿:tcp
-    // TODO @芋艿:websocket
-
 }

+ 1 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java

@@ -35,6 +35,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
         executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
     }
 
+    @SuppressWarnings("unchecked")
     private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
         try {
             // 1. 获取或创建 KafkaTemplate

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java

@@ -87,7 +87,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
         }
     }
 
-    // TODO @芋道源码:测试代码,后续清理
+    // TODO @芋:测试代码,后续清理
     public static void main(String[] args) {
         // 1. 创建一个共享的实例
         IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();

+ 147 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java

@@ -0,0 +1,147 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.hutool.core.util.ReflectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.config.Config;
+import org.redisson.config.SingleServerConfig;
+import org.redisson.spring.data.connection.RedissonConnectionFactory;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+
+/**
+ * Redis Stream MQ 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁类型
+        if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行消息发送
+        executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) {
+        try {
+            // 1. 获取 RedisTemplate
+            RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) getProducer(config);
+
+            // 2. 创建并发送 Stream 记录
+            ObjectRecord<String, IotDeviceMessage> record = StreamRecords.newRecord()
+                    .ofObject(message).withStreamKey(config.getTopic());
+            String recordId = String.valueOf(redisTemplate.opsForStream().add(record));
+            log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config);
+        } catch (Exception e) {
+            log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) {
+        IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config;
+
+        // 1.1 创建 Redisson 配置
+        Config redissonConfig = new Config();
+        SingleServerConfig serverConfig = redissonConfig.useSingleServer()
+                .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort())
+                .setDatabase(redisConfig.getDatabase());
+        // 1.2 设置密码(如果有)
+        if (StrUtil.isNotBlank(redisConfig.getPassword())) {
+            serverConfig.setPassword(redisConfig.getPassword());
+        }
+
+        // 2.1 创建 RedissonClient
+        RedissonClient redisson = Redisson.create(redissonConfig);
+        // 2.2 创建并配置 RedisTemplate
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        // 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。
+        template.setConnectionFactory(new RedissonConnectionFactory(redisson));
+        // 使用 String 序列化方式,序列化 KEY 。
+        template.setKeySerializer(RedisSerializer.string());
+        template.setHashKeySerializer(RedisSerializer.string());
+        // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
+        template.setValueSerializer(buildRedisSerializer());
+        template.setHashValueSerializer(buildRedisSerializer());
+        template.afterPropertiesSet();// 初始化
+        return template;
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof RedisTemplate) {
+            RedisConnectionFactory factory = ((RedisTemplate<?, ?>) producer).getConnectionFactory();
+            try {
+                if (factory != null) {
+                    ((RedissonConnectionFactory) factory).destroy();
+                }
+            } catch (Exception e) {
+                log.error("[closeProducer][关闭 redisson 连接异常]", e);
+            }
+        }
+    }
+
+
+    public static RedisSerializer<?> buildRedisSerializer() {
+        RedisSerializer<Object> json = RedisSerializer.json();
+        // 解决 LocalDateTime 的序列化
+        ObjectMapper objectMapper = (ObjectMapper) ReflectUtil.getFieldValue(json, "mapper");
+        objectMapper.registerModules(new JavaTimeModule());
+        return json;
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig();
+        config.setHost("127.0.0.1");
+        config.setPort(6379);
+        config.setDatabase(0);
+        config.setPassword("123456");
+        config.setTopic("test-stream");
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 RedisTemplate]");
+        action.executeRedisStream(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]");
+        action.executeRedisStream(message, config);
+    }
+
+}