스프링 프레임워크/kafka

springboot spring-kafka로 consumer 설정

blogger903 2024. 6. 25. 00:18
728x90

이번 포스트에서는 springboot에서 kafka를 연동해서 consuming을 해보겠습니다

 

환경

  • springboot 3.1.10
  • gradle 8.7
  • java 17
  • spring-kafka

kafka configuration을 설정하지 않아도 autoconfigure를 통해 application.yml으로만 설정이 가능합니다

kafka configuration설정을 통해 커스텀하게 설정할 수 있습니다

 

ConsumerFactory, ConcurrentKafkaListenerContainerFactory 두 가지를 설정하게 되면 컨슈밍할 준비가 되었고

KafkaListener로 consuming할수 있습니다

kafka configuration

ConsumerFactory

kafkaConsumer 인스턴스를 생성하는 팩토리 인터페이스입니다

 

일반적으로 `DefaultKafkaConsumerFactory` 구현체를 사용합니다

브로커 설정, 오프셋관리, key,value 직렬화, 역직렬화 설정을 담당합니다

주로  Kafka 컨슈머 기본 속성을 설정하는데 사용됩니다

 

ConcurrentKafkaListenerContainerFactory

@kafkaListener 애노테이션으로 정의된 Kafka 리스너를 관리하는 컨테이너 팩토리입니다

 

ConsumerFactory를 사용해서 컨슈머를 생성합니다

리스너의 동작방식에 관련된 설정을 담당합니다

동시성수준, 폴링 타임아웃등을 설정합니다

 

kafkaConfiguration 클래스

@Configuration
public class KafkaConfig {
    
    private final Environment environment;

    public KafkaConfig(Environment environment) {
        this.environment = environment;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.key-deserializer"));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.value-deserializer"));
        props.put(JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);

        return factory;
    }
}

 

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092, localhost:9093, localhost:9094
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: latest
      properties:
        spring.json.trusted.packages: "*"
        allow.auto.create.topics: false

 

 

KafkaListener

@KafkaListener 애노테이션은 Kafka 메시지를 수신하는 리스너 메서드를 정의할 때 사용됩니다.

 

ConcurrentKafkaListenerContainerFactory를 통해 생성된 리스너 컨테이너가 이 메서드를 호출합니다.

@KafkaListener 애노테이션을 사용할 때 특정 컨테이너 팩토리를 지정할 수 있습니다. 이를 통해 리스너 메서드에 대해 특정 설정을 적용할 수 있습니다. @KafkaListener 애노테이션의 containerFactory 속성을 사용하여 이를 설정할 수 있습니다.

@Component
public class KafkaListener {

    @KafkaListener(
            topics = { "topic-1" },
            groupId = "topic-1-group-1"
    )    public void accept(ConsumerRecord<String,MyMessage> message) {
        System.out.println("Received message: " + message.value());
    }
}

 

참고자료:

https://docs.spring.io/spring-kafka/reference/kafka.html