Google Pub/Sub #
Google Cloud Pub/Sub adalah layanan messaging terkelola dari Google Cloud Platform yang menawarkan pengiriman pesan at-least-once dengan latensi rendah dan throughput sangat tinggi. Ia adalah padanan GCP untuk Amazon SQS + SNS yang dikombinasikan — satu Topic bisa punya banyak Subscription, dan setiap Subscription bisa berupa push (GCP mendorong pesan ke endpoint HTTP kamu) atau pull (kamu menarik pesan secara aktif). Pub/Sub sangat cocok untuk arsitektur berbasis GCP, menghubungkan microservice, streaming data ke BigQuery atau Cloud Storage, dan integrasi dengan Cloud Functions dan Cloud Run. Di Kotlin, kamu menggunakan library Google Cloud Pub/Sub Java yang terintegrasi mulus dengan GCP authentication. Artikel ini membahas Publisher, dua mode Subscriber (push dan pull), message ordering, filter, Dead Letter Topic, dan development dengan emulator.
Konsep Utama Google Pub/Sub #
flowchart LR
P1[Publisher A] --> T["Topic:\npesanan-events"]
P2[Publisher B] --> T
T --> S1["Subscription: email-notif\n(Pull)"]
T --> S2["Subscription: audit-log\n(Push → Cloud Function)"]
T --> S3["Subscription: bigquery\n(BigQuery Subscription)"]
S1 --> C1[Email Service\n(menarik pesan)]
S2 --> C2[Cloud Function\n(endpoint HTTP)]
S3 --> C3[BigQuery Table\n(analitik)]| Konsep | Penjelasan |
|---|---|
| Topic | Saluran pesan — publisher mengirim ke sini |
| Subscription | Langganan ke Topic — bisa Pull atau Push |
| Message | Unit data, berisi body (bytes) dan attributes (Map<String,String>) |
| Ack | Konfirmasi pesan berhasil diproses, Pub/Sub menghapusnya |
| Nack | Pesan gagal, Pub/Sub akan kirim ulang setelah ack deadline |
| Ack Deadline | Batas waktu untuk ack (10 detik default, maks 600 detik) |
| Ordering Key | Field opsional untuk menjamin urutan pesan per key |
Perbandingan dengan SQS dan Kafka #
| Aspek | Google Pub/Sub | Amazon SQS | Apache Kafka |
|---|---|---|---|
| Model | Topic + multi Subscription | Queue (1 producer, 1 group consumer) | Topic + Consumer Group |
| Push delivery | ✓ Native (HTTP push) | ✗ Butuh polling | ✗ Pull only |
| Ordering | ✓ Dengan ordering key | ✗ Standard; ✓ FIFO | ✓ Per partition |
| Retention | 7 hari (maks 31 hari) | 14 hari | Configurable (bisa lama) |
| Replay | ✗ Tidak bisa (hanya forward) | ✗ Tidak bisa | ✓ Bisa dari awal |
| Filter | ✓ Native per subscription | ✗ Tidak ada | ✗ Di consumer |
| Ekosistem | GCP native | AWS native | Cloud-agnostic |
Setup dan Dependensi #
// build.gradle.kts
dependencies {
// Google Cloud Pub/Sub
implementation("com.google.cloud:google-cloud-pubsub:1.127.2")
// kotlinx.serialization
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")
// Coroutine
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-guava:1.8.0") // Guava Future → coroutine
}
Autentikasi GCP #
Google Cloud menggunakan Application Default Credentials (ADC) — library secara otomatis mencari kredensial dari berbagai sumber:
// Cara kerja ADC (secara otomatis, tanpa kode):
// 1. Environment variable GOOGLE_APPLICATION_CREDENTIALS (path ke service account JSON)
// 2. gcloud auth application-default login (untuk development lokal)
// 3. Service Account yang terpasang di GCE/GKE/Cloud Run
// Untuk development lokal:
// $ gcloud auth application-default login
// $ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
// Atau buat credentials secara eksplisit di kode:
import com.google.auth.oauth2.GoogleCredentials
import com.google.auth.oauth2.ServiceAccountCredentials
import java.io.FileInputStream
fun buatCredentials(): GoogleCredentials {
val pathServiceAccount = System.getenv("GOOGLE_APPLICATION_CREDENTIALS")
?: throw IllegalStateException("GOOGLE_APPLICATION_CREDENTIALS tidak di-set")
return ServiceAccountCredentials.fromStream(FileInputStream(pathServiceAccount))
}
Membuat Topic dan Subscription #
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.pubsub.v1.*
object AdminPubSub {
private val PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT") ?: "my-project"
fun namaTopik(nama: String) = ProjectTopicName.of(PROJECT_ID, nama).toString()
fun namaSubscription(nama: String) = ProjectSubscriptionName.of(PROJECT_ID, nama).toString()
fun buatTopik(nama: String): String {
val topikName = namaTopik(nama)
TopicAdminClient.create().use { admin ->
return try {
val topik = admin.createTopic(topikName)
println("Topic dibuat: ${topik.name}")
topik.name
} catch (e: com.google.api.gax.rpc.AlreadyExistsException) {
println("Topic sudah ada: $topikName")
topikName
}
}
}
fun buatSubscriptionPull(namaSub: String, namaTopik: String): String {
val subName = namaSubscription(namaSub)
val topikNameStr = namaTopik(namaTopik)
val subscriptionRequest = Subscription.newBuilder()
.setName(subName)
.setTopic(topikNameStr)
.setAckDeadlineSeconds(60) // 60 detik untuk memproses dan ack
.setMessageRetentionDuration( // simpan pesan hingga 7 hari
com.google.protobuf.Duration.newBuilder().setSeconds(7 * 24 * 3600).build()
)
.setRetainAckedMessages(false) // hapus pesan yang sudah di-ack
.build()
SubscriptionAdminClient.create().use { admin ->
return try {
val sub = admin.createSubscription(subscriptionRequest)
println("Subscription Pull dibuat: ${sub.name}")
sub.name
} catch (e: com.google.api.gax.rpc.AlreadyExistsException) {
println("Subscription sudah ada: $subName")
subName
}
}
}
fun buatSubscriptionPush(namaSub: String, namaTopik: String, endpointUrl: String): String {
val subName = namaSubscription(namaSub)
val topikNameStr = namaTopik(namaTopik)
val pushConfig = PushConfig.newBuilder()
.setPushEndpoint(endpointUrl) // URL yang akan menerima POST dari Pub/Sub
.build()
val subscriptionRequest = Subscription.newBuilder()
.setName(subName)
.setTopic(topikNameStr)
.setPushConfig(pushConfig)
.setAckDeadlineSeconds(60)
.build()
SubscriptionAdminClient.create().use { admin ->
val sub = admin.createSubscription(subscriptionRequest)
println("Subscription Push dibuat: ${sub.name} → $endpointUrl")
return sub.name
}
}
// Subscription dengan filter — hanya terima pesan dengan atribut tertentu
fun buatSubscriptionDenganFilter(namaSub: String, namaTopik: String, filter: String): String {
// Contoh filter: attributes.status = "BARU"
// atau: hasPrefix(attributes.tipe, "pesanan.")
val subName = namaSubscription(namaSub)
val subscriptionRequest = Subscription.newBuilder()
.setName(subName)
.setTopic(namaTopik(namaTopik))
.setFilter(filter)
.setAckDeadlineSeconds(30)
.build()
SubscriptionAdminClient.create().use { admin ->
val sub = admin.createSubscription(subscriptionRequest)
println("Subscription dengan filter '$filter' dibuat: ${sub.name}")
return sub.name
}
}
}
Publisher — Mengirim Pesan #
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.TopicName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
@Serializable
data class EventPesanan(
val pesananId: String,
val penggunaId: Long,
val total: Double,
val status: String,
val timestamp: Long = System.currentTimeMillis()
)
class PublisherPubSub(
private val projectId: String,
private val topikId: String
) {
private val json = Json { ignoreUnknownKeys = true }
private val topikName = TopicName.of(projectId, topikId)
// Gunakan use{} untuk memastikan publisher ditutup dan semua pesan ter-flush
fun <T> denganPublisher(blok: Publisher.() -> T): T {
val publisher = Publisher.newBuilder(topikName)
.build()
return try {
publisher.blok()
} finally {
publisher.shutdown()
publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
}
}
fun kirim(event: EventPesanan): String {
return denganPublisher {
val payload = json.encodeToString(EventPesanan.serializer(), event)
val data = ByteString.copyFromUtf8(payload)
val pesan = PubsubMessage.newBuilder()
.setData(data)
.putAttributes("tipe", "EventPesanan") // atribut untuk filtering
.putAttributes("versi", "1.0")
.putAttributes("status", event.status)
.build()
val future = publish(pesan)
val messageId = future.get() // blokir sampai dapat konfirmasi
println("Pesan terkirim — ID: $messageId")
messageId
}
}
// Kirim dengan ordering key — menjamin urutan per key
fun kirimDenganOrdering(event: EventPesanan, orderingKey: String): String {
val publisher = Publisher.newBuilder(topikName)
.setEnableMessageOrdering(true) // harus diaktifkan di publisher
.build()
return try {
val payload = json.encodeToString(EventPesanan.serializer(), event)
val pesan = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(payload))
.setOrderingKey(orderingKey) // pesan dengan key yang sama dijamin urut
.putAttributes("penggunaId", event.penggunaId.toString())
.build()
publisher.publish(pesan).get()
} finally {
publisher.shutdown()
publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
}
}
// Kirim batch — publisher sudah melakukan batching internal secara otomatis
// tapi kita bisa kirim banyak sekaligus dan tunggu semua selesai
fun kirimBanyak(events: List<EventPesanan>): List<String> {
return denganPublisher {
val futures = events.map { event ->
val pesan = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(
json.encodeToString(EventPesanan.serializer(), event)
))
.putAttributes("pesananId", event.pesananId)
.build()
publish(pesan)
}
futures.map { it.get() } // tunggu semua selesai
}
}
}
Subscriber Pull — Menarik Pesan Secara Aktif #
import com.google.cloud.pubsub.v1.AckReplyConsumer
import com.google.cloud.pubsub.v1.MessageReceiver
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PubsubMessage
class SubscriberPull(
private val projectId: String,
private val subscriptionId: String
) {
private val json = Json { ignoreUnknownKeys = true }
private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
// Subscriber asinkron — callback dipanggil setiap ada pesan baru
fun mulaiAsync(): Subscriber {
val receiver = MessageReceiver { pesan: PubsubMessage, consumer: AckReplyConsumer ->
try {
proseskanPesan(pesan)
consumer.ack() // beritahu Pub/Sub pesan berhasil diproses
} catch (e: Exception) {
println("Gagal proses: ${e.message}")
consumer.nack() // Pub/Sub akan kirim ulang setelah ack deadline
}
}
val subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setFlowControlSettings(
com.google.api.gax.batching.FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(100) // maks 100 pesan yang belum di-ack
.setMaxOutstandingRequestBytes(10 * 1024 * 1024) // maks 10MB
.build()
)
.build()
subscriber.startAsync().awaitRunning()
println("Subscriber '$subscriptionId' mulai mendengarkan...")
return subscriber
}
private fun proseskanPesan(pesan: PubsubMessage) {
val payload = pesan.data.toStringUtf8()
val tipe = pesan.attributesMap["tipe"]
val messageId = pesan.messageId
println("Pesan diterima — ID: $messageId, tipe: $tipe")
when (tipe) {
"EventPesanan" -> {
val event = json.decodeFromString(EventPesanan.serializer(), payload)
println("Proses pesanan: ${event.pesananId} — ${event.status}")
// Logika bisnis...
}
else -> println("Tipe tidak dikenal: $tipe")
}
}
}
fun main() {
val subscriber = SubscriberPull("my-project", "pesanan-email").mulaiAsync()
// Jalankan hingga diinterupsi
try {
Thread.currentThread().join()
} catch (e: InterruptedException) {
subscriber.stopAsync()
println("Subscriber dihentikan")
}
}
Subscriber Push — GCP Mendorong Pesan ke Endpoint HTTP #
Untuk push subscription, GCP mengirim HTTP POST ke endpoint kamu. Cocok untuk Cloud Functions dan Cloud Run:
import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.Serializable
import java.util.Base64
@Serializable
data class PubSubPushPayload(
val message: PubSubMessage,
val subscription: String
)
@Serializable
data class PubSubMessage(
val data: String, // base64-encoded payload
val messageId: String,
val publishTime: String,
val attributes: Map<String, String> = emptyMap()
)
// Handler di Ktor untuk menerima push dari Pub/Sub
fun Application.pubSubPushHandler() {
val json = Json { ignoreUnknownKeys = true }
routing {
post("/pubsub/push") {
val body = call.receiveText()
val payload = runCatching {
json.decodeFromString(PubSubPushPayload.serializer(), body)
}.getOrElse {
call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Body tidak valid")
return@post
}
// Decode data dari base64
val dataBytes = Base64.getDecoder().decode(payload.message.data)
val pesanTeks = String(dataBytes)
println("Push diterima — ID: ${payload.message.messageId}")
println("Atribut: ${payload.message.attributes}")
println("Data: $pesanTeks")
// Proses pesan
try {
val event = json.decodeFromString(EventPesanan.serializer(), pesanTeks)
prosesEventPush(event)
// HTTP 200/204 = ack (Pub/Sub tidak akan kirim ulang)
call.respond(io.ktor.http.HttpStatusCode.NoContent)
} catch (e: Exception) {
println("Gagal proses: ${e.message}")
// HTTP 4xx/5xx = nack (Pub/Sub akan kirim ulang setelah backoff)
call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Gagal proses")
}
}
}
}
fun prosesEventPush(event: EventPesanan) {
println("Push: proses pesanan ${event.pesananId}")
}
Dead Letter Topic #
Pub/Sub mendukung Dead Letter Topic untuk pesan yang gagal diproses berkali-kali:
fun buatSubscriptionDenganDlt(
namaSub: String,
namaTopik: String,
namaDlt: String,
maksPercobaan: Int = 5
): String {
// Pastikan DLT sudah ada
AdminPubSub.buatTopik(namaDlt)
val dltName = AdminPubSub.namaTopik(namaDlt)
val subName = AdminPubSub.namaSubscription(namaSub)
val deadLetterPolicy = DeadLetterPolicy.newBuilder()
.setDeadLetterTopic(dltName)
.setMaxDeliveryAttempts(maksPercobaan) // coba maks N kali sebelum ke DLT
.build()
val subscriptionRequest = Subscription.newBuilder()
.setName(subName)
.setTopic(AdminPubSub.namaTopik(namaTopik))
.setDeadLetterPolicy(deadLetterPolicy)
.setAckDeadlineSeconds(30)
.build()
SubscriptionAdminClient.create().use { admin ->
val sub = admin.createSubscription(subscriptionRequest)
println("Subscription dengan DLT dibuat: ${sub.name}")
println("Dead Letter Topic: $dltName (maks $maksPercobaan percobaan)")
return sub.name
}
}
Development dengan Pub/Sub Emulator #
Google menyediakan emulator lokal yang bisa dijalankan dengan Docker atau gcloud:
# docker-compose.yml
services:
pubsub-emulator:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
ports:
- "8085:8085"
# Jalankan emulator
docker compose up -d
# Set environment variable untuk mengarahkan SDK ke emulator
export PUBSUB_EMULATOR_HOST=localhost:8085
// Di kode Kotlin, SDK otomatis mendeteksi PUBSUB_EMULATOR_HOST
// Jika env var tersebut di-set, semua call ke Pub/Sub diarahkan ke emulator
// Bisa juga override endpoint secara eksplisit
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.FixedTransportChannelProvider
import io.grpc.ManagedChannelBuilder
fun buatPublisherEmulator(projectId: String, topikId: String): Publisher {
val channel = ManagedChannelBuilder
.forTarget("localhost:8085")
.usePlaintext()
.build()
val channelProvider = FixedTransportChannelProvider.create(
GrpcTransportChannel.create(channel)
)
return Publisher.newBuilder(TopicName.of(projectId, topikId))
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()
}
Pola Message Ordering #
Untuk menjamin urutan pesan per entitas (misalnya semua event pesanan yang sama terurut):
// Publisher — aktifkan ordering dan set ordering key
fun kirimDenganOrdering() {
val publisher = Publisher.newBuilder(TopicName.of("my-project", "pesanan-events"))
.setEnableMessageOrdering(true)
.build()
val events = listOf(
EventPesanan("ORD-001", 1L, 100_000.0, "DIBUAT"),
EventPesanan("ORD-001", 1L, 100_000.0, "DIBAYAR"),
EventPesanan("ORD-001", 1L, 100_000.0, "DIKIRIM"),
EventPesanan("ORD-001", 1L, 100_000.0, "SELESAI")
)
val json = Json { ignoreUnknownKeys = true }
events.forEach { event ->
val pesan = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(json.encodeToString(EventPesanan.serializer(), event)))
.setOrderingKey(event.pesananId) // semua event ORD-001 terurut
.build()
publisher.publish(pesan).get()
println("Terkirim: ${event.pesananId} — ${event.status}")
}
publisher.shutdown()
publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
}
// Subscription juga harus diaktifkan untuk ordering
// enableMessageOrdering = true di settings subscription
Ringkasan #
- Satu Topic, banyak Subscription — ini adalah model fan-out native Pub/Sub. Satu pesan yang diterbitkan ke Topic bisa dikonsumsi secara independen oleh banyak Subscription tanpa koordinasi producer.
- Pull vs Push Subscription — gunakan Pull jika consumer kamu adalah layanan yang terus berjalan (microservice). Gunakan Push untuk Cloud Functions, Cloud Run, atau endpoint HTTP yang menerima request masuk.
- Ack wajib, Nack untuk retry —
consumer.ack()memberitahu Pub/Sub pesan berhasil dan bisa dihapus.consumer.nack()atau diam saja (ack deadline terlewat) akan membuat Pub/Sub mengirim ulang pesan.- Filter subscription untuk routing — gunakan filter pada Subscription untuk menerima hanya pesan dengan atribut tertentu:
attributes.status = "BARU". Lebih efisien dari filtering di consumer.- Ordering key untuk urutan per entitas — aktifkan
enableMessageOrdering = truedi Publisher dan setorderingKeyyang sama untuk semua pesan yang harus terurut. Pub/Sub menjamin urutan per ordering key.- Dead Letter Topic untuk pesan gagal — konfigurasi
maxDeliveryAttemptsdandeadLetterTopic. Pesan yang gagal diproses sebanyak N kali otomatis diteruskan ke DLT untuk penanganan manual.- Emulator untuk development — jalankan
PUBSUB_EMULATOR_HOST=localhost:8085dan SDK secara otomatis mengarahkan semua panggilan ke emulator lokal. Tidak perlu GCP project atau biaya.- Flow control untuk consumer — set
maxOutstandingElementCountdi Subscriber untuk membatasi berapa banyak pesan yang bisa ada di memori secara bersamaan. Mencegah consumer kehabisan memori saat pesan membanjir.