이번 포스트에서는 springboot에서 kafka를 연동해서 consuming을 해보겠습니다
환경
- springboot 3.1.10
- gradle 8.7
- java 17
- spring-kafka
kafka configuration을 설정하지 않아도 autoconfigure를 통해 application.yml으로만 설정이 가능합니다
kafka configuration설정을 통해 커스텀하게 설정할 수 있습니다
ConsumerFactory, ConcurrentKafkaListenerContainerFactory 두 가지를 설정하게 되면 컨슈밍할 준비가 되었고
KafkaListener로 consuming할수 있습니다
kafka configuration
ConsumerFactory
kafkaConsumer 인스턴스를 생성하는 팩토리 인터페이스입니다
일반적으로 `DefaultKafkaConsumerFactory` 구현체를 사용합니다
브로커 설정, 오프셋관리, key,value 직렬화, 역직렬화 설정을 담당합니다
주로 Kafka 컨슈머 기본 속성을 설정하는데 사용됩니다
ConcurrentKafkaListenerContainerFactory
@kafkaListener 애노테이션으로 정의된 Kafka 리스너를 관리하는 컨테이너 팩토리입니다
ConsumerFactory를 사용해서 컨슈머를 생성합니다
리스너의 동작방식에 관련된 설정을 담당합니다
동시성수준, 폴링 타임아웃등을 설정합니다
kafkaConfiguration 클래스
@Configuration
public class KafkaConfig {
private final Environment environment;
public KafkaConfig(Environment environment) {
this.environment = environment;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.key-deserializer"));
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.value-deserializer"));
props.put(JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory
) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
return factory;
}
}
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: latest
properties:
spring.json.trusted.packages: "*"
allow.auto.create.topics: false
KafkaListener
@KafkaListener 애노테이션은 Kafka 메시지를 수신하는 리스너 메서드를 정의할 때 사용됩니다.
ConcurrentKafkaListenerContainerFactory를 통해 생성된 리스너 컨테이너가 이 메서드를 호출합니다.
@KafkaListener 애노테이션을 사용할 때 특정 컨테이너 팩토리를 지정할 수 있습니다. 이를 통해 리스너 메서드에 대해 특정 설정을 적용할 수 있습니다. @KafkaListener 애노테이션의 containerFactory 속성을 사용하여 이를 설정할 수 있습니다.
@Component
public class KafkaListener {
@KafkaListener(
topics = { "topic-1" },
groupId = "topic-1-group-1"
) public void accept(ConsumerRecord<String,MyMessage> message) {
System.out.println("Received message: " + message.value());
}
}
참고자료:
'스프링 프레임워크 > kafka' 카테고리의 다른 글
Chaining Kafka and Database Transactions with SpringBoot (0) | 2024.08.26 |
---|---|
spring-kafka DelegatingByTopicDeserializer 역직렬화 (0) | 2024.06.26 |