From 0ce54612bcd59e079ee0996d6d642becd2743f06 Mon Sep 17 00:00:00 2001 From: lulicheng Date: Wed, 12 Jun 2024 13:33:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/ipsplm/config/KafkaConfig.java | 55 ++++++++++++++ .../com/ipsplm/config/SecondKafkaConfig.java | 76 ------------------- .../ipsplm/kafka/KafkaConsumerListener.java | 2 +- src/main/resources/application.yml | 5 +- 4 files changed, 58 insertions(+), 80 deletions(-) create mode 100644 src/main/java/com/ipsplm/config/KafkaConfig.java delete mode 100644 src/main/java/com/ipsplm/config/SecondKafkaConfig.java diff --git a/src/main/java/com/ipsplm/config/KafkaConfig.java b/src/main/java/com/ipsplm/config/KafkaConfig.java new file mode 100644 index 0000000..661da97 --- /dev/null +++ b/src/main/java/com/ipsplm/config/KafkaConfig.java @@ -0,0 +1,55 @@ +package com.ipsplm.config; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +@Configuration +public class KafkaConfig { + /** + * 读取kafka配置 + * Primary注解表示默认以这个为准 + * + * @return 第一个kafka配置 + */ + @Primary + @ConfigurationProperties(prefix = "spring.kafka") + @Bean + public KafkaProperties kafkaProperties() { + return new KafkaProperties(); + } + + /** + * 构建第一个kafka的消费者监听容器工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者监听容器工厂 + */ + @Bean("kafkaListenerContainerFactory") + public KafkaListenerContainerFactory> + firstKafkaListenerContainerFactory(@Autowired @Qualifier("kafkaProperties") KafkaProperties kafkaProperties) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(firstConsumerFactory(kafkaProperties)); + return factory; + } + + /** + * 新建第一个kafka的消费者工厂 + * + * @param firstKafkaProperties 第一个kafka配置 + * @return 第一个kafka的消费者工厂 + */ + private ConsumerFactory firstConsumerFactory(KafkaProperties kafkaProperties) { + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); + } +} diff --git a/src/main/java/com/ipsplm/config/SecondKafkaConfig.java b/src/main/java/com/ipsplm/config/SecondKafkaConfig.java deleted file mode 100644 index f407a9e..0000000 --- a/src/main/java/com/ipsplm/config/SecondKafkaConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.ipsplm.config; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; - -@Configuration -public class SecondKafkaConfig { - /** - * 读取第二个kafka配置 - * - * @return 第二个kafka配置 - */ - @ConfigurationProperties(prefix = "spring.kafka.second") - @Bean - public KafkaProperties secondKafkaProperties() { - return new KafkaProperties(); - } - -// /** -// * 构建第二个kafka的生产者发送template -// * -// * @param secondKafkaProperties 第二个kafka配置 -// * @return 第二个kafka的生产者发送template -// */ -// @Bean -// public KafkaTemplate secondKafkaTemplate( -// @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { -// return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties)); -// } - - /** - * 构建第二个kafka的消费者监听容器工厂 - * - * @param secondKafkaProperties 第二个kafka配置 - * @return 第二个kafka的消费者监听容器工厂 - */ - @Bean("secondKafkaListenerContainerFactory") - public KafkaListenerContainerFactory> - secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties)); - return factory; - } - - /** - * 新建第二个kafka的消费者工厂 - * - * @param secondKafkaProperties 第二个kafka配置 - * @return 第二个kafka的消费者工厂 - */ - private ConsumerFactory secondConsumerFactory(KafkaProperties secondKafkaProperties) { - return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties()); - } - -// /** -// * 新建第二个kafka的生产者工厂 -// * -// * @param secondKafkaProperties 第二个kafka配置 -// * @return 第二个kafka的生产者工厂 -// */ -// private DefaultKafkaProducerFactory secondProducerFactory(KafkaProperties secondKafkaProperties) { -// return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties()); -// } -} diff --git a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java index 1749438..a2ce600 100644 --- a/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java +++ b/src/main/java/com/ipsplm/kafka/KafkaConsumerListener.java @@ -45,7 +45,7 @@ public class KafkaConsumerListener { System.setProperty("groupId", groupId); } - @KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "secondKafkaListenerContainerFactory") + @KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "kafkaListenerContainerFactory") public void handleMessage2(ConsumerRecord record) { // 当前时间 Date currentTime = new Date(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 228ef88..2eab58e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -26,7 +26,7 @@ spring: redis: # 地址 - host: 127.0.0.1 + host: 10.31.12.195 # 端口,默认为6379 port: 6379 # 数据库索引 @@ -47,8 +47,7 @@ spring: max-wait: -1ms kafka: - second: - bootstrap-servers: 10.31.11.64:9092,10.31.11.91:9092,10.31.11.93:9092 + bootstrap-servers: 10.79.10.62:9092 consumer: auto-offset-reset: latest enable-auto-commit: false