Chaining Kafka and Database Transactions with Spring Boot, Minikube, Debezium connector for MySQL
In this post I look at handling a Kafka produce and a database transaction atomically, using Kotlin + Spring Boot + MySQL + Kafka.
The original goal was to wrap Kafka message production and JPA persistence in a single transaction. After researching and building several proofs of concept, however, I concluded that binding two different systems into one transaction is neither simple nor clearly safe to operate, so it is not an approach I would use in production.
The alternative is a Kafka Connect source connector: with a Kafka cluster, Kafka Connect, and MySQL deployed on Minikube, we save an entity through a JPA repository, let the connector detect the change to the table's records, and produce it to a Kafka topic.
What this post covers
- Deploying the Kafka source connector on k8s
- Confirming the message produced to the Kafka topic on Repository.save
Environment
- Spring Boot 3.0.11
- java 17
- kotlin
- spring-kafka
- spring-data-jpa
Deploying the Kafka source connector to the k8s cluster
Deploying a Kafka source connector to a k8s cluster takes several steps, in order:
- install minikube
- create the Kafka cluster
- deploy Kafka Connect
- deploy the Kafka source connector
Install minikube
brew install minikube
minikube start
Create the Kafka cluster
We'll create the Kafka cluster with the Strimzi operator.
Reference: https://strimzi.io/docs/operators/latest/deploying
Install helm
brew install helm
Install the Strimzi Kafka operator
helm repo add strimzi https://strimzi.io/charts/
helm install my-strimzi-release strimzi/strimzi-kafka-operator
Deploy the Kafka cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: ephemeral
        kraftMetadata: shared
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.7.1
    metadataVersion: 3.7-IV4
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}
Reference: https://github.com/strimzi/strimzi-kafka-operator/tree/main/examples/kafka/kraft
Deploy Kafka Connect
Because we want to use Debezium's MySQL connector, we have to build a custom Kafka Connect image with the Debezium plugin added, and deploy Kafka Connect from that image.
Reference: https://debezium.io/documentation/reference/stable/architecture.html
Kafka Connect manifest
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.101.244.23/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.5.0.Final/debezium-connector-mysql-2.5.0.Final-plugin.tar.gz
Make sure the following two fields are mapped correctly:
bootstrapServers: my-cluster-kafka-bootstrap:9092
spec.build.output.image: /debezium-connect-mysql:latest
For bootstrapServers, specify the service:port of the Kafka cluster created above. For the image, you need a registry to store the custom Kafka Connect image built with the Debezium plugin; create one with Minikube's registry addon.
Reference: https://minikube.sigs.k8s.io/docs/handbook/registry/
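The connector config below assumes a MySQL instance reachable at mysql.default.svc.cluster.local:3306 with root password rootpassword and a mydb database. The original post does not show that manifest, so here is a minimal sketch of what it might look like (image tag, names, and credentials are assumptions chosen to match the connector config). Note that MySQL 8 enables ROW-format binary logging by default, which Debezium requires.

```yaml
# Hypothetical minimal MySQL deployment for this demo -- not the exact
# manifest from the original post. Credentials match the connector config.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
        - name: mysql
          image: mysql:8.0
          env:
            - name: MYSQL_ROOT_PASSWORD
              value: rootpassword
            - name: MYSQL_DATABASE
              value: mydb
          ports:
            - containerPort: 3306
---
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  selector:
    app: mysql
  ports:
    - port: 3306
```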
Deploy the MySqlConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.default.svc.cluster.local
    database.port: 3306
    database.user: root
    database.password: rootpassword
    database.server.id: 184054
    database.server.name: mysql
    database.include.list: mydb
    table.include.list: mydb.message_event
    topic.prefix: mysql
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.mydb
    include.schema.changes: true
    snapshot.mode: initial
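This demo runs the connector as root for simplicity, but the Debezium MySQL documentation recommends a dedicated user with only the privileges the connector needs. A sketch of those grants (the user name and password here are placeholders, not from the original setup):

```sql
-- Hypothetical dedicated CDC user; the demo itself uses root.
-- These are the privileges the Debezium MySQL connector documentation lists.
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```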
Deployment troubleshooting
If the connector fails with:
io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY at io.debezium.connector.mysql.MySqlConnectorTask.validateAndLoadSchemaHistory(MySqlConnectorTask.java:332)
check that:
- the database connection settings are correct
- schema.history.internal.kafka.bootstrap.servers points at a reachable broker
- database.server.id is unique
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-connector
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.default.svc.cluster.local
    database.port: 3306
    database.user: root
    database.password: rootpassword
    database.server.id: 184055
    database.server.name: mysql
    database.include.list: mydb
    table.include.list: mydb.message_event
    topic.prefix: mysql
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.mysql
    snapshot.mode: schema_only_recovery
References:
- Deploying Debezium on Kubernetes :: Debezium Documentation
- Debezium connector for MySQL :: Debezium Documentation
Verifying it works
Confirming the message produced to the Kafka topic on Repository.save
build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.0.11"
    id("io.spring.dependency-management") version "1.1.0"
    kotlin("jvm") version "1.7.22"
    kotlin("plugin.spring") version "1.7.22"
    kotlin("plugin.jpa") version "1.7.22"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-batch")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.springframework.boot:spring-boot-starter-validation")
    implementation("mysql:mysql-connector-java:8.0.33")
    runtimeOnly("com.h2database:h2")
    implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.1.0")
    implementation("com.querydsl:querydsl-jpa:5.0.0:jakarta")
    implementation("com.querydsl:querydsl-kotlin:5.0.0")
    annotationProcessor("com.querydsl:querydsl-apt:5.0.0:jakarta")
    testImplementation("org.springframework.kafka:spring-kafka-test")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.batch:spring-batch-test")
    testImplementation("org.junit.jupiter:junit-jupiter-api")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "17"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
Application service logic
import com.example.kotlinspringkafkasample.entity.Message
import com.example.kotlinspringkafkasample.entity.MessageEvent
import com.example.kotlinspringkafkasample.model.MyMessage
import com.example.kotlinspringkafkasample.repository.MessageEventRepository
import com.example.kotlinspringkafkasample.repository.MessageRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class MessageService(
    val messageRepository: MessageRepository,
    val messageEventRepository: MessageEventRepository
) {
    @Transactional
    fun processMessage(message: MyMessage) {
        // Save the message to the database
        val savedMessage = messageRepository.save(Message(content = message.content))
        println("Saved message to database: ${savedMessage.id}")
        val savedEvent = messageEventRepository.save(MessageEvent())
        println("Saved message to database: ${savedEvent.id}")
        // Uncomment the line below to simulate a failure
        // throw RuntimeException("Simulated error")
    }
}
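The Message and MessageEvent entities are not shown in the post; a minimal sketch of what they might look like follows (everything beyond the id and content fields visible in the service code is an assumption). The message_event table name matches the connector's table.include.list, so inserts into it are what the connector captures.

```kotlin
import jakarta.persistence.Entity
import jakarta.persistence.GeneratedValue
import jakarta.persistence.GenerationType
import jakarta.persistence.Id
import jakarta.persistence.Table

// Hypothetical reconstruction of the entities used by MessageService.
@Entity
@Table(name = "message")
class Message(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,
    val content: String = ""
)

// Rows inserted here are picked up by the Debezium connector
// (table.include.list: mydb.message_event).
@Entity
@Table(name = "message_event")
class MessageEvent(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null
)
```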
Transaction log
[16:01:46.361[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.362[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.getTransaction:line370] - Creating new transaction with name [com.example.kotlinspringkafkasample.service.MessageService.processMessage]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
[16:01:46.366[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doBegin:line439] - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@3edf7a08]
[16:01:46.367[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.367[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.handleExistingTransaction:line470] - Participating in existing transaction
[16:01:46.375[DEBUG][org.hibernate.SQL.logStatement:line128] -
/* insert com.example.kotlinspringkafkasample.entity.Message
*/ insert
into
message (content)
values
(?)
Hibernate:
/* insert com.example.kotlinspringkafkasample.entity.Message
*/ insert
into
message (content)
values
(?)
[16:01:46.377[TRACE][org.hibernate.orm.jdbc.bind.logBinding:line28] - binding parameter [1] as [VARCHAR] - [content]
Saved message to database: 2
[16:01:46.382[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doGetTransaction:line374] - Found thread-bound EntityManager [SessionImpl(330174493<open>)] for JPA transaction
[16:01:46.382[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.handleExistingTransaction:line470] - Participating in existing transaction
[16:01:46.384[DEBUG][org.hibernate.SQL.logStatement:line128] -
/* insert com.example.kotlinspringkafkasample.entity.MessageEvent
*/ insert
into
message_event
values
( )
Hibernate:
/* insert com.example.kotlinspringkafkasample.entity.MessageEvent
*/ insert
into
message_event
values
( )
Saved message to database: 2
[16:01:46.386[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.processCommit:line740] - Initiating transaction commit
[16:01:46.386[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doCommit:line556] - Committing JPA transaction on EntityManager [SessionImpl(330174493<open>)]
[16:01:46.391[DEBUG][org.springframework.orm.jpa.JpaTransactionManager.doCleanupAfterCompletion:line652] - Not closing pre-bound JPA EntityManager after transaction
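Once the transaction commits, the connector reads the insert from the MySQL binlog and produces a change event to the topic mysql.mydb.message_event (Debezium names topics topicPrefix.database.table). The event value wraps the row in Debezium's envelope with op, before, after, source, and ts_ms fields. A small sketch of picking the interesting fields out of such an event, using a plain map to stand in for the deserialized JSON value (the field values are assumed, not captured from this run):

```kotlin
// Debezium change-event envelope: op is "c" (create), "u" (update),
// "d" (delete), or "r" (snapshot read); before/after hold the row state.
fun describe(envelope: Map<String, Any?>): String {
    val op = envelope["op"]
    @Suppress("UNCHECKED_CAST")
    val after = envelope["after"] as? Map<String, Any?>
    return when (op) {
        "c", "r" -> "insert/snapshot row: $after"
        "u" -> "updated row: $after"
        "d" -> "deleted row (before=${envelope["before"]})"
        else -> "unknown op: $op"
    }
}

fun main() {
    // Assumed shape of the event produced by the message_event insert above.
    val event = mapOf(
        "op" to "c",
        "before" to null,
        "after" to mapOf("id" to 2L),
    )
    println(describe(event))  // insert/snapshot row: {id=2}
}
```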