Kafka

Kafka #

Apache Kafka adalah platform streaming terdistribusi yang dirancang untuk menangani aliran data real-time dalam skala besar. Ia bukan sekadar message broker seperti RabbitMQ — ia adalah log terdistribusi yang bisa menyimpan data selama berhari-hari dan memungkinkan banyak konsumer membaca data yang sama secara independen. Kafka digunakan untuk decoupling microservice, event sourcing, stream processing, audit trail, dan sinkronisasi data antar sistem. Di Kotlin, kamu menggunakan Kafka melalui Apache Kafka Clients (library Java resmi) yang bekerja sempurna di JVM. Artikel ini membahas konsep utama Kafka, Producer dan Consumer, serialisasi pesan, consumer group, penanganan error, dan pola-pola yang digunakan di produksi.

Konsep Utama Kafka #

flowchart LR
    P1[Producer 1] --> T[Topic: pesanan]
    P2[Producer 2] --> T
    T --> |Partition 0| C1[Consumer Group A\nService Email]
    T --> |Partition 1| C2[Consumer Group A\nService Email]
    T --> |Partition 0| C3[Consumer Group B\nService Inventori]
    T --> |Partition 1| C4[Consumer Group B\nService Inventori]
KonsepPenjelasan
TopicKategori/saluran pesan, seperti tabel di database
PartitionPembagian topic untuk paralelisme, pesan dalam satu partition terurut
ProducerPihak yang mengirim pesan ke topic
ConsumerPihak yang membaca pesan dari topic
Consumer GroupSekumpulan consumer yang berbagi beban membaca partition
OffsetPosisi pesan dalam partition, digunakan untuk tracking progress
BrokerServer Kafka tempat data disimpan

Keunggulan utama Kafka dibanding message broker tradisional: pesan tidak dihapus setelah dikonsumsi — ia tersimpan berdasarkan retention period. Ini memungkinkan berbagai consumer group membaca data yang sama secara independen.


Setup dan Dependensi #

// build.gradle.kts
dependencies {
    // Apache Kafka Clients
    implementation("org.apache.kafka:kafka-clients:3.7.0")

    // kotlinx.serialization untuk serialisasi pesan
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")

    // Coroutine (opsional, untuk consumer async)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}

Producer — Mengirim Pesan #

Konfigurasi Producer #

import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

object KonfigurasiKafka {
    const val BOOTSTRAP_SERVERS = "localhost:9092"

    fun propertiProducer(): java.util.Properties {
        return java.util.Properties().apply {
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)

            // Keandalan — pastikan pesan tidak hilang
            put(ProducerConfig.ACKS_CONFIG, "all")           // tunggu semua replica acknowledge
            put(ProducerConfig.RETRIES_CONFIG, 3)             // coba ulang jika gagal
            put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true) // hindari duplikasi

            // Performa
            put(ProducerConfig.LINGER_MS_CONFIG, 5)           // tunggu 5ms untuk batch
            put(ProducerConfig.BATCH_SIZE_CONFIG, 16384)       // ukuran batch 16KB
            put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy") // kompresi

            // Identifikasi aplikasi
            put(ProducerConfig.CLIENT_ID_CONFIG, "myapp-producer")
        }
    }
}

Mengirim Pesan Sederhana #

@Serializable
data class EventPesanan(
    val pesananId: String,
    val penggunaId: Long,
    val total: Double,
    val status: String,
    val timestamp: Long = System.currentTimeMillis()
)

class ProducerPesanan {
    private val json = Json { ignoreUnknownKeys = true }
    private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())

    fun kirimEventPesanan(event: EventPesanan) {
        val kunci = event.pesananId        // key menentukan partition assignment
        val nilai = json.encodeToString(EventPesanan.serializer(), event)

        val record = ProducerRecord(
            "pesanan-events",  // nama topic
            kunci,
            nilai
        )

        // Kirim secara asinkron dengan callback
        producer.send(record) { metadata, exception ->
            if (exception != null) {
                println("Gagal kirim pesan: ${exception.message}")
            } else {
                println(
                    "Pesan terkirim — topic: ${metadata.topic()}, " +
                    "partition: ${metadata.partition()}, " +
                    "offset: ${metadata.offset()}"
                )
            }
        }
    }

    // Kirim dan tunggu konfirmasi (synchronous)
    fun kirimDanTunggu(event: EventPesanan): RecordMetadata {
        val record = ProducerRecord(
            "pesanan-events",
            event.pesananId,
            json.encodeToString(EventPesanan.serializer(), event)
        )
        return producer.send(record).get()  // .get() memblokir sampai ada konfirmasi
    }

    fun tutup() {
        producer.flush()   // pastikan semua pesan terkirim
        producer.close()
    }
}

