Kafka #
Apache Kafka adalah platform streaming terdistribusi yang dirancang untuk menangani aliran data real-time dalam skala besar. Ia bukan sekadar message broker seperti RabbitMQ — ia adalah log terdistribusi yang bisa menyimpan data selama berhari-hari dan memungkinkan banyak konsumer membaca data yang sama secara independen. Kafka digunakan untuk decoupling microservice, event sourcing, stream processing, audit trail, dan sinkronisasi data antar sistem. Di Kotlin, kamu menggunakan Kafka melalui Apache Kafka Clients (library Java resmi) yang bekerja sempurna di JVM. Artikel ini membahas konsep utama Kafka, Producer dan Consumer, serialisasi pesan, consumer group, penanganan error, dan pola-pola yang digunakan di produksi.
Konsep Utama Kafka #
flowchart LR
P1[Producer 1] --> T[Topic: pesanan]
P2[Producer 2] --> T
T --> |Partition 0| C1[Consumer Group A\nService Email]
T --> |Partition 1| C2[Consumer Group A\nService Email]
T --> |Partition 0| C3[Consumer Group B\nService Inventori]
T --> |Partition 1| C4[Consumer Group B\nService Inventori]| Konsep | Penjelasan |
|---|---|
| Topic | Kategori/saluran pesan, seperti tabel di database |
| Partition | Pembagian topic untuk paralelisme, pesan dalam satu partition terurut |
| Producer | Pihak yang mengirim pesan ke topic |
| Consumer | Pihak yang membaca pesan dari topic |
| Consumer Group | Sekumpulan consumer yang berbagi beban membaca partition |
| Offset | Posisi pesan dalam partition, digunakan untuk tracking progress |
| Broker | Server Kafka tempat data disimpan |
Keunggulan utama Kafka dibanding message broker tradisional: pesan tidak dihapus setelah dikonsumsi — ia tersimpan berdasarkan retention period. Ini memungkinkan berbagai consumer group membaca data yang sama secara independen.
Setup dan Dependensi #
// build.gradle.kts
dependencies {
// Apache Kafka Clients
implementation("org.apache.kafka:kafka-clients:3.7.0")
// kotlinx.serialization untuk serialisasi pesan
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")
// Coroutine (opsional, untuk consumer async)
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}
Producer — Mengirim Pesan #
Konfigurasi Producer #
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
object KonfigurasiKafka {
const val BOOTSTRAP_SERVERS = "localhost:9092"
fun propertiProducer(): java.util.Properties {
return java.util.Properties().apply {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
// Keandalan — pastikan pesan tidak hilang
put(ProducerConfig.ACKS_CONFIG, "all") // tunggu semua replica acknowledge
put(ProducerConfig.RETRIES_CONFIG, 3) // coba ulang jika gagal
put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true) // hindari duplikasi
// Performa
put(ProducerConfig.LINGER_MS_CONFIG, 5) // tunggu 5ms untuk batch
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384) // ukuran batch 16KB
put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") // kompresi
// Identifikasi aplikasi
put(ProducerConfig.CLIENT_ID_CONFIG, "myapp-producer")
}
}
}
Mengirim Pesan Sederhana #
@Serializable
data class EventPesanan(
val pesananId: String,
val penggunaId: Long,
val total: Double,
val status: String,
val timestamp: Long = System.currentTimeMillis()
)
class ProducerPesanan {
private val json = Json { ignoreUnknownKeys = true }
private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())
fun kirimEventPesanan(event: EventPesanan) {
val kunci = event.pesananId // key menentukan partition assignment
val nilai = json.encodeToString(EventPesanan.serializer(), event)
val record = ProducerRecord(
"pesanan-events", // nama topic
kunci,
nilai
)
// Kirim secara asinkron dengan callback
producer.send(record) { metadata, exception ->
if (exception != null) {
println("Gagal kirim pesan: ${exception.message}")
} else {
println(
"Pesan terkirim — topic: ${metadata.topic()}, " +
"partition: ${metadata.partition()}, " +
"offset: ${metadata.offset()}"
)
}
}
}
// Kirim dan tunggu konfirmasi (synchronous)
fun kirimDanTunggu(event: EventPesanan): RecordMetadata {
val record = ProducerRecord(
"pesanan-events",
event.pesananId,
json.encodeToString(EventPesanan.serializer(), event)
)
return producer.send(record).get() // .get() memblokir sampai ada konfirmasi
}
fun tutup() {
producer.flush() // pastikan semua pesan terkirim
producer.close()
}
}
fun main() {
val producer = ProducerPesanan()
repeat(5) { i ->
val event = EventPesanan(
pesananId = "ORD-${1000 + i}",
penggunaId = (i + 1).toLong(),
total = (i + 1) * 50_000.0,
status = "DIBUAT"
)
producer.kirimEventPesanan(event)
}
producer.tutup()
}
Mengirim ke Partition Tertentu #
// Secara default, Kafka menggunakan hash dari key untuk menentukan partition
// Ini memastikan semua pesan dengan key yang sama masuk ke partition yang sama
// → ordering terjamin per key
// Kirim ke partition tertentu secara eksplisit (jarang dibutuhkan)
val record = ProducerRecord(
"pesanan-events",
0, // nomor partition eksplisit
"ORD-001", // key
jsonPayload // value
)
Consumer — Membaca Pesan #
Konfigurasi Consumer #
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.StringDeserializer
fun propertiConsumer(groupId: String): java.util.Properties {
return java.util.Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
// Auto commit offset setiap 5 detik (bisa nonaktifkan untuk kontrol manual)
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // manual commit lebih aman
// Mulai dari mana jika tidak ada offset yang tersimpan
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // atau "latest"
// Polling configuration
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100) // maks 100 record per poll
put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000) // 5 menit timeout polling
put(ConsumerConfig.CLIENT_ID_CONFIG, "myapp-consumer-$groupId")
}
}
Consumer Dasar #
class ConsumerPesanan(groupId: String = "layanan-email") {
private val json = Json { ignoreUnknownKeys = true }
private val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))
@Volatile private var berjalan = true
fun mulai() {
consumer.subscribe(listOf("pesanan-events"))
println("Consumer '$groupId' mulai mendengarkan...")
try {
while (berjalan) {
// poll() memblokir sampai ada pesan atau timeout
val records = consumer.poll(java.time.Duration.ofMillis(1000))
records.forEach { record ->
try {
prosesRecord(record)
} catch (e: Exception) {
println("Error memproses record: ${e.message}")
// Di sini bisa: log error, kirim ke dead letter topic, skip, atau throw
}
}
// Manual commit setelah semua record dalam batch berhasil diproses
if (records.count() > 0) {
consumer.commitSync() // atau commitAsync() untuk performa lebih baik
println("Commit ${records.count()} records")
}
}
} finally {
consumer.close()
println("Consumer ditutup")
}
}
private fun prosesRecord(record: ConsumerRecord<String, String>) {
val event = json.decodeFromString(EventPesanan.serializer(), record.value())
println(
"Memproses pesanan ${event.pesananId} | " +
"partition: ${record.partition()} | " +
"offset: ${record.offset()} | " +
"total: Rp${"%,.0f".format(event.total)}"
)
// Logika bisnis: kirim email konfirmasi, dll
kirimEmailKonfirmasi(event)
}
private fun kirimEmailKonfirmasi(event: EventPesanan) {
println("Email konfirmasi dikirim untuk pesanan ${event.pesananId}")
}
fun berhenti() { berjalan = false }
}
Consumer dengan Commit Manual Per Offset #
// Commit offset hanya untuk record yang berhasil diproses
fun consumerDenganCommitSelektif(consumer: KafkaConsumer<String, String>) {
consumer.subscribe(listOf("pesanan-events"))
while (true) {
val records = consumer.poll(java.time.Duration.ofMillis(1000))
val offsetsUntukCommit = mutableMapOf<
org.apache.kafka.common.TopicPartition,
OffsetAndMetadata
>()
records.forEach { record ->
try {
// Proses record
println("Proses: ${record.key()} offset: ${record.offset()}")
// Catat offset yang berhasil
offsetsUntukCommit[
org.apache.kafka.common.TopicPartition(record.topic(), record.partition())
] = OffsetAndMetadata(record.offset() + 1)
} catch (e: Exception) {
println("Gagal proses record ${record.offset()}: ${e.message}")
// Hentikan loop — commit hanya sampai sebelum record gagal
return@forEach
}
}
if (offsetsUntukCommit.isNotEmpty()) {
consumer.commitSync(offsetsUntukCommit)
}
}
}
Consumer Group dan Paralelisme #
Consumer group memungkinkan beban membaca dibagi antar beberapa instance consumer. Setiap partition hanya dibaca oleh satu consumer dalam group yang sama:
// Jalankan beberapa consumer dalam group yang sama untuk paralelisme
fun main() {
val threadCount = 3 // harus <= jumlah partition
val threads = (1..threadCount).map { i ->
Thread {
val consumer = ConsumerPesanan("layanan-email")
// Setiap consumer akan mendapat partition yang berbeda
consumer.mulai()
}.apply {
name = "consumer-thread-$i"
isDaemon = true
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
}
Kafka dengan Coroutine #
Untuk integrasi yang lebih bersih dengan kode Kotlin async:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Producer wrapper untuk coroutine
class ProducerKoroutin {
private val json = Json { ignoreUnknownKeys = true }
private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())
suspend fun kirim(topic: String, kunci: String, nilai: String) {
withContext(Dispatchers.IO) {
val record = ProducerRecord(topic, kunci, nilai)
producer.send(record).get() // blocking dalam IO dispatcher
}
}
suspend fun kirimEvent(event: EventPesanan) {
kirim(
"pesanan-events",
event.pesananId,
json.encodeToString(EventPesanan.serializer(), event)
)
}
}
// Consumer sebagai Flow
fun kafkaFlow(
topics: List<String>,
groupId: String,
scope: CoroutineScope
): Flow<ConsumerRecord<String, String>> = callbackFlow {
val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))
consumer.subscribe(topics)
val pekerjaan = scope.launch(Dispatchers.IO) {
try {
while (isActive) {
val records = consumer.poll(java.time.Duration.ofMillis(100))
records.forEach { record ->
trySend(record) // kirim ke flow
}
if (records.count() > 0) consumer.commitAsync()
}
} finally {
consumer.close()
}
}
awaitClose { pekerjaan.cancel() }
}
// Penggunaan dengan Flow
fun main() = runBlocking {
val flow = kafkaFlow(listOf("pesanan-events"), "layanan-notifikasi", this)
val json = Json { ignoreUnknownKeys = true }
flow
.map { record ->
json.decodeFromString(EventPesanan.serializer(), record.value())
}
.filter { event -> event.status == "DIBUAT" }
.collect { event ->
println("Notifikasi untuk pesanan ${event.pesananId}")
}
}
Penanganan Error dan Dead Letter Topic #
class ConsumerDenganDLT(
private val groupId: String,
private val topicUtama: String,
private val topicDlt: String = "$topicUtama.dlt"
) {
private val json = Json { ignoreUnknownKeys = true }
private val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))
private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())
fun mulai() {
consumer.subscribe(listOf(topicUtama))
while (true) {
val records = consumer.poll(java.time.Duration.ofMillis(1000))
records.forEach { record ->
val berhasil = prosesdenganRetry(record, maksPercobaan = 3)
if (!berhasil) {
// Kirim ke Dead Letter Topic untuk penanganan manual
kirimKeDlt(record)
}
}
consumer.commitSync()
}
}
private fun prosesdenganRetry(
record: ConsumerRecord<String, String>,
maksPercobaan: Int
): Boolean {
repeat(maksPercobaan) { percobaan ->
try {
val event = json.decodeFromString(EventPesanan.serializer(), record.value())
prosesEvent(event)
return true
} catch (e: Exception) {
println("Percobaan ${percobaan + 1}/$maksPercobaan gagal: ${e.message}")
if (percobaan < maksPercobaan - 1) {
Thread.sleep(500L * (percobaan + 1)) // exponential backoff sederhana
}
}
}
return false
}
private fun prosesEvent(event: EventPesanan) {
// Simulasi kegagalan acak
if (Math.random() < 0.1) throw RuntimeException("Gagal proses event!")
println("Event ${event.pesananId} berhasil diproses")
}
private fun kirimKeDlt(record: ConsumerRecord<String, String>) {
val dltRecord = ProducerRecord(
topicDlt,
record.key(),
record.value()
).apply {
// Tambahkan header untuk debugging
headers().add("original-topic", topicUtama.toByteArray())
headers().add("error-timestamp", System.currentTimeMillis().toString().toByteArray())
headers().add("consumer-group", groupId.toByteArray())
}
producer.send(dltRecord).get()
println("Record ${record.key()} dikirim ke DLT: $topicDlt")
}
}
Membuat Topic Secara Programatik #
import org.apache.kafka.clients.admin.*
fun buatTopic(nama: String, partisi: Int = 3, replikasi: Short = 1) {
val properties = java.util.Properties().apply {
put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
}
AdminClient.create(properties).use { admin ->
val topicBaru = NewTopic(nama, partisi, replikasi).apply {
configs(mapOf(
"retention.ms" to "604800000", // 7 hari retention
"cleanup.policy" to "delete",
"compression.type" to "snappy"
))
}
val hasil = admin.createTopics(listOf(topicBaru))
hasil.values()[nama]?.get()
println("Topic '$nama' berhasil dibuat dengan $partisi partisi")
}
}
fun daftarTopic(): Set<String> {
val properties = java.util.Properties().apply {
put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
}
return AdminClient.create(properties).use { admin ->
admin.listTopics().names().get()
}
}
Exactly-Once Semantics #
Untuk kasus yang membutuhkan jaminan exactly-once (tidak ada duplikasi, tidak ada kehilangan):
fun propertiProducerExactlyOnce(): java.util.Properties {
return KonfigurasiKafka.propertiProducer().apply {
put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
put(ProducerConfig.ACKS_CONFIG, "all")
put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myapp-producer-1") // ID unik per instance
}
}
fun kirimDenganTransaksi(producer: KafkaProducer<String, String>, events: List<EventPesanan>) {
val json = Json { ignoreUnknownKeys = true }
producer.initTransactions()
try {
producer.beginTransaction()
events.forEach { event ->
producer.send(
ProducerRecord(
"pesanan-events",
event.pesananId,
json.encodeToString(EventPesanan.serializer(), event)
)
)
}
producer.commitTransaction()
println("${events.size} event berhasil dikirim dalam satu transaksi")
} catch (e: Exception) {
producer.abortTransaction()
println("Transaksi dibatalkan: ${e.message}")
throw e
}
}
Ringkasan #
- Kafka untuk decoupling dan streaming — gunakan Kafka ketika service perlu berkomunikasi tanpa tight coupling, atau ketika data perlu dikonsumsi oleh banyak service secara independen.
- Key menentukan partition — semua pesan dengan key yang sama masuk ke partition yang sama, menjamin ordering per entity. Gunakan entity ID (pesanan ID, pengguna ID) sebagai key.
ACKS=alldan idempotence — untuk produksi, selalu setacks=alldanenable.idempotence=trueuntuk menghindari kehilangan dan duplikasi pesan.- Manual commit lebih aman dari auto commit — dengan
enable.auto.commit=falsedancommitSync()setelah pemrosesan, kamu yakin offset hanya di-commit setelah pesan berhasil diproses.- Consumer group untuk paralelisme — jumlah consumer dalam group tidak boleh melebihi jumlah partition. Tambahkan partition saat perlu meningkatkan paralelisme.
- Dead Letter Topic untuk penanganan error — pesan yang gagal setelah retry dikirim ke DLT untuk penanganan manual. Tambahkan header yang informatif (error message, timestamp, topic asal).
- Partisi = unit paralelisme — buat topic dengan jumlah partition yang sesuai prediksi throughput. Lebih mudah menambah partition daripada mengurangi (mengurangi bisa mengubah routing).
- Kafka bukan pengganti database — Kafka adalah log terdistribusi untuk streaming, bukan database. Gunakan Kafka bersama database (PostgreSQL, MongoDB) dalam arsitektur event-driven.