
Chaining Kafka and Database Transactions with Spring Boot

blogger903 2024. 8. 26. 09:26

Chaining Kafka and Database Transactions with Spring Boot, Minikube, and the Debezium connector for MySQL

 

In this post I explore handling a Kafka produce and a database transaction atomically, using a Kotlin + Spring Boot + MySQL + Kafka stack.

The original goal was to wrap the Kafka message produce and the JPA persistence code in a single transaction. After researching the options and building several proofs of concept, however, binding two different systems into one transaction turned out to be neither simple nor clearly safe to operate, so I decided it is not an approach I would run in production.

The alternative is a Kafka Connect source connector: with a Kafka cluster, Kafka Connect, and MySQL deployed on Minikube, I save through a JPA repository and let the connector detect record changes in the table and produce them to a Kafka topic.
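The dual-write hazard described above can be illustrated with a small, self-contained sketch. This is plain Python with in-memory stand-ins for the broker and the database; names like FakeBroker and FakeDatabase are hypothetical, not part of this project:

```python
# Illustrative sketch of the dual-write problem: a crash between the
# Kafka send and the DB commit leaves the two systems inconsistent.
# FakeBroker / FakeDatabase are in-memory stand-ins, not real clients.

class FakeBroker:
    def __init__(self):
        self.messages = []

    def send(self, topic, value):
        self.messages.append((topic, value))

class FakeDatabase:
    def __init__(self):
        self.rows = []

    def commit(self, row):
        self.rows.append(row)

def process(broker, db, content, crash_before_commit=False):
    broker.send("message-topic", content)   # step 1: produce to Kafka
    if crash_before_commit:
        raise RuntimeError("crash after send, before commit")
    db.commit(content)                      # step 2: commit to the DB

broker, db = FakeBroker(), FakeDatabase()
process(broker, db, "hello")                # happy path: both sides agree
try:
    process(broker, db, "lost", crash_before_commit=True)
except RuntimeError:
    pass

# The broker now holds a message whose row never reached the database.
print(len(broker.messages), len(db.rows))   # → 2 1
```

Swapping the order (commit first, then send) just flips the failure: a committed row whose message was never produced. This is why the post falls back to change data capture instead of chaining the two systems.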

 

What this post covers

  • Deploying a Kafka source connector to k8s
  • Verifying the message produced to a Kafka topic on Repository.save

 

Environment

  • Spring Boot 3.0.11
  • Java 17
  • Kotlin
  • spring-kafka
  • spring-data-jpa

Deploying the Kafka source connector to the k8s cluster

Deploying a Kafka source connector to a k8s cluster takes several steps, in order:

  • install minikube
  • create a Kafka cluster
  • deploy Kafka Connect
  • deploy the Kafka source connector

Install minikube

brew install minikube

 

Start minikube

minikube start

 

 

Create a Kafka cluster

I'll create the Kafka cluster using the Strimzi operator.
Reference: https://strimzi.io/docs/operators/latest/deploying

 



Install helm

brew install helm

 

Install the Strimzi Kafka operator

helm repo add strimzi https://strimzi.io/charts/
helm install my-strimzi-release strimzi/strimzi-kafka-operator

 

Deploy the Kafka cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.7.1
    metadataVersion: 3.7-IV4
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}

Reference: https://github.com/strimzi/strimzi-kafka-operator/tree/main/examples/kafka/kraft

 


 

Deploy Kafka Connect

Because I want to use the Debezium connector for MySQL, I need to build a custom Kafka Connect image with the Debezium plugin added, and deploy Kafka Connect from that image.

 

Reference: https://debezium.io/documentation/reference/stable/architecture.html

 


 

Kafka Connect manifest

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.101.244.23/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz

 

Make sure these two fields are mapped correctly:

bootstrapServers: my-cluster-kafka-bootstrap:9092
spec.build.output.image: 10.101.244.23/debezium-connect-mysql:latest

 

Point bootstrapServers at the svc:port of the Kafka cluster created above.

You also need a registry to store the custom image built by adding the Debezium plugin to Kafka Connect.

 

Create the registry with minikube's registry addon.
Reference: https://minikube.sigs.k8s.io/docs/handbook/registry/

 


 

Deploy the MySqlConnector

Manifest

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.default.svc.cluster.local
    database.port: 3306
    database.user: root
    database.password: rootpassword
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: mydb
    table.include.list: mydb.message_event
    topic.prefix: mysql
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.mydb
    include.schema.changes: true
    snapshot.mode: initial
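With this configuration, Debezium publishes change events for mydb.message_event to a topic named <topic.prefix>.<database>.<table>, i.e. mysql.mydb.message_event, and each event value carries the standard Debezium envelope (op, before, after, source, ts_ms). As a rough sketch of how a consumer might interpret such an event, here is a minimal Python example; the sample payload is hand-written for illustration, not captured from this setup:

```python
import json

# Map Debezium op codes to human-readable operations.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}

def describe(event_value: str) -> str:
    """Summarize a Debezium change-event value (JSON envelope)."""
    envelope = json.loads(event_value)
    payload = envelope.get("payload", envelope)  # with or without schema wrapper
    op = OPS.get(payload["op"], payload["op"])
    table = payload["source"]["table"]
    # "after" is null for deletes; fall back to "before" in that case.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{op} on {table}: {row}"

# Hand-written sample event, shaped like an insert into mydb.message_event.
sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"id": 2},
        "source": {"db": "mydb", "table": "message_event"},
        "ts_ms": 1724655706000,
    }
})

print(describe(sample))  # → create on message_event: {'id': 2}
```

The op code "c" corresponds to the insert performed by Repository.save later in this post; "r" events appear during the initial snapshot because snapshot.mode is set to initial.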

 

Deployment troubleshooting

 

recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY

io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY at io.debezium.connector.mysql.MySqlConnectorTask.validateAndLoadSchemaHistory(MySqlConnectorTask.java:332)

 

When this error occurs, check:

  • the database connection info
  • schema.history.internal.kafka.bootstrap.servers (broker access info)
  • database.server.id (must be unique)

The manifest below uses a new database.server.id, a fresh schema.history.internal.kafka.topic, and snapshot.mode: schema_only_recovery:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.default.svc.cluster.local
    database.port: 3306
    database.user: root
    database.password: rootpassword
    database.server.id: 184055
    database.server.name: mysql
    database.include.list: mydb
    table.include.list: mydb.message_event
    topic.prefix: mysql
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.mysql
    snapshot.mode: schema_only_recovery

 

References:

 

Deploying Debezium on Kubernetes :: Debezium Documentation (debezium.io)

Debezium connector for MySQL :: Debezium Documentation (debezium.io)

 

Verify that it works

 

 

 

Verifying the message produced to the Kafka topic on Repository.save

 

build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.0.11"
    id("io.spring.dependency-management") version "1.1.0"
    kotlin("jvm") version "1.7.22"
    kotlin("plugin.spring") version "1.7.22"
    kotlin("plugin.jpa") version "1.7.22"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.springframework.boot:spring-boot-starter-validation")
    implementation("mysql:mysql-connector-java:8.0.33")
    runtimeOnly("com.h2database:h2")

    implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.1.0")

    implementation("com.querydsl:querydsl-jpa:5.0.0:jakarta")
    implementation("com.querydsl:querydsl-kotlin:5.0.0")

    annotationProcessor("com.querydsl:querydsl-apt:5.0.0:jakarta")

    testImplementation("org.springframework.kafka:spring-kafka-test")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")
    testImplementation("org.junit.jupiter:junit-jupiter-api")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

 

 

Application service logic

import com.example.kotlinspringkafkasample.entity.Message
import com.example.kotlinspringkafkasample.entity.MessageEvent
import com.example.kotlinspringkafkasample.model.MyMessage
import com.example.kotlinspringkafkasample.repository.MessageEventRepository
import com.example.kotlinspringkafkasample.repository.MessageRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class MessageService(
    val messageRepository: MessageRepository,
    val messageEventRepository: MessageEventRepository
) {

    @Transactional
    fun processMessage(message: MyMessage) {

        // Save the message to the database
        val savedMessage = messageRepository.save(Message(content = message.content))
        println("Saved message to database: ${savedMessage.id}")

        val savedEvent = messageEventRepository.save(MessageEvent())
        println("Saved message to database: ${savedEvent.id}")

//        // Uncomment the line below to simulate a failure
//         throw RuntimeException("Simulated error")
    }
}
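Because processMessage no longer writes to Kafka at all, atomicity reduces to a single database transaction: either both rows commit and Debezium later publishes the message_event change, or the transaction rolls back and nothing reaches Kafka. A small Python sketch of that property, again with in-memory stand-ins (FakeTable, cdc_publish are hypothetical names):

```python
# Sketch of why CDC sidesteps the dual-write problem: the service writes
# only to the database; a CDC reader publishes from *committed* rows.
# FakeTable / cdc_publish are illustrative stand-ins.

class FakeTable:
    def __init__(self):
        self.committed = []

def save_in_transaction(message_tbl, event_tbl, content, fail=False):
    staged = [(message_tbl, content), (event_tbl, {"event_for": content})]
    if fail:
        return  # rollback: nothing staged is committed
    for table, row in staged:  # commit: all or nothing
        table.committed.append(row)

def cdc_publish(event_tbl, topic):
    # The connector only ever sees committed rows (the binlog records
    # committed changes), so rolled-back writes never become messages.
    for row in event_tbl.committed:
        topic.append(row)

messages, events, topic = FakeTable(), FakeTable(), []
save_in_transaction(messages, events, "hello")
save_in_transaction(messages, events, "rolled-back", fail=True)
cdc_publish(events, topic)

print(len(messages.committed), len(topic))  # → 1 1
```

The rolled-back call leaves nothing for the connector to publish, which is exactly the behavior the commented-out RuntimeException in processMessage lets you exercise against the real stack.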

 

Transaction log

[16:01:46.361[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.362[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.getTransaction:line370] - Creating new transaction with name [com.example.kotlinspringkafkasample.service.MessageService.processMessage]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
[16:01:46.366[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doBegin:line439] - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@3edf7a08]
[16:01:46.367[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.367[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.handleExistingTransaction:line470] - Participating in existing transaction
[16:01:46.375[DEBUG][org.hibernate.SQL.logStatement:line128] - 
    /* insert com.example.kotlinspringkafkasample.entity.Message
        */ insert 
    into
        message (content) 
    values
        (?)
Hibernate: 
    /* insert com.example.kotlinspringkafkasample.entity.Message
        */ insert 
    into
        message (content) 
    values
        (?)
[16:01:46.377[TRACE][org.hibernate.orm.jdbc.bind.logBinding:line28] - binding parameter [1] as [VARCHAR] - [content]
Saved message to database: 2
[16:01:46.382[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.382[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.handleExistingTransaction:line470] - Participating in existing transaction
[16:01:46.384[DEBUG][org.hibernate.SQL.logStatement:line128] - 
    /* insert com.example.kotlinspringkafkasample.entity.MessageEvent
        */ insert 
    into
        message_event 
    values
        ( )
Hibernate: 
    /* insert com.example.kotlinspringkafkasample.entity.MessageEvent
        */ insert 
    into
        message_event 
    values
        ( )
Saved message to database: 2
[16:01:46.386[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.processCommit:line740] - Initiating transaction commit
[16:01:46.386[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doCommit:line556] - Committing JPA transaction on EntityManager [SessionImpl(330174493<open>)]
[16:01:46.391[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doCleanupAfterCompletion:line652] - Not closing pre-bound JPA EntityManager after transaction