RabbitMQ

RabbitMQ #

RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang adalah log terdistribusi, RabbitMQ adalah broker yang aktif merutekan pesan — ia menggunakan konsep Exchange dan Queue yang dipisahkan oleh binding rules. Ini membuat RabbitMQ sangat fleksibel untuk berbagai pola messaging: point-to-point, publish-subscribe, request-reply, dan routing berbasis konten. RabbitMQ unggul untuk use case yang membutuhkan routing pesan yang kompleks, task queue dengan prioritas, dan pesan yang perlu di-acknowledge setelah diproses. Di Kotlin, kamu menggunakan library RabbitMQ Java Client (AMQP client resmi) untuk berkomunikasi dengan broker.

RabbitMQ vs Kafka — Kapan Memilih #

PILIH RabbitMQ jika:
  ✓ Butuh routing pesan yang fleksibel (per tipe, per konten)
  ✓ Task queue dengan worker yang banyak (distribusi beban)
  ✓ RPC (request-reply) pattern
  ✓ Pesan perlu di-acknowledge dan dihapus setelah diproses
  ✓ Dead letter handling yang mudah dikonfigurasi
  ✓ Prioritas antrian

PILIH Kafka jika:
  ✓ Streaming data dalam volume sangat besar
  ✓ Perlu replay pesan (konsumer bisa baca ulang dari awal)
  ✓ Banyak consumer group independen membaca data yang sama
  ✓ Event sourcing dan audit trail jangka panjang
  ✓ Stream processing (Kafka Streams, flink)

Konsep Utama RabbitMQ #

flowchart LR
    P[Producer] --> E["Exchange\n(Direct/Fanout/Topic)"]
    E --> |"binding key: pesanan.baru"| Q1["Queue: email-notif"]
    E --> |"binding key: pesanan.*"| Q2["Queue: audit-log"]
    E --> |"fanout"| Q3["Queue: dashboard-realtime"]
    Q1 --> C1[Consumer: Email Service]
    Q2 --> C2[Consumer: Audit Service]
    Q3 --> C3[Consumer: Dashboard]
KonsepPenjelasan
ExchangeTitik masuk pesan, menentukan bagaimana pesan dirouting
QueueTempat penyimpanan pesan sampai dikonsumsi
BindingAturan yang menghubungkan Exchange ke Queue
Routing KeyLabel pada pesan yang digunakan Exchange untuk routing
AckSinyal dari consumer bahwa pesan berhasil diproses
NackSinyal bahwa pesan gagal, bisa dikembalikan ke queue

Tipe Exchange #

