Selaa lähdekoodia

【功能新增】IoT: 新增 Kafka 数据桥梁实现

puhui999 5 kuukautta sitten
vanhempi
sitoutus
69a27b1ee2

+ 6 - 2
yudao-dependencies/pom.xml

@@ -35,6 +35,7 @@
         <taos.version>3.3.3</taos.version>
         <!-- 消息队列 -->
         <rocketmq-spring.version>2.3.2</rocketmq-spring.version>
+        <kafka-spring.version>3.3.3</kafka-spring.version>
         <!-- 服务保障相关 -->
         <lock4j.version>2.2.7</lock4j.version>
         <!-- 监控相关 -->
@@ -285,13 +286,16 @@
                 <artifactId>yudao-spring-boot-starter-mq</artifactId>
                 <version>${revision}</version>
             </dependency>
-
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-spring-boot-starter</artifactId>
                 <version>${rocketmq-spring.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>${kafka-spring.version}</version>
+            </dependency>
             <!-- 服务保障相关 -->
             <dependency>
                 <groupId>cn.iocoder.boot</groupId>

+ 5 - 0
yudao-module-iot/yudao-module-iot-biz/pom.xml

@@ -81,6 +81,11 @@
             <artifactId>rocketmq-spring-boot-starter</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <optional>true</optional>
+        </dependency>
 
         <dependency>
             <groupId>org.pf4j</groupId>  <!-- PF4J:内置插件机制 -->

+ 34 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java

@@ -170,4 +170,38 @@ public class IotDataBridgeDO extends BaseDO {
 
     }
 
+    /**
+     * Kafka 配置
+     */
+    @Data
+    public static class KafkaMQConfig implements Config {
+
+        /**
+         * Kafka 服务器地址
+         */
+        private String bootstrapServers;
+        /**
+         * 用户名
+         */
+        private String username;
+        /**
+         * 密码
+         */
+        private String password;
+        /**
+         * 是否启用 SSL
+         */
+        private Boolean ssl;
+
+        /**
+         * 生产者组 ID
+         */
+        private String groupId;
+        /**
+         * 主题
+         */
+        private String topic;
+
+    }
+
 }

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java

@@ -8,7 +8,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.time.Duration;
 
 /**
- * 带缓存功能的数据桥执行器抽象类
+ * 带缓存功能的数据桥执行器抽象类
  *
  * @author HUIHUI
  */

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java

@@ -12,7 +12,7 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
 public interface IotDataBridgeExecute {
 
     /**
-     * 执行数据桥操作
+     * 执行数据桥操作
      *
      * @param message    设备消息
      * @param dataBridge 数据桥梁

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java

@@ -32,7 +32,7 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute {
 
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
-        // 1.1 校验数据桥的类型 == HTTP
+        // 1.1 校验数据桥的类型 == HTTP
         if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) {
             return;
         }

+ 124 - 0
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java

@@ -0,0 +1,124 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Kafka 的 {@link IotDataBridgeExecute} 实现类
+ *
+ * @author HUIHUI
+ */
+@Component
+@Slf4j
+public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
+
+    @Override
+    public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
+        // 1.1 校验数据桥梁的类型 == KAFKA
+        if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
+            return;
+        }
+        // 1.2 执行 Kafka 发送消息
+        executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
+    }
+
+    private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) {
+        try {
+            // 1. 获取或创建 KafkaTemplate
+            KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) getProducer(config);
+
+            // 2. 发送消息并等待结果
+            kafkaTemplate.send(config.getTopic(), message.toString())
+                    .get(10, TimeUnit.SECONDS); // 添加超时等待
+            log.info("[executeKafka][message({}) 发送成功]", message);
+        } catch (TimeoutException e) {
+            log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
+        } catch (Exception e) {
+            log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e);
+        }
+    }
+
+    @Override
+    protected Object initProducer(Object config) {
+        IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
+
+        // 1.1 构建生产者配置
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        // 1.2 如果配置了认证信息
+        if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
+            props.put("security.protocol", "SASL_PLAINTEXT");
+            props.put("sasl.mechanism", "PLAIN");
+            props.put("sasl.jaas.config",
+                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+                            + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
+        }
+
+        // 1.3 如果启用 SSL
+        if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
+            props.put("security.protocol", "SSL");
+        }
+
+        // 2. 创建 KafkaTemplate
+        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
+        return new KafkaTemplate<>(producerFactory);
+    }
+
+    @Override
+    protected void closeProducer(Object producer) {
+        if (producer instanceof KafkaTemplate) {
+            ((KafkaTemplate<?, ?>) producer).destroy();
+        }
+    }
+
+    // TODO @芋艿:测试代码,后续清理
+    public static void main(String[] args) {
+        // 1. 创建一个共享的实例
+        IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
+
+        // 2. 创建共享的配置
+        IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig();
+        config.setBootstrapServers("127.0.0.1:9092");
+        config.setTopic("test-topic");
+        config.setSsl(false);
+        config.setUsername(null);
+        config.setPassword(null);
+
+        // 3. 创建共享的消息
+        IotDeviceMessage message = IotDeviceMessage.builder()
+                .requestId("TEST-001")
+                .productKey("testProduct")
+                .deviceName("testDevice")
+                .deviceKey("testDeviceKey")
+                .type("property")
+                .identifier("temperature")
+                .data("{\"value\": 60}")
+                .reportTime(LocalDateTime.now())
+                .tenantId(1L)
+                .build();
+
+        // 4. 执行两次测试,验证缓存
+        log.info("[main][第一次执行,应该会创建新的 producer]");
+        action.executeKafka(message, config);
+
+        log.info("[main][第二次执行,应该会复用缓存的 producer]");
+        action.executeKafka(message, config);
+    }
+
+}

+ 1 - 1
yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java

@@ -24,7 +24,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe
 
     @Override
     public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
-        // 1.1 校验数据桥的类型 == ROCKETMQ
+        // 1.1 校验数据桥的类型 == ROCKETMQ
         if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) {
             return;
         }