代码优化

master
lulicheng 10 months ago
parent 17efe4dc57
commit 0ce54612bc

@ -0,0 +1,55 @@
package com.ipsplm.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
public class KafkaConfig {
/**
* kafka
* Primary
*
* @return kafka
*/
@Primary
@ConfigurationProperties(prefix = "spring.kafka")
@Bean
public KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
/**
* kafka
*
* @param firstKafkaProperties kafka
* @return kafka
*/
@Bean("kafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
firstKafkaListenerContainerFactory(@Autowired @Qualifier("kafkaProperties") KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(firstConsumerFactory(kafkaProperties));
return factory;
}
/**
* kafka
*
* @param firstKafkaProperties kafka
* @return kafka
*/
private ConsumerFactory<? super Integer, ? super String> firstConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
}

@ -1,76 +0,0 @@
package com.ipsplm.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
public class SecondKafkaConfig {
/**
* kafka
*
* @return kafka
*/
@ConfigurationProperties(prefix = "spring.kafka.second")
@Bean
public KafkaProperties secondKafkaProperties() {
return new KafkaProperties();
}
// /**
// * 构建第二个kafka的生产者发送template
// *
// * @param secondKafkaProperties 第二个kafka配置
// * @return 第二个kafka的生产者发送template
// */
// @Bean
// public KafkaTemplate<String, String> secondKafkaTemplate(
// @Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
// return new KafkaTemplate<>(secondProducerFactory(secondKafkaProperties));
// }
/**
* kafka
*
* @param secondKafkaProperties kafka
* @return kafka
*/
@Bean("secondKafkaListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
secondKafkaListenerContainerFactory(@Autowired @Qualifier("secondKafkaProperties") KafkaProperties secondKafkaProperties) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(secondConsumerFactory(secondKafkaProperties));
return factory;
}
/**
* kafka
*
* @param secondKafkaProperties kafka
* @return kafka
*/
private ConsumerFactory<? super Integer, ? super String> secondConsumerFactory(KafkaProperties secondKafkaProperties) {
return new DefaultKafkaConsumerFactory<>(secondKafkaProperties.buildConsumerProperties());
}
// /**
// * 新建第二个kafka的生产者工厂
// *
// * @param secondKafkaProperties 第二个kafka配置
// * @return 第二个kafka的生产者工厂
// */
// private DefaultKafkaProducerFactory<String, String> secondProducerFactory(KafkaProperties secondKafkaProperties) {
// return new DefaultKafkaProducerFactory<>(secondKafkaProperties.buildProducerProperties());
// }
}

@ -45,7 +45,7 @@ public class KafkaConsumerListener {
System.setProperty("groupId", groupId);
}
@KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "secondKafkaListenerContainerFactory")
@KafkaListener(topics = {"topic_kfk_iotproduction_xazz_1129"}, groupId = "${groupId}",containerFactory = "kafkaListenerContainerFactory")
public void handleMessage2(ConsumerRecord<String, String> record) {
// 当前时间
Date currentTime = new Date();

@ -26,7 +26,7 @@ spring:
redis:
# 地址
host: 127.0.0.1
host: 10.31.12.195
# 端口默认为6379
port: 6379
# 数据库索引
@ -47,8 +47,7 @@ spring:
max-wait: -1ms
kafka:
second:
bootstrap-servers: 10.31.11.64:9092,10.31.11.91:9092,10.31.11.93:9092
bootstrap-servers: 10.79.10.62:9092
consumer:
auto-offset-reset: latest
enable-auto-commit: false

Loading…
Cancel
Save