728x90
이번 포스트에서는 topic명에 해당하는 타입으로 역직렬화를 다루겠습니다
환경
- springboot 3.1.10
- gradle 8.7
- java 17
- spring-kafka
spring-kafka를 통해 producer, consumer를 사용하게 되면
header에 typeId가 들어가게되고 타입에 해당하는 패키지정보가 들어가있습니다
kafka client가 java를 쓰지 않을 수도 있습니다
header에 아무런 값을 설정하지 않고 key, value만 가지고 publish할수 있습니다
header의 typeId 정보가 없는상태에서 deserialize하게 되면 header가 없는 경우에는 다음과 같은 에러가 발생합니다
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:583)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
consumerFactory를 수정해줍니다
DelegatingByTopicDeserializer를 통해 다음과 같이 VALUE_DESERIALIZER_CLASS_CONFIG 키에 DelegatingByTopicDeserializer를 설정해줍니다
미리 topic에 해당하는 message type을 명시합니다
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, environment.getProperty("spring.kafka.consumer.key-deserializer"));
props.put(JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
Map<String, Object> deserializerMap = new HashMap<>();
deserializerMap.put("topic-1", new JsonDeserializer<>(MyMessage.class));
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DelegatingByTopicDeserializer.class);
props.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG, deserializerMap);
return new DefaultKafkaConsumerFactory<>(props);
}
참고자료:
https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#by-topic
'스프링 프레임워크 > kafka' 카테고리의 다른 글
Chaining Kafka and Database Transactions with SpringBoot (0) | 2024.08.26 |
---|---|
springboot spring-kafka로 consumer 설정 (0) | 2024.06.25 |