스프링 프레임워크/kafka

spring-kafka DelegatingByTopicDeserializer 역직렬화

blogger903 2024. 6. 26. 08:45
728x90

이번 포스트에서는 topic명에 해당하는 타입으로 역직렬화를 다루겠습니다

 

환경

  • springboot 3.1.10
  • gradle 8.7
  • java 17
  • spring-kafka

spring-kafka를 통해 producer, consumer를 사용하게 되면

header에 typeId가 들어가게되고 타입에 해당하는 패키지정보가 들어가있습니다

kafka client가 java를 쓰지 않을 수도 있습니다 

header에 아무런 값을 설정하지 않고 key, value만 가지고 publish할수 있습니다

header의 typeId 정보가 없는상태에서 deserialize하게 되면 header가 없는 경우에는 다음과 같은 에러가 발생합니다

Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)

 

consumerFactory를 수정해줍니다

DelegatingByTopicDeserializer를 통해 다음과 같이 VALUE_DESERIALIZER_CLASS_CONFIG 키에 DelegatingByTopicDeserializer를 설정해줍니다

미리 topic에 해당하는 message type을 명시합니다

 

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.key-deserializer"));
        props.put(JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset"));

        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        Map<String, Object> deserializerMap = new HashMap<>();
        deserializerMap.put("topic-1", new JsonDeserializer<>(MyMessage.class));
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DelegatingByTopicDeserializer.class);
        props.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG, deserializerMap);

        return new DefaultKafkaConsumerFactory<>(props);
    }

 

참고자료:

https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#by-topic