
Getting started with produce and consume using spring-kafka in kotlin-springboot

blogger903 2024. 8. 12. 01:36

In this post, we build a sample that produces and consumes with spring-kafka in a kotlin-springboot project, and set up the standard pieces that tend to come along with using Kafka.

 

What we cover

  • spring-kafka consumer and producer configuration
  • kafka listener
  • consumer group
  • batch listener
  • manual commit
  • duplicate consumption
  • missed consumption
  • EOS (Idempotent)
  • Error Handling
  • DLQ


build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.0.11"
    id("io.spring.dependency-management") version "1.1.0"
    kotlin("jvm") version "1.7.22"
    kotlin("plugin.spring") version "1.7.22"
    kotlin("plugin.jpa") version "1.7.22"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.springframework.boot:spring-boot-starter-validation")
    implementation("mysql:mysql-connector-java:8.0.28")
    runtimeOnly("com.h2database:h2")

    implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.1.0")

    implementation("org.springframework.kafka:spring-kafka")

    implementation("com.querydsl:querydsl-jpa:5.0.0:jakarta")
    implementation("com.querydsl:querydsl-kotlin:5.0.0")

    annotationProcessor("com.querydsl:querydsl-apt:5.0.0:jakarta")

    testImplementation("org.springframework.kafka:spring-kafka-test")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")
    testImplementation("org.junit.jupiter:junit-jupiter-api")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

 

application.yml

spring:
  application:
    name: kotlin-spring-kafka-sample
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: latest
      properties:
        spring.json.trusted.packages: "*"
        allow.auto.create.topics: false
    listener:
      concurrency: 1
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: -1

  datasource:
    #    url: jdbc:h2:mem:study  # In-memory H2 database
    url: jdbc:h2:./h2db;AUTO_SERVER=true;mode=mysql  # File-based H2 database (MySQL compatibility mode)
    username: sa
    password:
    driver-class-name: org.h2.Driver
  h2:
    console:
      enabled: true
  #      path: /h2-console

  jpa:
    hibernate:
      ddl-auto: create
    #    show-sql: true
    properties:
      hibernate:
        format_sql: true
        use_sql_comments: true
        dialect: org.hibernate.dialect.H2Dialect
#        default_batch_fetch_size: 100


logging:
  pattern:
    console: "[%d{HH:mm:ss.SSS}[%-5level][%logger.%method:line%line] - %msg%n"
  level:
    org:
      hibernate.orm.jdbc.bind: trace
      hibernate.SQL: debug

consumer, producer configuration

Since Kotlin allows multiple classes in a single file, add both configurations like this:

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer

@Configuration
class KafkaConsumerConfig(
    val kafkaProperties: KafkaProperties
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val props: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
            setConcurrency(3) // Capped by the topic's partition count; concurrency above the number of partitions has no effect.
            containerProperties.pollTimeout = 3000
        }
    }

    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
            isBatchListener = true
            setConcurrency(2)
            containerProperties.pollTimeout = 5000
        }
    }
}

@Configuration
class KafkaProducerConfig(
    private val kafkaProperties: KafkaProperties
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, MyMessage> {
        val configProps: Map<String, Any> = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java
        )
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, MyMessage> {
        return KafkaTemplate(producerFactory())
    }
}

 

Error

Could not autowire. No beans of 'KafkaProperties' type found.


The config package must be located so that it gets component-scanned from the class that holds the @SpringBootApplication-annotated main method.
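
For reference, a minimal sketch of that layout (the application class name is an assumption; the package matches the imports used throughout this post):

package com.example.kotlinspringkafkasample

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

// Component scanning starts from this package, so KafkaConsumerConfig and
// KafkaProducerConfig must live in this package or a sub-package of it.
@SpringBootApplication
class KotlinSpringKafkaSampleApplication

fun main(args: Array<String>) {
    runApplication<KotlinSpringKafkaSampleApplication>(*args)
}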

listener

Guaranteeing idempotency is a must when consuming

Because so many companies use Kafka, it's easy to copy what others have built and design a feature as "just async" without much thought.
At the very least, design the feature so that processing is idempotent.
EOS and the idempotent Kafka producer are great,

but consuming must always be developed to be idempotent.

Reference: https://www.conduktor.io/kafka/idempotent-kafka-producer/

 

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyConsumer {

    @KafkaListener(topics = ["test-topic"], groupId = "test-group", containerFactory = "kafkaListenerContainerFactory")
    fun consume(message: ConsumerRecord<String, MyMessage>) {
        println("Consumed message: ${message.value()}")
    }
}
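
Note that the listener above is not idempotent. A minimal sketch of deduplicating on message id (the in-memory processedIds set is a hypothetical stand-in; in production use a durable store such as a DB unique constraint):

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class IdempotentConsumer {

    // Sketch only: an in-memory set survives neither restarts nor scale-out.
    private val processedIds: MutableSet<Int> = ConcurrentHashMap.newKeySet()

    @KafkaListener(topics = ["test-topic"], groupId = "test-group-idempotent")
    fun consume(record: ConsumerRecord<String, MyMessage>) {
        val message = record.value()
        if (!processedIds.add(message.id)) {
            println("Duplicate message id=${message.id}, skipping")
            return
        }
        println("Processing message id=${message.id}")
    }
}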

Controller for producing

import com.example.kotlinspringkafkasample.model.MyMessage
import org.springframework.http.ResponseEntity
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/api/messages")
class MessageController(private val kafkaTemplate: KafkaTemplate<String, MyMessage>) {

    @PostMapping
    fun sendMessage(@RequestBody message: MyMessage): ResponseEntity<String> {
        kafkaTemplate.send("test-topic", message)
        return ResponseEntity.ok("Message sent to Kafka")
    }
}

 

model

data class MyMessage(
    val id: Int = 0,
    val age: Int = 0,
    val name: String = "",
    val content: String = ""
)

consumer group

A consumer group serves one purpose within the group and another across groups.

Parallel processing within a consumer group:
Within a single consumer group, multiple consumers process the topic's partitions in parallel.
This raises throughput and spreads the load.

Different consumer groups for different processing:
Separate consumer groups are typically used when the same topic is consumed for different purposes or with different processing logic.
For example, one group persists the data while another runs real-time analytics.

That is also why each consumer group's current offset is tracked separately (committed offsets are stored per group in the __consumer_offsets topic).
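
A sketch of that fan-out (the group ids are illustrative): two listeners on the same topic with different group ids each receive every message, while listeners sharing a group id would split the partitions instead.

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class FanOutConsumers {

    // Group 1: e.g. persists the data
    @KafkaListener(topics = ["test-topic"], groupId = "persist-group")
    fun persist(record: ConsumerRecord<String, MyMessage>) {
        println("persist-group consumed offset ${record.offset()}")
    }

    // Group 2: e.g. real-time analytics; receives the same records
    // independently, with its own committed offsets
    @KafkaListener(topics = ["test-topic"], groupId = "analytics-group")
    fun analyze(record: ConsumerRecord<String, MyMessage>) {
        println("analytics-group consumed offset ${record.offset()}")
    }
}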

batch listener

Point the listener at the batchKafkaListenerContainerFactory declared in the Kafka config:

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyConsumer {
    @KafkaListener(topics = ["test-topic"], groupId = "test-group-batch", containerFactory = "batchKafkaListenerContainerFactory")
    fun consumeBatch(messages: List<ConsumerRecord<String, MyMessage>>) {
        println("Consumed batch of messages: ${messages.size}")
        messages.forEachIndexed { index, record ->
            val message = record.value()
            println("Message ${index + 1}:")
            println("  Topic: ${record.topic()}")
            println("  Partition: ${record.partition()}")
            println("  Offset: ${record.offset()}")
            println("  Key: ${record.key()}")
            println("  Value:")
            println("    ID: ${message.id}")
            println("    Age: ${message.age}")
            println("    Name: ${message.name}")
            println("    Content: ${message.content}")
            println("-----------------------------")
        }
        println("Batch processing completed")
    }
}

 

Checking the behavior

Consumed batch of messages: 10
Message 1:
  Topic: test-topic
  Partition: 0
  Offset: 13
  Key: null
  Value:
    ID: 30
    Age: 31
    Name: name_1
    Content: content - Iteration 1
-----------------------------
Message 2:
  Topic: test-topic
  Partition: 0
  Offset: 14
  Key: null
  Value:
    ID: 31
    Age: 32
    Name: name_2
    Content: content - Iteration 2
-----------------------------
...
-----------------------------
Message 9:
  Topic: test-topic
  Partition: 0
  Offset: 21
  Key: null
  Value:
    ID: 38
    Age: 39
    Name: name_9
    Content: content - Iteration 9
-----------------------------
Message 10:
  Topic: test-topic
  Partition: 0
  Offset: 22
  Key: null
  Value:
    ID: 39
    Age: 40
    Name: name_10
    Content: content - Iteration 10
-----------------------------

Manual commit

Acknowledge inside the KafkaListener:

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class MyConsumer {

    @KafkaListener(topics = ["test-topic"], groupId = "test-group-batch", containerFactory = "batchKafkaListenerContainerFactory")
    fun consumeBatch(messages: List<ConsumerRecord<String, MyMessage>>, acknowledgment: Acknowledgment) {
        println("Consumed batch of messages: ${messages.size}")
        messages.forEachIndexed { index, record ->
            val message = record.value()
            println("Message ${index + 1}:")
            println("  Topic: ${record.topic()}")
            println("  Partition: ${record.partition()}")
            println("  Offset: ${record.offset()}")
            println("  Key: ${record.key()}")
            println("  Value:")
            println("    ID: ${message.id}")
            println("    Age: ${message.age}")
            println("    Name: ${message.name}")
            println("    Content: ${message.content}")
            println("-----------------------------")
        }
        println("Batch processing completed")
        acknowledgment.acknowledge() // manual commit
    }
}

 

Manual commit configuration

Set ENABLE_AUTO_COMMIT_CONFIG to false on the ConsumerFactory,

and set AckMode to MANUAL on the ConcurrentKafkaListenerContainerFactory.

There may be cases where you need finer-grained acks; AckMode.MANUAL_IMMEDIATE also exists.

import org.springframework.kafka.listener.ContainerProperties

@Configuration
class KafkaConsumerConfig(
    val kafkaProperties: KafkaProperties
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val props: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, // manual commit
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
            isBatchListener = true
            setConcurrency(2)
            containerProperties.pollTimeout = 5000
            setContainerCustomizer {
                it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
            }
        }
    }
}

 

Reference: https://blog.voidmainvoid.net/262 ("Kafka consumer automatic commit can cause duplicates")

By default, with auto commit, offsets up to the last record returned are committed when KafkaConsumer.poll() runs.

auto.commit.interval.ms defaults to 5 seconds: on each poll(), if that interval has elapsed since the last commit, the offsets from the previous poll are committed.
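
For reference, a sketch of the corresponding settings in the consumer factory props (both values shown are the defaults):

import org.apache.kafka.clients.consumer.ConsumerConfig

val autoCommitProps: Map<String, Any> = mapOf(
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,     // default: auto commit on
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to 5000 // default: 5 seconds
)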

Duplicate consumption

The commit point and the moment data processing completes can never line up perfectly.

A rebalance can occur in that window (partitions are reassigned across consumers),

and because offsets are managed on the broker, not the consumer (the __consumer_offsets topic), records can be delivered again.

When does rebalancing happen?

- session.timeout.ms and heartbeat.interval.ms

If heartbeats stop arriving and the session times out, the consumer is judged unhealthy and a rebalance is triggered.

- max.poll.interval.ms

If the consumer does not call poll() within this maximum interval, it is likewise judged unhealthy.
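
A sketch of tuning these in the consumer factory props (the values are illustrative, not recommendations):

import org.apache.kafka.clients.consumer.ConsumerConfig

val rebalanceTuning: Map<String, Any> = mapOf(
    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 45000,    // no heartbeat for this long -> consumer considered dead
    ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 15000, // heartbeat cadence, well below the session timeout
    ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 300000  // max allowed gap between poll() calls
)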

Missed consumption

"Missed consumption" here means the situation where the message exists, but the offset is committed

even though the data was never processed successfully.

In the normal path nothing gets missed;

misses happen through flaws in the developer's logic:

handing work off to async processing and then committing, or catching the exception and letting poll() continue,

or committing without a DLQ configured, so the failed record's processing is silently lost.
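
A sketch of that anti-pattern, assuming a record container factory configured with AckMode.MANUAL (not shown in this post); process() is a hypothetical business method:

import com.example.kotlinspringkafkasample.model.MyMessage
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class LossyConsumer {

    @KafkaListener(topics = ["test-topic"], groupId = "test-group-lossy")
    fun lossyConsume(record: ConsumerRecord<String, MyMessage>, acknowledgment: Acknowledgment) {
        try {
            process(record.value()) // hypothetical business logic that can fail
        } catch (e: Exception) {
            // swallowed: no retry, no DLQ, the failure is invisible
        }
        acknowledgment.acknowledge() // committed whether or not processing succeeded
    }

    private fun process(message: MyMessage) {
        // stand-in for real work
    }
}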

EOS (Exactly Once Semantics)

A producer can produce duplicate messages.

How does EOS handle duplicate produces?

Each record batch carries a producer id + sequence number (a unique key),

and the acks option must be set to all.

acks: "-1" : leader + followers all ack (same as all)

acks: "0" : fire and forget

acks: "1" : leader ack only

producer config

@Configuration
class KafkaProducerConfig(
    private val kafkaProperties: KafkaProperties
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, MyMessage> {
        val configProps: Map<String, Any> = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "-1",
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to "true",
        )
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, MyMessage> {
        return KafkaTemplate(producerFactory())
    }
}

 

Log after the change

[00:36:37.857][INFO ][org.apache.kafka.clients.producer.KafkaProducer.configureTransactionState:line602] - [Producer clientId=producer-1] Instantiated an idempotent producer.

Error handling

consumer config changes

 

We added a DefaultErrorHandler: it retried 10 times, then gave up and committed (the records were discarded).

import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.ExponentialBackOff

@Configuration
class KafkaConsumerConfig(
    val kafkaProperties: KafkaProperties
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val props: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, // manual commit
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
            isBatchListener = true
            setConcurrency(2)
            containerProperties.pollTimeout = 5000
            setContainerCustomizer {
                it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
            }
            val errorHandler = DefaultErrorHandler(generateBackOff())
            errorHandler.addNotRetryableExceptions(IllegalArgumentException::class.java) // exceptions that should not be retried
            setCommonErrorHandler(errorHandler)
        }
    }

    private fun generateBackOff(): BackOff {
        return ExponentialBackOff().apply {
            initialInterval = 1000 // initial retry interval (ms)
            maxInterval = 10000 // maximum retry interval (ms)
            multiplier = 2.0 // backoff multiplier per retry
            maxElapsedTime = 60000 // give up after this total elapsed time (ms)
        }
    }
}

 

Write the kafka listener so that it throws an exception:

import java.util.concurrent.atomic.AtomicInteger

@Component
class MyConsumer(val retryCount: AtomicInteger = AtomicInteger(0)) {

    @KafkaListener(topics = ["test-topic"], groupId = "test-group", containerFactory = "kafkaListenerContainerFactory")
    fun consume(message: ConsumerRecord<String, MyMessage>) {
        println("Consumed message: ${message.value()}")
    }

    @KafkaListener(topics = ["test-topic"], groupId = "test-group-batch", containerFactory = "batchKafkaListenerContainerFactory")
    fun consumeBatch(messages: List<ConsumerRecord<String, MyMessage>>, acknowledgment: Acknowledgment) {
        println("Consumed batch of messages: ${messages.size}")
        println("(Retry count: ${retryCount.get()})")
        messages.forEachIndexed { index, record ->
            val message = record.value()
            println("Message ${index + 1}:")
            println("  Topic: ${record.topic()}")
            println("  Partition: ${record.partition()}")
            println("  Offset: ${record.offset()}")
            println("  Key: ${record.key()}")
            println("  Value:")
            println("    ID: ${message.id}")
            println("    Age: ${message.age}")
            println("    Name: ${message.name}")
            println("    Content: ${message.content}")
            println("-----------------------------")
        }
        println("Batch processing completed")
        retryCount.incrementAndGet()
        throw RuntimeException("Intentional exception")
    }
}

 

Resulting log

[00:49:26.196][ERROR][org.springframework.kafka.listener.FallbackBatchErrorHandler.error:line261] - Records discarded: test-topic-0@43,test-topic-0@44,test-topic-0@45,test-topic-0@46,test-topic-0@47,test-topic-0@48,test-topic-0@49,test-topic-0@50,test-topic-0@51,test-topic-0@52
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.kotlinspringkafkasample.consumer.MyConsumer.consumeBatch(java.util.List<? extends org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, com.example.kotlinspringkafkasample.model.MyMessage>>,org.springframework.kafka.support.Acknowledgment)' threw exception
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2942)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2478)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2454)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$46(KafkaMessageListenerContainer.java:2490)
	at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:170)
	at org.springframework.kafka.listener.FallbackBatchErrorHandler.handleBatch(FallbackBatchErrorHandler.java:150)
	at org.springframework.kafka.listener.FailedBatchProcessor.fallback(FailedBatchProcessor.java:196)
	at org.springframework.kafka.listener.FailedBatchProcessor.handle(FailedBatchProcessor.java:166)
	at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:157)
	at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:182)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2488)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2297)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2168)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2147)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1505)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1469)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1344)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
		at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:391)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
		at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
		at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2464)
Caused by: java.lang.RuntimeException: Intentional exception
	at com.example.kotlinspringkafkasample.consumer.MyConsumer.consumeBatch(MyConsumer.kt:38)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375)
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180)
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172)
	at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2464)
	... 17 common frames omitted

 

If you need a custom implementation, implement CommonErrorHandler.
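
A minimal sketch of one, handling the batch case (the skip-on-failure policy here is only an example, not a recommendation):

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer

class LoggingSkipBatchErrorHandler : CommonErrorHandler {

    override fun handleBatch(
        thrownException: Exception,
        data: ConsumerRecords<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer,
        invokeListener: Runnable
    ) {
        // Example policy: log and move on. Not re-throwing means the failed
        // batch is skipped instead of retried.
        println("Batch of ${data.count()} records failed: ${thrownException.message}")
    }
}

Register it with setCommonErrorHandler(LoggingSkipBatchErrorHandler()) in place of the DefaultErrorHandler above.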

 

DLT (DLQ)

Update the Kafka config by adding a DeadLetterPublishingRecoverer:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
import org.springframework.util.backoff.BackOff
import org.springframework.util.backoff.ExponentialBackOff

@Configuration
class KafkaConsumerConfig(
    val kafkaProperties: KafkaProperties
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        val props: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false, // manual commit
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(props)
    }
    
    ...

    @Bean
    fun batchKafkaListenerContainerFactory(
        kafkaTemplate: KafkaTemplate<String, Any>
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        return ConcurrentKafkaListenerContainerFactory<String, String>().apply {
            consumerFactory = consumerFactory()
            isBatchListener = true
            setConcurrency(2)
            containerProperties.pollTimeout = 5000
            setContainerCustomizer {
                it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
            }
            val errorHandler = DefaultErrorHandler(DeadLetterPublishingRecoverer(kafkaTemplate), generateBackOff()) // retry per generateBackOff(), then publish to the DLT
            setCommonErrorHandler(errorHandler)
        }
    }

    private fun generateBackOff(): BackOff {
        return ExponentialBackOff().apply {
            initialInterval = 1000 // initial retry interval (ms)
            maxInterval = 10000 // maximum retry interval (ms)
            multiplier = 2.0 // backoff multiplier per retry
            maxElapsedTime = 60000 // give up after this total elapsed time (ms)
        }
    }
}

@Configuration
class KafkaProducerConfig<V>(
    private val kafkaProperties: KafkaProperties
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, V> {
        val configProps: Map<String, Any> = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
            ProducerConfig.ACKS_CONFIG to "-1",
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to "true",
        )
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, V> {
        return KafkaTemplate(producerFactory())
    }
}

 

A <topic name>.DLT topic was created,

and the failed messages go into it as-is.

For a custom implementation, implement ConsumerAwareRecordRecoverer.
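
A minimal sketch, logging instead of publishing (in real use you would persist or forward the record somewhere durable):

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer

class AuditingRecoverer : ConsumerAwareRecordRecoverer {

    // Called once retries are exhausted
    override fun accept(record: ConsumerRecord<*, *>, consumer: Consumer<*, *>?, exception: Exception) {
        println("Giving up on ${record.topic()}-${record.partition()}@${record.offset()}: ${exception.message}")
    }
}

Wire it in place of the DeadLetterPublishingRecoverer, e.g. DefaultErrorHandler(AuditingRecoverer(), generateBackOff()).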

 

@SpringBootTest issue

Builds and tests fail because auto-configuration does not provide an embedded Kafka broker.

 

Add the dependency to build.gradle.kts:

testImplementation("org.springframework.kafka:spring-kafka-test")

 

Add the following shared setting to application.yml, so tests pick up the embedded broker's address and everything else falls back to the real one:

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers:localhost:9094}
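
With that in place, a minimal test sketch (class name and topic are assumptions): @EmbeddedKafka starts an in-memory broker and sets the spring.embedded.kafka.brokers property referenced above.

import org.junit.jupiter.api.Test
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = ["test-topic"])
class KafkaIntegrationTest {

    @Test
    fun `context loads against the embedded broker`() {
        // Consumers and producers connect to the embedded broker
        // instead of localhost:9094.
    }
}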