Direct  — pesan dikirim ke queue berdasarkan routing key yang persis sama
Fanout  — pesan dikirim ke SEMUA queue yang terikat ke exchange ini
Topic   — pesan dikirim berdasarkan pola routing key (wildcard * dan #)
Headers — routing berdasarkan header atribut pesan (jarang digunakan)

Setup dan Dependensi #

// build.gradle.kts
dependencies {
    // RabbitMQ Java Client (AMQP)
    implementation("com.rabbitmq:amqp-client:5.21.0")

    // kotlinx.serialization untuk pesan
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")

    // Coroutine (opsional)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}

Koneksi ke RabbitMQ #

import com.rabbitmq.client.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

object KoneksiRabbitMQ {
    private val factory = ConnectionFactory().apply {
        host = System.getenv("RABBITMQ_HOST") ?: "localhost"
        port = System.getenv("RABBITMQ_PORT")?.toInt() ?: 5672
        username = System.getenv("RABBITMQ_USER") ?: "guest"
        password = System.getenv("RABBITMQ_PASSWORD") ?: "guest"
        virtualHost = System.getenv("RABBITMQ_VHOST") ?: "/"

        // Pengaturan koneksi
        connectionTimeout = 30_000        // 30 detik timeout koneksi
        requestedHeartbeat = 60           // heartbeat setiap 60 detik
        isAutomaticRecoveryEnabled = true // otomatis reconnect jika koneksi terputus
        networkRecoveryInterval = 5_000   // coba reconnect setiap 5 detik
    }

    // Satu koneksi bisa punya banyak channel
    // Channel adalah unit logical komunikasi, ringan untuk dibuat
    fun buatKoneksi(): Connection = factory.newConnection()
}

// Penggunaan
fun main() {
    KoneksiRabbitMQ.buatKoneksi().use { koneksi ->
        koneksi.createChannel().use { channel ->
            println("Terhubung ke RabbitMQ: ${koneksi.serverProperties["version"]}")
        }
    }
}

Direct Exchange — Point-to-Point #

Direct Exchange mengirim pesan ke queue yang memiliki binding key yang persis sama dengan routing key pesan.

@Serializable
data class EventPesanan(
    val pesananId: String,
    val penggunaId: Long,
    val total: Double,
    val status: String
)

class MessagingDirect {
    private val json = Json { ignoreUnknownKeys = true }
    private val EXCHANGE = "pesanan.direct"
    private val QUEUE_EMAIL = "pesanan.email"
    private val QUEUE_INVENTORI = "pesanan.inventori"
    private val KEY_BARU = "pesanan.baru"
    private val KEY_SELESAI = "pesanan.selesai"

    // Setup: deklarasikan exchange dan queue
    fun setupInfrastruktur(channel: Channel) {
        // Deklarasi exchange — durable: bertahan setelah broker restart
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true)

        // Deklarasi queue — durable: queue bertahan, messages bisa juga durable
        channel.queueDeclare(QUEUE_EMAIL, true, false, false, null)
        channel.queueDeclare(QUEUE_INVENTORI, true, false, false, null)

        // Binding: hubungkan queue ke exchange dengan routing key
        channel.queueBind(QUEUE_EMAIL, EXCHANGE, KEY_BARU)      // email hanya terima pesanan baru
        channel.queueBind(QUEUE_EMAIL, EXCHANGE, KEY_SELESAI)   // email juga terima pesanan selesai
        channel.queueBind(QUEUE_INVENTORI, EXCHANGE, KEY_BARU)  // inventori hanya pesanan baru

        println("Infrastruktur messaging siap")
    }

    // Publisher
    fun kirimEventPesanan(channel: Channel, event: EventPesanan, routingKey: String) {
        val payload = json.encodeToString(EventPesanan.serializer(), event).toByteArray()

        val properties = AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .deliveryMode(2)  // 2 = persistent (pesan tidak hilang jika broker restart)
            .messageId(event.pesananId)
            .timestamp(java.util.Date())
            .build()

        channel.basicPublish(EXCHANGE, routingKey, properties, payload)
        println("Event terkirim ke '$routingKey': ${event.pesananId}")
    }

    // Consumer dengan manual acknowledgement
    fun mulaiConsumer(channel: Channel, queueName: String, namaConsumer: String) {
        // prefetchCount = 1: ambil satu pesan dulu, baru minta berikutnya
        // Ini mendistribusikan beban secara merata antar consumer
        channel.basicQos(1)

        val deliverCallback = DeliverCallback { _, delivery ->
            try {
                val pesan = String(delivery.body)
                val event = json.decodeFromString(EventPesanan.serializer(), pesan)
                println("[$namaConsumer] Memproses: ${event.pesananId}")

                prosesEvent(namaConsumer, event)

                // Acknowledge — beritahu broker pesan berhasil diproses
                channel.basicAck(delivery.envelope.deliveryTag, false)

            } catch (e: Exception) {
                println("[$namaConsumer] Gagal: ${e.message}")
                // Nack dengan requeue=false → kirim ke dead letter (jika dikonfigurasi)
                channel.basicNack(delivery.envelope.deliveryTag, false, false)
            }
        }

        val cancelCallback = CancelCallback { tag ->
            println("[$namaConsumer] Consumer dibatalkan: $tag")
        }

        // autoAck=false: kita yang kontrol kapan ack dikirim
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback)
        println("[$namaConsumer] Mendengarkan queue '$queueName'...")
    }

    private fun prosesEvent(consumer: String, event: EventPesanan) {
        Thread.sleep(500)  // simulasi pemrosesan
        println("[$consumer] Selesai proses ${event.pesananId}")
    }
}

