Explorar o código

【代码优化】IoT: 优化数据桥接缓存实现

puhui999 hai 5 meses
pai
achega
1b15fb3845

+ 70 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java

@@ -0,0 +1,70 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+/**
+ * 带缓存功能的数据桥接执行器抽象类
+ *
+ * @author HUIHUI
+ */
+@Slf4j
+public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute {
+
+    /**
+     * Producer 缓存
+     */
+    private final LoadingCache<Object, Object> PRODUCER_CACHE = CacheBuilder.newBuilder()
+            .expireAfterAccess(Duration.ofMinutes(30))
+            .removalListener(notification -> {
+                Object producer = notification.getValue();
+                if (producer == null) {
+                    return;
+                }
+                try {
+                    closeProducer(producer);
+                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
+                } catch (Exception e) {
+                    log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
+                }
+            })
+            .build(new CacheLoader<Object, Object>() {
+                @Override
+                public Object load(Object config) throws Exception {
+                    Object producer = initProducer(config);
+                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
+                    return producer;
+                }
+            });
+
+    /**
+     * 获取生产者
+     *
+     * @param config 配置信息
+     * @return 生产者对象
+     */
+    protected Object getProducer(Object config) throws Exception {
+        return PRODUCER_CACHE.get(config);
+    }
+
+    /**
+     * 初始化生产者
+     *
+     * @param config 配置信息
+     * @return 生产者对象
+     * @throws Exception 如果初始化失败
+     */
+    protected abstract Object initProducer(Object config) throws Exception;
+
+    /**
+     * 关闭生产者
+     *
+     * @param producer 生产者对象
+     */
+    protected abstract void closeProducer(Object producer);
+
+} 

+ 18 - 41
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java

@@ -3,9 +3,6 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
 import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
 import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
 import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -14,9 +11,7 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.springframework.stereotype.Component;
 
-import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.concurrent.Executors;
 
 /**
  * RocketMQ 的 {@link IotDataBridgeExecute} 实现类
@@ -25,41 +20,7 @@ import java.util.concurrent.Executors;
  */
 @Component
 @Slf4j
-public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
-
-    /**
-     * 针对 {@link IotDataBridgeDO.RocketMQConfig} 的 DefaultMQProducer 缓存
-     */
-    // TODO @puhui999:因为 kafka 之类也存在这个情况,是不是得搞个抽象类。提供一个 initProducer,和 closeProducer 方法
-    private final LoadingCache<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer> PRODUCER_CACHE = CacheBuilder.newBuilder()
-            .refreshAfterWrite(Duration.ofMinutes(10)) // TODO puhui999:应该是 read 30 分钟哈
-            // 增加移除监听器,自动关闭 producer
-            .removalListener(notification -> {
-                DefaultMQProducer producer = (DefaultMQProducer) notification.getValue();
-                // TODO puhui999:if return,更简短哈
-                if (producer != null) {
-                    try {
-                        producer.shutdown();
-                        log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey());
-                    } catch (Exception e) {
-                        log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e);
-                    }
-                }
-            })
-            // TODO @puhui999:就同步哈,不用异步处理。
-            // 通过 asyncReloading 实现全异步加载,包括 refreshAfterWrite 被阻塞的加载线程
-            .build(CacheLoader.asyncReloading(new CacheLoader<IotDataBridgeDO.RocketMQConfig, DefaultMQProducer>() {
-
-                @Override
-                public DefaultMQProducer load(IotDataBridgeDO.RocketMQConfig config) throws Exception {
-                    DefaultMQProducer producer = new DefaultMQProducer(config.getGroup());
-                    producer.setNamesrvAddr(config.getNameServer());
-                    producer.start();
-                    log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config);
-                    return producer;
-                }
-
-            }, Executors.newCachedThreadPool()));
+public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
 
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
@@ -74,7 +35,7 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
     private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) {
         try {
             // 1. 获取或创建 Producer
-            DefaultMQProducer producer = PRODUCER_CACHE.get(config);
+            DefaultMQProducer producer = (DefaultMQProducer) getProducer(config);
 
             // 2.1 创建消息对象,指定Topic、Tag和消息体
             Message msg = new Message(
@@ -95,6 +56,22 @@ public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute {
         }
     }
 
+    @Override
+    protected Object initProducer(Object config) throws Exception {
+        IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config;
+        DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup());
+        producer.setNamesrvAddr(rocketMQConfig.getNameServer());
+        producer.start();
+        return producer;
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof DefaultMQProducer) {
+            ((DefaultMQProducer) producer).shutdown();
+        }
+    }
+
     // TODO @芋艿:测试代码,后续清理
     public static void main(String[] args) {
         // 1. 创建一个共享的实例