fun main() {
    val producer = ProducerPesanan()

    repeat(5) { i ->
        val event = EventPesanan(
            pesananId = "ORD-${1000 + i}",
            penggunaId = (i + 1).toLong(),
            total = (i + 1) * 50_000.0,
            status = "DIBUAT"
        )
        producer.kirimEventPesanan(event)
    }

    producer.tutup()
}

Mengirim ke Partition Tertentu #

// Secara default, Kafka menggunakan hash dari key untuk menentukan partition
// Ini memastikan semua pesan dengan key yang sama masuk ke partition yang sama
// → ordering terjamin per key

// Kirim ke partition tertentu secara eksplisit (jarang dibutuhkan)
val record = ProducerRecord(
    "pesanan-events",
    0,                // nomor partition eksplisit
    "ORD-001",        // key
    jsonPayload       // value
)

Consumer — Membaca Pesan #

Konfigurasi Consumer #

import org.apache.kafka.clients.consumer.*
import org.apache.kafka.common.serialization.StringDeserializer

fun propertiConsumer(groupId: String): java.util.Properties {
    return java.util.Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
        put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)

        // Auto commit offset setiap 5 detik (bisa nonaktifkan untuk kontrol manual)
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)  // manual commit lebih aman

        // Mulai dari mana jika tidak ada offset yang tersimpan
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")  // atau "latest"

        // Polling configuration
        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100)          // maks 100 record per poll
        put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000)  // 5 menit timeout polling

        put(ConsumerConfig.CLIENT_ID_CONFIG, "myapp-consumer-$groupId")
    }
}

Consumer Dasar #

class ConsumerPesanan(groupId: String = "layanan-email") {
    private val json = Json { ignoreUnknownKeys = true }
    private val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))

    @Volatile private var berjalan = true

    fun mulai() {
        consumer.subscribe(listOf("pesanan-events"))
        println("Consumer '$groupId' mulai mendengarkan...")

        try {
            while (berjalan) {
                // poll() memblokir sampai ada pesan atau timeout
                val records = consumer.poll(java.time.Duration.ofMillis(1000))

                records.forEach { record ->
                    try {
                        prosesRecord(record)
                    } catch (e: Exception) {
                        println("Error memproses record: ${e.message}")
                        // Di sini bisa: log error, kirim ke dead letter topic, skip, atau throw
                    }
                }

                // Manual commit setelah semua record dalam batch berhasil diproses
                if (records.count() > 0) {
                    consumer.commitSync()  // atau commitAsync() untuk performa lebih baik
                    println("Commit ${records.count()} records")
                }
            }
        } finally {
            consumer.close()
            println("Consumer ditutup")
        }
    }

    private fun prosesRecord(record: ConsumerRecord<String, String>) {
        val event = json.decodeFromString(EventPesanan.serializer(), record.value())
        println(
            "Memproses pesanan ${event.pesananId} | " +
            "partition: ${record.partition()} | " +
            "offset: ${record.offset()} | " +
            "total: Rp${"%,.0f".format(event.total)}"
        )
        // Logika bisnis: kirim email konfirmasi, dll
        kirimEmailKonfirmasi(event)
    }

    private fun kirimEmailKonfirmasi(event: EventPesanan) {
        println("Email konfirmasi dikirim untuk pesanan ${event.pesananId}")
    }

    fun berhenti() { berjalan = false }
}

Consumer dengan Commit Manual Per Offset #

// Commit offset hanya untuk record yang berhasil diproses
fun consumerDenganCommitSelektif(consumer: KafkaConsumer<String, String>) {
    consumer.subscribe(listOf("pesanan-events"))

    while (true) {
        val records = consumer.poll(java.time.Duration.ofMillis(1000))

        val offsetsUntukCommit = mutableMapOf<
            org.apache.kafka.common.TopicPartition,
            OffsetAndMetadata
        >()

        records.forEach { record ->
            try {
                // Proses record
                println("Proses: ${record.key()} offset: ${record.offset()}")

                // Catat offset yang berhasil
                offsetsUntukCommit[
                    org.apache.kafka.common.TopicPartition(record.topic(), record.partition())
                ] = OffsetAndMetadata(record.offset() + 1)

            } catch (e: Exception) {
                println("Gagal proses record ${record.offset()}: ${e.message}")
                // Hentikan loop — commit hanya sampai sebelum record gagal
                return@forEach
            }
        }

        if (offsetsUntukCommit.isNotEmpty()) {
            consumer.commitSync(offsetsUntukCommit)
        }
    }
}

Consumer Group dan Paralelisme #

Consumer group memungkinkan beban membaca dibagi antar beberapa instance consumer. Setiap partition hanya dibaca oleh satu consumer dalam group yang sama:

// Jalankan beberapa consumer dalam group yang sama untuk paralelisme
fun main() {
    val threadCount = 3  // harus <= jumlah partition

    val threads = (1..threadCount).map { i ->
        Thread {
            val consumer = ConsumerPesanan("layanan-email")
            // Setiap consumer akan mendapat partition yang berbeda
            consumer.mulai()
        }.apply {
            name = "consumer-thread-$i"
            isDaemon = true
        }
    }

    threads.forEach { it.start() }
    threads.forEach { it.join() }
}

Kafka dengan Coroutine #

Untuk integrasi yang lebih bersih dengan kode Kotlin async:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer wrapper untuk coroutine
class ProducerKoroutin {
    private val json = Json { ignoreUnknownKeys = true }
    private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())

    suspend fun kirim(topic: String, kunci: String, nilai: String) {
        withContext(Dispatchers.IO) {
            val record = ProducerRecord(topic, kunci, nilai)
            producer.send(record).get()  // blocking dalam IO dispatcher
        }
    }

    suspend fun kirimEvent(event: EventPesanan) {
        kirim(
            "pesanan-events",
            event.pesananId,
            json.encodeToString(EventPesanan.serializer(), event)
        )
    }
}

// Consumer sebagai Flow
fun kafkaFlow(
    topics: List<String>,
    groupId: String,
    scope: CoroutineScope
): Flow<ConsumerRecord<String, String>> = callbackFlow {
    val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))
    consumer.subscribe(topics)

    val pekerjaan = scope.launch(Dispatchers.IO) {
        try {
            while (isActive) {
                val records = consumer.poll(java.time.Duration.ofMillis(100))
                records.forEach { record ->
                    trySend(record)  // kirim ke flow
                }
                if (records.count() > 0) consumer.commitAsync()
            }
        } finally {
            consumer.close()
        }
    }

    awaitClose { pekerjaan.cancel() }
}

// Penggunaan dengan Flow
fun main() = runBlocking {
    val flow = kafkaFlow(listOf("pesanan-events"), "layanan-notifikasi", this)
    val json = Json { ignoreUnknownKeys = true }

    flow
        .map { record ->
            json.decodeFromString(EventPesanan.serializer(), record.value())
        }
        .filter { event -> event.status == "DIBUAT" }
        .collect { event ->
            println("Notifikasi untuk pesanan ${event.pesananId}")
        }
}

Penanganan Error dan Dead Letter Topic #

class ConsumerDenganDLT(
    private val groupId: String,
    private val topicUtama: String,
    private val topicDlt: String = "$topicUtama.dlt"
) {
    private val json = Json { ignoreUnknownKeys = true }
    private val consumer = KafkaConsumer<String, String>(propertiConsumer(groupId))
    private val producer = KafkaProducer<String, String>(KonfigurasiKafka.propertiProducer())

    fun mulai() {
        consumer.subscribe(listOf(topicUtama))

        while (true) {
            val records = consumer.poll(java.time.Duration.ofMillis(1000))

            records.forEach { record ->
                val berhasil = prosesdenganRetry(record, maksPercobaan = 3)

                if (!berhasil) {
                    // Kirim ke Dead Letter Topic untuk penanganan manual
                    kirimKeDlt(record)
                }
            }

            consumer.commitSync()
        }
    }

    private fun prosesdenganRetry(
        record: ConsumerRecord<String, String>,
        maksPercobaan: Int
    ): Boolean {
        repeat(maksPercobaan) { percobaan ->
            try {
                val event = json.decodeFromString(EventPesanan.serializer(), record.value())
                prosesEvent(event)
                return true
            } catch (e: Exception) {
                println("Percobaan ${percobaan + 1}/$maksPercobaan gagal: ${e.message}")
                if (percobaan < maksPercobaan - 1) {
                    Thread.sleep(500L * (percobaan + 1))  // exponential backoff sederhana
                }
            }
        }
        return false
    }

    private fun prosesEvent(event: EventPesanan) {
        // Simulasi kegagalan acak
        if (Math.random() < 0.1) throw RuntimeException("Gagal proses event!")
        println("Event ${event.pesananId} berhasil diproses")
    }

    private fun kirimKeDlt(record: ConsumerRecord<String, String>) {
        val dltRecord = ProducerRecord(
            topicDlt,
            record.key(),
            record.value()
        ).apply {
            // Tambahkan header untuk debugging
            headers().add("original-topic", topicUtama.toByteArray())
            headers().add("error-timestamp", System.currentTimeMillis().toString().toByteArray())
            headers().add("consumer-group", groupId.toByteArray())
        }

        producer.send(dltRecord).get()
        println("Record ${record.key()} dikirim ke DLT: $topicDlt")
    }
}