fun main() {
    val messaging = MessagingDirect()
    val json = Json { ignoreUnknownKeys = true }

    KoneksiRabbitMQ.buatKoneksi().use { koneksi ->
        koneksi.createChannel().use { channel ->
            messaging.setupInfrastruktur(channel)

            // Publish beberapa pesan
            listOf(
                EventPesanan("ORD-001", 1L, 150_000.0, "BARU") to "pesanan.baru",
                EventPesanan("ORD-002", 2L, 250_000.0, "BARU") to "pesanan.baru",
                EventPesanan("ORD-001", 1L, 150_000.0, "SELESAI") to "pesanan.selesai"
            ).forEach { (event, key) ->
                messaging.kirimEventPesanan(channel, event, key)
            }
        }

        // Consumer di channel terpisah
        val channelEmail = koneksi.createChannel()
        messaging.mulaiConsumer(channelEmail, "pesanan.email", "Email Service")

        Thread.sleep(5000)
    }
}

Fanout Exchange — Broadcast ke Semua Queue #

Fanout Exchange mengirim pesan ke semua queue yang terikat, tanpa memperhatikan routing key:

class MessagingFanout {
    private val json = Json { ignoreUnknownKeys = true }
    private val EXCHANGE = "notifikasi.fanout"

    fun setup(channel: Channel) {
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT, true)

        // Setiap service membuat queue sendiri dan bind ke fanout exchange
        // Queue bisa bersifat sementara (exclusive, autodelete)
        val queueEmail = channel.queueDeclare("notif.email", true, false, false, null).queue
        val queueSms   = channel.queueDeclare("notif.sms", true, false, false, null).queue
        val queuePush  = channel.queueDeclare("notif.push", true, false, false, null).queue

        // Binding ke fanout — routing key diabaikan
        channel.queueBind(queueEmail, EXCHANGE, "")
        channel.queueBind(queueSms,   EXCHANGE, "")
        channel.queueBind(queuePush,  EXCHANGE, "")
    }

    fun broadcast(channel: Channel, pesan: String) {
        val payload = pesan.toByteArray()
        val props = AMQP.BasicProperties.Builder().deliveryMode(2).build()

        // Routing key kosong untuk fanout
        channel.basicPublish(EXCHANGE, "", props, payload)
        println("Broadcast dikirim ke semua subscriber: $pesan")
    }
}

Topic Exchange — Routing Berbasis Pola #

Topic Exchange menggunakan wildcard untuk routing:

  • * menggantikan tepat satu kata
  • # menggantikan nol atau lebih kata
class MessagingTopic {
    private val json = Json { ignoreUnknownKeys = true }
    private val EXCHANGE = "sistem.topic"

    fun setup(channel: Channel) {
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)

        // Queue untuk semua event pesanan
        channel.queueDeclare("audit.semua", true, false, false, null)
        channel.queueBind("audit.semua", EXCHANGE, "pesanan.#")
        // Menerima: pesanan.baru, pesanan.bayar, pesanan.kirim, pesanan.selesai, dll

        // Queue hanya untuk error apapun
        channel.queueDeclare("alert.error", true, false, false, null)
        channel.queueBind("alert.error", EXCHANGE, "*.error")
        // Menerima: pesanan.error, pembayaran.error, pengiriman.error, dll

        // Queue untuk monitoring umum
        channel.queueDeclare("monitoring.semua", true, false, false, null)
        channel.queueBind("monitoring.semua", EXCHANGE, "#")
        // Menerima: SEMUA pesan apapun routing key-nya
    }

    fun kirim(channel: Channel, routingKey: String, pesan: String) {
        val props = AMQP.BasicProperties.Builder().deliveryMode(2).build()
        channel.basicPublish(EXCHANGE, routingKey, props, pesan.toByteArray())
        println("Pesan '$pesan' dikirim dengan key '$routingKey'")
    }
}

Dead Letter Exchange (DLX) #

DLX menerima pesan yang di-nack, expired, atau queue penuh — memungkinkan penanganan pesan gagal:

fun setupDenganDlx(channel: Channel) {
    // Buat DLX exchange dan queue terlebih dahulu
    channel.exchangeDeclare("dlx.exchange", BuiltinExchangeType.DIRECT, true)
    channel.queueDeclare("dlx.queue", true, false, false, null)
    channel.queueBind("dlx.queue", "dlx.exchange", "dead")

    // Buat queue utama dengan argumen DLX
    val args = mapOf(
        "x-dead-letter-exchange" to "dlx.exchange",    // exchange tujuan DLX
        "x-dead-letter-routing-key" to "dead",          // routing key untuk DLX
        "x-message-ttl" to 30_000,                      // pesan expired setelah 30 detik
        "x-max-length" to 1000                           // maks 1000 pesan dalam queue
    )

    channel.exchangeDeclare("pesanan.exchange", BuiltinExchangeType.DIRECT, true)
    channel.queueDeclare("pesanan.queue", true, false, false, args)
    channel.queueBind("pesanan.queue", "pesanan.exchange", "pesanan")

    println("Queue dengan DLX berhasil dikonfigurasi")
}

// Consumer yang mengirim ke DLX saat gagal
fun consumerDenganDlx(channel: Channel) {
    channel.basicQos(1)

    channel.basicConsume("pesanan.queue", false,
        { _, delivery ->
            val pesan = String(delivery.body)
            try {
                proseskanPesan(pesan)
                channel.basicAck(delivery.envelope.deliveryTag, false)
            } catch (e: Exception) {
                println("Gagal: ${e.message} — kirim ke DLX")
                // requeue=false: jangan kembalikan ke queue utama, kirim ke DLX
                channel.basicNack(delivery.envelope.deliveryTag, false, false)
            }
        },
        { tag -> println("Consumer dibatalkan: $tag") }
    )
}

fun proseskanPesan(pesan: String) {
    if (Math.random() < 0.3) throw RuntimeException("Pemrosesan gagal!")
    println("Pesan diproses: $pesan")
}

Pola RPC (Request-Reply) #

RabbitMQ sangat cocok untuk pola request-reply synchronous melalui messaging:

import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue

class RpcClient(private val channel: Channel) {
    private val QUEUE_REQUEST = "rpc.hitung"

    // Queue sementara untuk menerima reply
    private val queueReply = channel.queueDeclare().queue

    // Korelasi ID → queue untuk menyimpan hasil sementara
    private val pending = java.util.concurrent.ConcurrentHashMap<
        String, ArrayBlockingQueue<String>
    >()

    init {
        // Setup consumer untuk reply queue
        channel.basicConsume(queueReply, true,
            { _, delivery ->
                val correlationId = delivery.properties.correlationId
                val hasil = String(delivery.body)
                pending[correlationId]?.offer(hasil)
            },
            { _ -> }
        )
    }

    fun panggilRpc(input: String, timeoutMs: Long = 5000): String? {
        val correlationId = UUID.randomUUID().toString()
        val antrianHasil = ArrayBlockingQueue<String>(1)
        pending[correlationId] = antrianHasil

        try {
            val props = AMQP.BasicProperties.Builder()
                .correlationId(correlationId)
                .replyTo(queueReply)        // beritahu server ke mana reply dikirim
                .build()

            channel.basicPublish("", QUEUE_REQUEST, props, input.toByteArray())
            println("RPC request '$input' dikirim dengan correlationId: $correlationId")

            // Tunggu reply dengan timeout
            return antrianHasil.poll(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS)
        } finally {
            pending.remove(correlationId)
        }
    }
}

class RpcServer(private val channel: Channel) {
    private val QUEUE_REQUEST = "rpc.hitung"

    fun mulai() {
        channel.queueDeclare(QUEUE_REQUEST, false, false, false, null)
        channel.basicQos(1)

        channel.basicConsume(QUEUE_REQUEST, false,
            { _, delivery ->
                val input = String(delivery.body)
                println("RPC request diterima: $input")

                // Proses request
                val hasil = prosesRequest(input)

                // Kirim reply ke replyTo queue dengan correlationId yang sama
                val props = AMQP.BasicProperties.Builder()
                    .correlationId(delivery.properties.correlationId)
                    .build()

                channel.basicPublish(
                    "", delivery.properties.replyTo,
                    props, hasil.toByteArray()
                )

                channel.basicAck(delivery.envelope.deliveryTag, false)
                println("Reply '$hasil' dikirim")
            },
            { _ -> }
        )
        println("RPC Server mendengarkan queue '$QUEUE_REQUEST'")
    }

    private fun prosesRequest(input: String): String {
        val angka = input.toIntOrNull() ?: return "Error: bukan angka"
        return (angka * angka).toString()  // kuadratkan angka
    }
}

Consumer dengan Coroutine #

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel as KotlinChannel

fun rabbitConsumerFlow(
    rabbitChannel: Channel,
    queueName: String
): kotlinx.coroutines.flow.Flow<String> = kotlinx.coroutines.flow.callbackFlow {
    rabbitChannel.basicQos(10)

    val tag = rabbitChannel.basicConsume(queueName, false,
        { _, delivery ->
            val pesan = String(delivery.body)
            trySend(pesan)
            rabbitChannel.basicAck(delivery.envelope.deliveryTag, false)
        },
        { _ -> close() }
    )

    awaitClose {
        runCatching { rabbitChannel.basicCancel(tag) }
    }
}

fun main() = runBlocking {
    val koneksi = KoneksiRabbitMQ.buatKoneksi()
    val channel = koneksi.createChannel()

    rabbitConsumerFlow(channel, "pesanan.email")
        .collect { pesan ->
            println("Coroutine consumer: $pesan")
            // proses pesan secara suspend jika perlu
        }
}

Tips Produksi #

// 1. Selalu deklarasikan queue dan exchange sebagai durable
channel.exchangeDeclare("nama", BuiltinExchangeType.DIRECT, /* durable= */ true)
channel.queueDeclare("nama", /* durable= */ true, false, false, null)

// 2. Selalu set pesan sebagai persistent
val properties = AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 2 = persistent
    .build()

// 3. Set prefetchCount yang tepat untuk consumer
channel.basicQos(10)  // proses maks 10 pesan sekaligus, sebelum acknowledge

// 4. Reconnect otomatis sudah dikonfigurasi di factory, tapi pastikan
//    channel dibuat ulang setelah reconnect karena channel tidak recovery otomatis
factory.isAutomaticRecoveryEnabled = true

// 5. Monitoring: gunakan RabbitMQ Management Plugin (port 15672)
// GET http://localhost:15672/api/queues untuk melihat status queue via REST

// 6. Konfigurasikan consumer timeout
channel.basicConsume(queue, false, deliverCallback, cancelCallback)
// Jika consumer tidak memproses dalam x detik, koneksi akan di-close
// Atur consumer_timeout di rabbitmq.conf

Ringkasan #

  • Exchange adalah router, Queue adalah penyimpan — producer mengirim ke exchange, consumer membaca dari queue. Binding menghubungkan keduanya. Pahami tiga jenis exchange utama: Direct (exact match), Fanout (broadcast), Topic (wildcard).
  • basicQos(1) untuk distribusi merata — tanpa ini, RabbitMQ bisa menumpuk semua pesan ke satu consumer. prefetchCount = 1 memastikan setiap consumer hanya memegang satu pesan yang belum diproses.
  • Manual ack selalu lebih amanautoAck=false dan basicAck() setelah pemrosesan memastikan pesan tidak hilang jika consumer crash di tengah pemrosesan. Dengan autoAck=true, pesan langsung dihapus setelah dikirim.
  • deliveryMode=2 untuk pesan persisten — tanpa ini, pesan bisa hilang jika broker restart. Pastikan juga queue dan exchange dideklarasikan sebagai durable=true.
  • Dead Letter Exchange untuk penanganan error — jangan biarkan pesan gagal hilang begitu saja. Konfigurasi DLX dan monitor queue-nya untuk penanganan manual atau retry yang terkontrol.
  • Automatic recovery — set isAutomaticRecoveryEnabled=true di factory agar koneksi otomatis pulih setelah jaringan terputus. Tapi ingat: channel tidak direcovery otomatis, hanya koneksi.
  • RPC dengan correlationId dan replyTo — pola request-reply via RabbitMQ menggunakan dua properti ini untuk menghubungkan request dan response. Ini adalah cara elegan untuk RPC tanpa tight coupling.
  • RabbitMQ untuk task queue, Kafka untuk streaming — RabbitMQ unggul untuk distribusi tugas (task queue), routing kompleks, dan RPC. Kafka unggul untuk streaming data besar, replay, dan fan-out ke banyak consumer group independen.

← Sebelumnya: Kafka   Berikutnya: Amazon SQS →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact