RabbitMQ #
RabbitMQ adalah message broker yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang adalah log terdistribusi, RabbitMQ adalah broker yang aktif merutekan pesan — ia menggunakan konsep Exchange dan Queue yang dipisahkan oleh binding rules. Ini membuat RabbitMQ sangat fleksibel untuk berbagai pola messaging: point-to-point, publish-subscribe, request-reply, dan routing berbasis konten. RabbitMQ unggul untuk use case yang membutuhkan routing pesan yang kompleks, task queue dengan prioritas, dan pesan yang perlu di-acknowledge setelah diproses. Di Kotlin, kamu menggunakan library RabbitMQ Java Client (AMQP client resmi) untuk berkomunikasi dengan broker.
RabbitMQ vs Kafka — Kapan Memilih #
PILIH RabbitMQ jika:
✓ Butuh routing pesan yang fleksibel (per tipe, per konten)
✓ Task queue dengan worker yang banyak (distribusi beban)
✓ RPC (request-reply) pattern
✓ Pesan perlu di-acknowledge dan dihapus setelah diproses
✓ Dead letter handling yang mudah dikonfigurasi
✓ Prioritas antrian
PILIH Kafka jika:
✓ Streaming data dalam volume sangat besar
✓ Perlu replay pesan (konsumer bisa baca ulang dari awal)
✓ Banyak consumer group independen membaca data yang sama
✓ Event sourcing dan audit trail jangka panjang
✓ Stream processing (Kafka Streams, flink)
Konsep Utama RabbitMQ #
flowchart LR
P[Producer] --> E["Exchange\n(Direct/Fanout/Topic)"]
E --> |"binding key: pesanan.baru"| Q1["Queue: email-notif"]
E --> |"binding key: pesanan.*"| Q2["Queue: audit-log"]
E --> |"fanout"| Q3["Queue: dashboard-realtime"]
Q1 --> C1[Consumer: Email Service]
Q2 --> C2[Consumer: Audit Service]
Q3 --> C3[Consumer: Dashboard]| Konsep | Penjelasan |
|---|---|
| Exchange | Titik masuk pesan, menentukan bagaimana pesan dirouting |
| Queue | Tempat penyimpanan pesan sampai dikonsumsi |
| Binding | Aturan yang menghubungkan Exchange ke Queue |
| Routing Key | Label pada pesan yang digunakan Exchange untuk routing |
| Ack | Sinyal dari consumer bahwa pesan berhasil diproses |
| Nack | Sinyal bahwa pesan gagal, bisa dikembalikan ke queue |
Tipe Exchange #
Direct — pesan dikirim ke queue berdasarkan routing key yang persis sama
Fanout — pesan dikirim ke SEMUA queue yang terikat ke exchange ini
Topic — pesan dikirim berdasarkan pola routing key (wildcard * dan #)
Headers — routing berdasarkan header atribut pesan (jarang digunakan)
Setup dan Dependensi #
// build.gradle.kts
dependencies {
// RabbitMQ Java Client (AMQP)
implementation("com.rabbitmq:amqp-client:5.21.0")
// kotlinx.serialization untuk pesan
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")
// Coroutine (opsional)
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}
Koneksi ke RabbitMQ #
import com.rabbitmq.client.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
object KoneksiRabbitMQ {
private val factory = ConnectionFactory().apply {
host = System.getenv("RABBITMQ_HOST") ?: "localhost"
port = System.getenv("RABBITMQ_PORT")?.toInt() ?: 5672
username = System.getenv("RABBITMQ_USER") ?: "guest"
password = System.getenv("RABBITMQ_PASSWORD") ?: "guest"
virtualHost = System.getenv("RABBITMQ_VHOST") ?: "/"
// Pengaturan koneksi
connectionTimeout = 30_000 // 30 detik timeout koneksi
requestedHeartbeat = 60 // heartbeat setiap 60 detik
isAutomaticRecoveryEnabled = true // otomatis reconnect jika koneksi terputus
networkRecoveryInterval = 5_000 // coba reconnect setiap 5 detik
}
// Satu koneksi bisa punya banyak channel
// Channel adalah unit logical komunikasi, ringan untuk dibuat
fun buatKoneksi(): Connection = factory.newConnection()
}
// Penggunaan
fun main() {
KoneksiRabbitMQ.buatKoneksi().use { koneksi ->
koneksi.createChannel().use { channel ->
println("Terhubung ke RabbitMQ: ${koneksi.serverProperties["version"]}")
}
}
}
Direct Exchange — Point-to-Point #
Direct Exchange mengirim pesan ke queue yang memiliki binding key yang persis sama dengan routing key pesan.
@Serializable
data class EventPesanan(
val pesananId: String,
val penggunaId: Long,
val total: Double,
val status: String
)
class MessagingDirect {
private val json = Json { ignoreUnknownKeys = true }
private val EXCHANGE = "pesanan.direct"
private val QUEUE_EMAIL = "pesanan.email"
private val QUEUE_INVENTORI = "pesanan.inventori"
private val KEY_BARU = "pesanan.baru"
private val KEY_SELESAI = "pesanan.selesai"
// Setup: deklarasikan exchange dan queue
fun setupInfrastruktur(channel: Channel) {
// Deklarasi exchange — durable: bertahan setelah broker restart
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true)
// Deklarasi queue — durable: queue bertahan, messages bisa juga durable
channel.queueDeclare(QUEUE_EMAIL, true, false, false, null)
channel.queueDeclare(QUEUE_INVENTORI, true, false, false, null)
// Binding: hubungkan queue ke exchange dengan routing key
channel.queueBind(QUEUE_EMAIL, EXCHANGE, KEY_BARU) // email hanya terima pesanan baru
channel.queueBind(QUEUE_EMAIL, EXCHANGE, KEY_SELESAI) // email juga terima pesanan selesai
channel.queueBind(QUEUE_INVENTORI, EXCHANGE, KEY_BARU) // inventori hanya pesanan baru
println("Infrastruktur messaging siap")
}
// Publisher
fun kirimEventPesanan(channel: Channel, event: EventPesanan, routingKey: String) {
val payload = json.encodeToString(EventPesanan.serializer(), event).toByteArray()
val properties = AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2) // 2 = persistent (pesan tidak hilang jika broker restart)
.messageId(event.pesananId)
.timestamp(java.util.Date())
.build()
channel.basicPublish(EXCHANGE, routingKey, properties, payload)
println("Event terkirim ke '$routingKey': ${event.pesananId}")
}
// Consumer dengan manual acknowledgement
fun mulaiConsumer(channel: Channel, queueName: String, namaConsumer: String) {
// prefetchCount = 1: ambil satu pesan dulu, baru minta berikutnya
// Ini mendistribusikan beban secara merata antar consumer
channel.basicQos(1)
val deliverCallback = DeliverCallback { _, delivery ->
try {
val pesan = String(delivery.body)
val event = json.decodeFromString(EventPesanan.serializer(), pesan)
println("[$namaConsumer] Memproses: ${event.pesananId}")
prosesEvent(namaConsumer, event)
// Acknowledge — beritahu broker pesan berhasil diproses
channel.basicAck(delivery.envelope.deliveryTag, false)
} catch (e: Exception) {
println("[$namaConsumer] Gagal: ${e.message}")
// Nack dengan requeue=false → kirim ke dead letter (jika dikonfigurasi)
channel.basicNack(delivery.envelope.deliveryTag, false, false)
}
}
val cancelCallback = CancelCallback { tag ->
println("[$namaConsumer] Consumer dibatalkan: $tag")
}
// autoAck=false: kita yang kontrol kapan ack dikirim
channel.basicConsume(queueName, false, deliverCallback, cancelCallback)
println("[$namaConsumer] Mendengarkan queue '$queueName'...")
}
private fun prosesEvent(consumer: String, event: EventPesanan) {
Thread.sleep(500) // simulasi pemrosesan
println("[$consumer] Selesai proses ${event.pesananId}")
}
}
fun main() {
val messaging = MessagingDirect()
val json = Json { ignoreUnknownKeys = true }
KoneksiRabbitMQ.buatKoneksi().use { koneksi ->
koneksi.createChannel().use { channel ->
messaging.setupInfrastruktur(channel)
// Publish beberapa pesan
listOf(
EventPesanan("ORD-001", 1L, 150_000.0, "BARU") to "pesanan.baru",
EventPesanan("ORD-002", 2L, 250_000.0, "BARU") to "pesanan.baru",
EventPesanan("ORD-001", 1L, 150_000.0, "SELESAI") to "pesanan.selesai"
).forEach { (event, key) ->
messaging.kirimEventPesanan(channel, event, key)
}
}
// Consumer di channel terpisah
val channelEmail = koneksi.createChannel()
messaging.mulaiConsumer(channelEmail, "pesanan.email", "Email Service")
Thread.sleep(5000)
}
}
Fanout Exchange — Broadcast ke Semua Queue #
Fanout Exchange mengirim pesan ke semua queue yang terikat, tanpa memperhatikan routing key:
class MessagingFanout {
private val json = Json { ignoreUnknownKeys = true }
private val EXCHANGE = "notifikasi.fanout"
fun setup(channel: Channel) {
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT, true)
// Setiap service membuat queue sendiri dan bind ke fanout exchange
// Queue bisa bersifat sementara (exclusive, autodelete)
val queueEmail = channel.queueDeclare("notif.email", true, false, false, null).queue
val queueSms = channel.queueDeclare("notif.sms", true, false, false, null).queue
val queuePush = channel.queueDeclare("notif.push", true, false, false, null).queue
// Binding ke fanout — routing key diabaikan
channel.queueBind(queueEmail, EXCHANGE, "")
channel.queueBind(queueSms, EXCHANGE, "")
channel.queueBind(queuePush, EXCHANGE, "")
}
fun broadcast(channel: Channel, pesan: String) {
val payload = pesan.toByteArray()
val props = AMQP.BasicProperties.Builder().deliveryMode(2).build()
// Routing key kosong untuk fanout
channel.basicPublish(EXCHANGE, "", props, payload)
println("Broadcast dikirim ke semua subscriber: $pesan")
}
}
Topic Exchange — Routing Berbasis Pola #
Topic Exchange menggunakan wildcard untuk routing:
*menggantikan tepat satu kata#menggantikan nol atau lebih kata
class MessagingTopic {
private val json = Json { ignoreUnknownKeys = true }
private val EXCHANGE = "sistem.topic"
fun setup(channel: Channel) {
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, true)
// Queue untuk semua event pesanan
channel.queueDeclare("audit.semua", true, false, false, null)
channel.queueBind("audit.semua", EXCHANGE, "pesanan.#")
// Menerima: pesanan.baru, pesanan.bayar, pesanan.kirim, pesanan.selesai, dll
// Queue hanya untuk error apapun
channel.queueDeclare("alert.error", true, false, false, null)
channel.queueBind("alert.error", EXCHANGE, "*.error")
// Menerima: pesanan.error, pembayaran.error, pengiriman.error, dll
// Queue untuk monitoring umum
channel.queueDeclare("monitoring.semua", true, false, false, null)
channel.queueBind("monitoring.semua", EXCHANGE, "#")
// Menerima: SEMUA pesan apapun routing key-nya
}
fun kirim(channel: Channel, routingKey: String, pesan: String) {
val props = AMQP.BasicProperties.Builder().deliveryMode(2).build()
channel.basicPublish(EXCHANGE, routingKey, props, pesan.toByteArray())
println("Pesan '$pesan' dikirim dengan key '$routingKey'")
}
}
Dead Letter Exchange (DLX) #
DLX menerima pesan yang di-nack, expired, atau queue penuh — memungkinkan penanganan pesan gagal:
fun setupDenganDlx(channel: Channel) {
// Buat DLX exchange dan queue terlebih dahulu
channel.exchangeDeclare("dlx.exchange", BuiltinExchangeType.DIRECT, true)
channel.queueDeclare("dlx.queue", true, false, false, null)
channel.queueBind("dlx.queue", "dlx.exchange", "dead")
// Buat queue utama dengan argumen DLX
val args = mapOf(
"x-dead-letter-exchange" to "dlx.exchange", // exchange tujuan DLX
"x-dead-letter-routing-key" to "dead", // routing key untuk DLX
"x-message-ttl" to 30_000, // pesan expired setelah 30 detik
"x-max-length" to 1000 // maks 1000 pesan dalam queue
)
channel.exchangeDeclare("pesanan.exchange", BuiltinExchangeType.DIRECT, true)
channel.queueDeclare("pesanan.queue", true, false, false, args)
channel.queueBind("pesanan.queue", "pesanan.exchange", "pesanan")
println("Queue dengan DLX berhasil dikonfigurasi")
}
// Consumer yang mengirim ke DLX saat gagal
fun consumerDenganDlx(channel: Channel) {
channel.basicQos(1)
channel.basicConsume("pesanan.queue", false,
{ _, delivery ->
val pesan = String(delivery.body)
try {
proseskanPesan(pesan)
channel.basicAck(delivery.envelope.deliveryTag, false)
} catch (e: Exception) {
println("Gagal: ${e.message} — kirim ke DLX")
// requeue=false: jangan kembalikan ke queue utama, kirim ke DLX
channel.basicNack(delivery.envelope.deliveryTag, false, false)
}
},
{ tag -> println("Consumer dibatalkan: $tag") }
)
}
fun proseskanPesan(pesan: String) {
if (Math.random() < 0.3) throw RuntimeException("Pemrosesan gagal!")
println("Pesan diproses: $pesan")
}
Pola RPC (Request-Reply) #
RabbitMQ sangat cocok untuk pola request-reply synchronous melalui messaging:
import java.util.UUID
import java.util.concurrent.ArrayBlockingQueue
class RpcClient(private val channel: Channel) {
private val QUEUE_REQUEST = "rpc.hitung"
// Queue sementara untuk menerima reply
private val queueReply = channel.queueDeclare().queue
// Korelasi ID → queue untuk menyimpan hasil sementara
private val pending = java.util.concurrent.ConcurrentHashMap<
String, ArrayBlockingQueue<String>
>()
init {
// Setup consumer untuk reply queue
channel.basicConsume(queueReply, true,
{ _, delivery ->
val correlationId = delivery.properties.correlationId
val hasil = String(delivery.body)
pending[correlationId]?.offer(hasil)
},
{ _ -> }
)
}
fun panggilRpc(input: String, timeoutMs: Long = 5000): String? {
val correlationId = UUID.randomUUID().toString()
val antrianHasil = ArrayBlockingQueue<String>(1)
pending[correlationId] = antrianHasil
try {
val props = AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.replyTo(queueReply) // beritahu server ke mana reply dikirim
.build()
channel.basicPublish("", QUEUE_REQUEST, props, input.toByteArray())
println("RPC request '$input' dikirim dengan correlationId: $correlationId")
// Tunggu reply dengan timeout
return antrianHasil.poll(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS)
} finally {
pending.remove(correlationId)
}
}
}
class RpcServer(private val channel: Channel) {
private val QUEUE_REQUEST = "rpc.hitung"
fun mulai() {
channel.queueDeclare(QUEUE_REQUEST, false, false, false, null)
channel.basicQos(1)
channel.basicConsume(QUEUE_REQUEST, false,
{ _, delivery ->
val input = String(delivery.body)
println("RPC request diterima: $input")
// Proses request
val hasil = prosesRequest(input)
// Kirim reply ke replyTo queue dengan correlationId yang sama
val props = AMQP.BasicProperties.Builder()
.correlationId(delivery.properties.correlationId)
.build()
channel.basicPublish(
"", delivery.properties.replyTo,
props, hasil.toByteArray()
)
channel.basicAck(delivery.envelope.deliveryTag, false)
println("Reply '$hasil' dikirim")
},
{ _ -> }
)
println("RPC Server mendengarkan queue '$QUEUE_REQUEST'")
}
private fun prosesRequest(input: String): String {
val angka = input.toIntOrNull() ?: return "Error: bukan angka"
return (angka * angka).toString() // kuadratkan angka
}
}
Consumer dengan Coroutine #
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel as KotlinChannel
fun rabbitConsumerFlow(
rabbitChannel: Channel,
queueName: String
): kotlinx.coroutines.flow.Flow<String> = kotlinx.coroutines.flow.callbackFlow {
rabbitChannel.basicQos(10)
val tag = rabbitChannel.basicConsume(queueName, false,
{ _, delivery ->
val pesan = String(delivery.body)
trySend(pesan)
rabbitChannel.basicAck(delivery.envelope.deliveryTag, false)
},
{ _ -> close() }
)
awaitClose {
runCatching { rabbitChannel.basicCancel(tag) }
}
}
fun main() = runBlocking {
val koneksi = KoneksiRabbitMQ.buatKoneksi()
val channel = koneksi.createChannel()
rabbitConsumerFlow(channel, "pesanan.email")
.collect { pesan ->
println("Coroutine consumer: $pesan")
// proses pesan secara suspend jika perlu
}
}
Tips Produksi #
// 1. Selalu deklarasikan queue dan exchange sebagai durable
channel.exchangeDeclare("nama", BuiltinExchangeType.DIRECT, /* durable= */ true)
channel.queueDeclare("nama", /* durable= */ true, false, false, null)
// 2. Selalu set pesan sebagai persistent
val properties = AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 = persistent
.build()
// 3. Set prefetchCount yang tepat untuk consumer
channel.basicQos(10) // proses maks 10 pesan sekaligus, sebelum acknowledge
// 4. Reconnect otomatis sudah dikonfigurasi di factory, tapi pastikan
// channel dibuat ulang setelah reconnect karena channel tidak recovery otomatis
factory.isAutomaticRecoveryEnabled = true
// 5. Monitoring: gunakan RabbitMQ Management Plugin (port 15672)
// GET http://localhost:15672/api/queues untuk melihat status queue via REST
// 6. Konfigurasikan consumer timeout
channel.basicConsume(queue, false, deliverCallback, cancelCallback)
// Jika consumer tidak memproses dalam x detik, koneksi akan di-close
// Atur consumer_timeout di rabbitmq.conf
Ringkasan #
- Exchange adalah router, Queue adalah penyimpan — producer mengirim ke exchange, consumer membaca dari queue. Binding menghubungkan keduanya. Pahami tiga jenis exchange utama: Direct (exact match), Fanout (broadcast), Topic (wildcard).
basicQos(1)untuk distribusi merata — tanpa ini, RabbitMQ bisa menumpuk semua pesan ke satu consumer.prefetchCount = 1memastikan setiap consumer hanya memegang satu pesan yang belum diproses.- Manual ack selalu lebih aman —
autoAck=falsedanbasicAck()setelah pemrosesan memastikan pesan tidak hilang jika consumer crash di tengah pemrosesan. DenganautoAck=true, pesan langsung dihapus setelah dikirim.deliveryMode=2untuk pesan persisten — tanpa ini, pesan bisa hilang jika broker restart. Pastikan juga queue dan exchange dideklarasikan sebagaidurable=true.- Dead Letter Exchange untuk penanganan error — jangan biarkan pesan gagal hilang begitu saja. Konfigurasi DLX dan monitor queue-nya untuk penanganan manual atau retry yang terkontrol.
- Automatic recovery — set
isAutomaticRecoveryEnabled=truedi factory agar koneksi otomatis pulih setelah jaringan terputus. Tapi ingat: channel tidak direcovery otomatis, hanya koneksi.- RPC dengan correlationId dan replyTo — pola request-reply via RabbitMQ menggunakan dua properti ini untuk menghubungkan request dan response. Ini adalah cara elegan untuk RPC tanpa tight coupling.
- RabbitMQ untuk task queue, Kafka untuk streaming — RabbitMQ unggul untuk distribusi tugas (task queue), routing kompleks, dan RPC. Kafka unggul untuk streaming data besar, replay, dan fan-out ke banyak consumer group independen.