Membuat Topic Secara Programatik #

import org.apache.kafka.clients.admin.*

fun buatTopic(nama: String, partisi: Int = 3, replikasi: Short = 1) {
    val properties = java.util.Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
    }

    AdminClient.create(properties).use { admin ->
        val topicBaru = NewTopic(nama, partisi, replikasi).apply {
            configs(mapOf(
                "retention.ms" to "604800000",   // 7 hari retention
                "cleanup.policy" to "delete",
                "compression.type" to "snappy"
            ))
        }

        val hasil = admin.createTopics(listOf(topicBaru))
        hasil.values()[nama]?.get()
        println("Topic '$nama' berhasil dibuat dengan $partisi partisi")
    }
}

fun daftarTopic(): Set<String> {
    val properties = java.util.Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KonfigurasiKafka.BOOTSTRAP_SERVERS)
    }

    return AdminClient.create(properties).use { admin ->
        admin.listTopics().names().get()
    }
}

Exactly-Once Semantics #

Untuk kasus yang membutuhkan jaminan exactly-once (tidak ada duplikasi, tidak ada kehilangan):

fun propertiProducerExactlyOnce(): java.util.Properties {
    return KonfigurasiKafka.propertiProducer().apply {
        put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
        put(ProducerConfig.ACKS_CONFIG, "all")
        put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "myapp-producer-1")  // ID unik per instance
    }
}

fun kirimDenganTransaksi(producer: KafkaProducer<String, String>, events: List<EventPesanan>) {
    val json = Json { ignoreUnknownKeys = true }
    producer.initTransactions()

    try {
        producer.beginTransaction()

        events.forEach { event ->
            producer.send(
                ProducerRecord(
                    "pesanan-events",
                    event.pesananId,
                    json.encodeToString(EventPesanan.serializer(), event)
                )
            )
        }

        producer.commitTransaction()
        println("${events.size} event berhasil dikirim dalam satu transaksi")

    } catch (e: Exception) {
        producer.abortTransaction()
        println("Transaksi dibatalkan: ${e.message}")
        throw e
    }
}

Ringkasan #

  • Kafka untuk decoupling dan streaming — gunakan Kafka ketika service perlu berkomunikasi tanpa tight coupling, atau ketika data perlu dikonsumsi oleh banyak service secara independen.
  • Key menentukan partition — semua pesan dengan key yang sama masuk ke partition yang sama, menjamin ordering per entity. Gunakan entity ID (pesanan ID, pengguna ID) sebagai key.
  • ACKS=all dan idempotence — untuk produksi, selalu set acks=all dan enable.idempotence=true untuk menghindari kehilangan dan duplikasi pesan.
  • Manual commit lebih aman dari auto commit — dengan enable.auto.commit=false dan commitSync() setelah pemrosesan, kamu yakin offset hanya di-commit setelah pesan berhasil diproses.
  • Consumer group untuk paralelisme — jumlah consumer dalam group tidak boleh melebihi jumlah partition. Tambahkan partition saat perlu meningkatkan paralelisme.
  • Dead Letter Topic untuk penanganan error — pesan yang gagal setelah retry dikirim ke DLT untuk penanganan manual. Tambahkan header yang informatif (error message, timestamp, topic asal).
  • Partisi = unit paralelisme — buat topic dengan jumlah partition yang sesuai prediksi throughput. Lebih mudah menambah partition daripada mengurangi (mengurangi bisa mengubah routing).
  • Kafka bukan pengganti database — Kafka adalah log terdistribusi untuk streaming, bukan database. Gunakan Kafka bersama database (PostgreSQL, MongoDB) dalam arsitektur event-driven.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact