|
@@ -10,6 +10,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
+import java.time.Duration;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
@@ -25,13 +26,15 @@ import java.util.concurrent.TimeoutException;
|
|
|
@Slf4j
|
|
|
public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute {
|
|
|
|
|
|
+ private static final Duration SEND_TIMEOUT = Duration.ofMillis(10);
|
|
|
+
|
|
|
@Override
|
|
|
public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) {
|
|
|
- // 1.1 校验数据桥梁的类型 == KAFKA
|
|
|
+ // 1. 校验数据桥梁的类型 == KAFKA
|
|
|
if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) {
|
|
|
return;
|
|
|
}
|
|
|
- // 1.2 执行 Kafka 发送消息
|
|
|
+ // 2. 执行 Kafka 发送消息
|
|
|
executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig());
|
|
|
}
|
|
|
|
|
@@ -43,7 +46,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
|
|
|
|
|
|
// 2. 发送消息并等待结果
|
|
|
kafkaTemplate.send(config.getTopic(), message.toString())
|
|
|
- .get(10, TimeUnit.SECONDS); // 添加超时等待
|
|
|
+ .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待
|
|
|
log.info("[executeKafka][message({}) 发送成功]", message);
|
|
|
} catch (TimeoutException e) {
|
|
|
log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e);
|
|
@@ -55,13 +58,12 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
|
|
|
@Override
|
|
|
protected Object initProducer(Object config) {
|
|
|
IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config;
|
|
|
-
|
|
|
+
|
|
|
// 1.1 构建生产者配置
|
|
|
Map<String, Object> props = new HashMap<>();
|
|
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
|
|
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
|
|
-
|
|
|
// 1.2 如果配置了认证信息
|
|
|
if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) {
|
|
|
props.put("security.protocol", "SASL_PLAINTEXT");
|
|
@@ -70,7 +72,6 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
|
|
|
+ kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";");
|
|
|
}
|
|
|
-
|
|
|
// 1.3 如果启用 SSL
|
|
|
if (Boolean.TRUE.equals(kafkaConfig.getSsl())) {
|
|
|
props.put("security.protocol", "SSL");
|