Google Pub/Sub

Google Pub/Sub #

Google Cloud Pub/Sub adalah layanan messaging terkelola dari Google Cloud Platform yang menawarkan pengiriman pesan at-least-once dengan latensi rendah dan throughput sangat tinggi. Ia adalah padanan GCP untuk Amazon SQS + SNS yang dikombinasikan — satu Topic bisa punya banyak Subscription, dan setiap Subscription bisa berupa push (GCP mendorong pesan ke endpoint HTTP kamu) atau pull (kamu menarik pesan secara aktif). Pub/Sub sangat cocok untuk arsitektur berbasis GCP, menghubungkan microservice, streaming data ke BigQuery atau Cloud Storage, dan integrasi dengan Cloud Functions dan Cloud Run. Di Kotlin, kamu menggunakan library Google Cloud Pub/Sub Java yang terintegrasi mulus dengan GCP authentication. Artikel ini membahas Publisher, dua mode Subscriber (push dan pull), message ordering, filter, Dead Letter Topic, dan development dengan emulator.

Konsep Utama Google Pub/Sub #

flowchart LR
    P1[Publisher A] --> T["Topic:\npesanan-events"]
    P2[Publisher B] --> T
    T --> S1["Subscription: email-notif\n(Pull)"]
    T --> S2["Subscription: audit-log\n(Push → Cloud Function)"]
    T --> S3["Subscription: bigquery\n(BigQuery Subscription)"]
    S1 --> C1[Email Service\n(menarik pesan)]
    S2 --> C2[Cloud Function\n(endpoint HTTP)]
    S3 --> C3[BigQuery Table\n(analitik)]
KonsepPenjelasan
TopicSaluran pesan — publisher mengirim ke sini
SubscriptionLangganan ke Topic — bisa Pull atau Push
MessageUnit data, berisi body (bytes) dan attributes (Map<String,String>)
AckKonfirmasi pesan berhasil diproses, Pub/Sub menghapusnya
NackPesan gagal, Pub/Sub akan kirim ulang setelah ack deadline
Ack DeadlineBatas waktu untuk ack (10 detik default, maks 600 detik)
Ordering KeyField opsional untuk menjamin urutan pesan per key

Perbandingan dengan SQS dan Kafka #

AspekGoogle Pub/SubAmazon SQSApache Kafka
ModelTopic + multi SubscriptionQueue (1 producer, 1 group consumer)Topic + Consumer Group
Push delivery✓ Native (HTTP push)✗ Butuh polling✗ Pull only
Ordering✓ Dengan ordering key✗ Standard; ✓ FIFO✓ Per partition
Retention7 hari (maks 31 hari)14 hariConfigurable (bisa lama)
Replay✗ Tidak bisa (hanya forward)✗ Tidak bisa✓ Bisa dari awal
Filter✓ Native per subscription✗ Tidak ada✗ Di consumer
EkosistemGCP nativeAWS nativeCloud-agnostic

Setup dan Dependensi #

// build.gradle.kts
dependencies {
    // Google Cloud Pub/Sub
    implementation("com.google.cloud:google-cloud-pubsub:1.127.2")

    // kotlinx.serialization
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")

    // Coroutine
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-guava:1.8.0")  // Guava Future → coroutine
}

Autentikasi GCP #

Google Cloud menggunakan Application Default Credentials (ADC) — library secara otomatis mencari kredensial dari berbagai sumber:

// Cara kerja ADC (secara otomatis, tanpa kode):
// 1. Environment variable GOOGLE_APPLICATION_CREDENTIALS (path ke service account JSON)
// 2. gcloud auth application-default login (untuk development lokal)
// 3. Service Account yang terpasang di GCE/GKE/Cloud Run

// Untuk development lokal:
// $ gcloud auth application-default login
// $ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

// Atau buat credentials secara eksplisit di kode:
import com.google.auth.oauth2.GoogleCredentials
import com.google.auth.oauth2.ServiceAccountCredentials
import java.io.FileInputStream

fun buatCredentials(): GoogleCredentials {
    val pathServiceAccount = System.getenv("GOOGLE_APPLICATION_CREDENTIALS")
        ?: throw IllegalStateException("GOOGLE_APPLICATION_CREDENTIALS tidak di-set")

    return ServiceAccountCredentials.fromStream(FileInputStream(pathServiceAccount))
}

Membuat Topic dan Subscription #

import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.pubsub.v1.*

object AdminPubSub {
    private val PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT") ?: "my-project"

    fun namaTopik(nama: String) = ProjectTopicName.of(PROJECT_ID, nama).toString()
    fun namaSubscription(nama: String) = ProjectSubscriptionName.of(PROJECT_ID, nama).toString()

    fun buatTopik(nama: String): String {
        val topikName = namaTopik(nama)

        TopicAdminClient.create().use { admin ->
            return try {
                val topik = admin.createTopic(topikName)
                println("Topic dibuat: ${topik.name}")
                topik.name
            } catch (e: com.google.api.gax.rpc.AlreadyExistsException) {
                println("Topic sudah ada: $topikName")
                topikName
            }
        }
    }

    fun buatSubscriptionPull(namaSub: String, namaTopik: String): String {
        val subName = namaSubscription(namaSub)
        val topikNameStr = namaTopik(namaTopik)

        val subscriptionRequest = Subscription.newBuilder()
            .setName(subName)
            .setTopic(topikNameStr)
            .setAckDeadlineSeconds(60)          // 60 detik untuk memproses dan ack
            .setMessageRetentionDuration(         // simpan pesan hingga 7 hari
                com.google.protobuf.Duration.newBuilder().setSeconds(7 * 24 * 3600).build()
            )
            .setRetainAckedMessages(false)        // hapus pesan yang sudah di-ack
            .build()

        SubscriptionAdminClient.create().use { admin ->
            return try {
                val sub = admin.createSubscription(subscriptionRequest)
                println("Subscription Pull dibuat: ${sub.name}")
                sub.name
            } catch (e: com.google.api.gax.rpc.AlreadyExistsException) {
                println("Subscription sudah ada: $subName")
                subName
            }
        }
    }

    fun buatSubscriptionPush(namaSub: String, namaTopik: String, endpointUrl: String): String {
        val subName = namaSubscription(namaSub)
        val topikNameStr = namaTopik(namaTopik)

        val pushConfig = PushConfig.newBuilder()
            .setPushEndpoint(endpointUrl)  // URL yang akan menerima POST dari Pub/Sub
            .build()

        val subscriptionRequest = Subscription.newBuilder()
            .setName(subName)
            .setTopic(topikNameStr)
            .setPushConfig(pushConfig)
            .setAckDeadlineSeconds(60)
            .build()

        SubscriptionAdminClient.create().use { admin ->
            val sub = admin.createSubscription(subscriptionRequest)
            println("Subscription Push dibuat: ${sub.name}$endpointUrl")
            return sub.name
        }
    }

    // Subscription dengan filter — hanya terima pesan dengan atribut tertentu
    fun buatSubscriptionDenganFilter(namaSub: String, namaTopik: String, filter: String): String {
        // Contoh filter: attributes.status = "BARU"
        // atau: hasPrefix(attributes.tipe, "pesanan.")
        val subName = namaSubscription(namaSub)

        val subscriptionRequest = Subscription.newBuilder()
            .setName(subName)
            .setTopic(namaTopik(namaTopik))
            .setFilter(filter)
            .setAckDeadlineSeconds(30)
            .build()

        SubscriptionAdminClient.create().use { admin ->
            val sub = admin.createSubscription(subscriptionRequest)
            println("Subscription dengan filter '$filter' dibuat: ${sub.name}")
            return sub.name
        }
    }
}

Publisher — Mengirim Pesan #

import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.TopicName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

@Serializable
data class EventPesanan(
    val pesananId: String,
    val penggunaId: Long,
    val total: Double,
    val status: String,
    val timestamp: Long = System.currentTimeMillis()
)

class PublisherPubSub(
    private val projectId: String,
    private val topikId: String
) {
    private val json = Json { ignoreUnknownKeys = true }
    private val topikName = TopicName.of(projectId, topikId)

    // Gunakan use{} untuk memastikan publisher ditutup dan semua pesan ter-flush
    fun <T> denganPublisher(blok: Publisher.() -> T): T {
        val publisher = Publisher.newBuilder(topikName)
            .build()
        return try {
            publisher.blok()
        } finally {
            publisher.shutdown()
            publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
        }
    }

    fun kirim(event: EventPesanan): String {
        return denganPublisher {
            val payload = json.encodeToString(EventPesanan.serializer(), event)
            val data = ByteString.copyFromUtf8(payload)

            val pesan = PubsubMessage.newBuilder()
                .setData(data)
                .putAttributes("tipe", "EventPesanan")      // atribut untuk filtering
                .putAttributes("versi", "1.0")
                .putAttributes("status", event.status)
                .build()

            val future = publish(pesan)
            val messageId = future.get()  // blokir sampai dapat konfirmasi
            println("Pesan terkirim — ID: $messageId")
            messageId
        }
    }

    // Kirim dengan ordering key — menjamin urutan per key
    fun kirimDenganOrdering(event: EventPesanan, orderingKey: String): String {
        val publisher = Publisher.newBuilder(topikName)
            .setEnableMessageOrdering(true)  // harus diaktifkan di publisher
            .build()

        return try {
            val payload = json.encodeToString(EventPesanan.serializer(), event)
            val pesan = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(payload))
                .setOrderingKey(orderingKey)  // pesan dengan key yang sama dijamin urut
                .putAttributes("penggunaId", event.penggunaId.toString())
                .build()

            publisher.publish(pesan).get()
        } finally {
            publisher.shutdown()
            publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
        }
    }

    // Kirim batch — publisher sudah melakukan batching internal secara otomatis
    // tapi kita bisa kirim banyak sekaligus dan tunggu semua selesai
    fun kirimBanyak(events: List<EventPesanan>): List<String> {
        return denganPublisher {
            val futures = events.map { event ->
                val pesan = PubsubMessage.newBuilder()
                    .setData(ByteString.copyFromUtf8(
                        json.encodeToString(EventPesanan.serializer(), event)
                    ))
                    .putAttributes("pesananId", event.pesananId)
                    .build()
                publish(pesan)
            }
            futures.map { it.get() }  // tunggu semua selesai
        }
    }
}

Subscriber Pull — Menarik Pesan Secara Aktif #

import com.google.cloud.pubsub.v1.AckReplyConsumer
import com.google.cloud.pubsub.v1.MessageReceiver
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import com.google.pubsub.v1.PubsubMessage

class SubscriberPull(
    private val projectId: String,
    private val subscriptionId: String
) {
    private val json = Json { ignoreUnknownKeys = true }
    private val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)

    // Subscriber asinkron — callback dipanggil setiap ada pesan baru
    fun mulaiAsync(): Subscriber {
        val receiver = MessageReceiver { pesan: PubsubMessage, consumer: AckReplyConsumer ->
            try {
                proseskanPesan(pesan)
                consumer.ack()   // beritahu Pub/Sub pesan berhasil diproses
            } catch (e: Exception) {
                println("Gagal proses: ${e.message}")
                consumer.nack()  // Pub/Sub akan kirim ulang setelah ack deadline
            }
        }

        val subscriber = Subscriber.newBuilder(subscriptionName, receiver)
            .setFlowControlSettings(
                com.google.api.gax.batching.FlowControlSettings.newBuilder()
                    .setMaxOutstandingElementCount(100)  // maks 100 pesan yang belum di-ack
                    .setMaxOutstandingRequestBytes(10 * 1024 * 1024)  // maks 10MB
                    .build()
            )
            .build()

        subscriber.startAsync().awaitRunning()
        println("Subscriber '$subscriptionId' mulai mendengarkan...")
        return subscriber
    }

    private fun proseskanPesan(pesan: PubsubMessage) {
        val payload = pesan.data.toStringUtf8()
        val tipe = pesan.attributesMap["tipe"]
        val messageId = pesan.messageId

        println("Pesan diterima — ID: $messageId, tipe: $tipe")

        when (tipe) {
            "EventPesanan" -> {
                val event = json.decodeFromString(EventPesanan.serializer(), payload)
                println("Proses pesanan: ${event.pesananId}${event.status}")
                // Logika bisnis...
            }
            else -> println("Tipe tidak dikenal: $tipe")
        }
    }
}

fun main() {
    val subscriber = SubscriberPull("my-project", "pesanan-email").mulaiAsync()

    // Jalankan hingga diinterupsi
    try {
        Thread.currentThread().join()
    } catch (e: InterruptedException) {
        subscriber.stopAsync()
        println("Subscriber dihentikan")
    }
}

Subscriber Push — GCP Mendorong Pesan ke Endpoint HTTP #

Untuk push subscription, GCP mengirim HTTP POST ke endpoint kamu. Cocok untuk Cloud Functions dan Cloud Run:

import io.ktor.server.application.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.Serializable
import java.util.Base64

@Serializable
data class PubSubPushPayload(
    val message: PubSubMessage,
    val subscription: String
)

@Serializable
data class PubSubMessage(
    val data: String,                           // base64-encoded payload
    val messageId: String,
    val publishTime: String,
    val attributes: Map<String, String> = emptyMap()
)

// Handler di Ktor untuk menerima push dari Pub/Sub
fun Application.pubSubPushHandler() {
    val json = Json { ignoreUnknownKeys = true }

    routing {
        post("/pubsub/push") {
            val body = call.receiveText()

            val payload = runCatching {
                json.decodeFromString(PubSubPushPayload.serializer(), body)
            }.getOrElse {
                call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Body tidak valid")
                return@post
            }

            // Decode data dari base64
            val dataBytes = Base64.getDecoder().decode(payload.message.data)
            val pesanTeks = String(dataBytes)

            println("Push diterima — ID: ${payload.message.messageId}")
            println("Atribut: ${payload.message.attributes}")
            println("Data: $pesanTeks")

            // Proses pesan
            try {
                val event = json.decodeFromString(EventPesanan.serializer(), pesanTeks)
                prosesEventPush(event)

                // HTTP 200/204 = ack (Pub/Sub tidak akan kirim ulang)
                call.respond(io.ktor.http.HttpStatusCode.NoContent)
            } catch (e: Exception) {
                println("Gagal proses: ${e.message}")
                // HTTP 4xx/5xx = nack (Pub/Sub akan kirim ulang setelah backoff)
                call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Gagal proses")
            }
        }
    }
}

fun prosesEventPush(event: EventPesanan) {
    println("Push: proses pesanan ${event.pesananId}")
}

Dead Letter Topic #

Pub/Sub mendukung Dead Letter Topic untuk pesan yang gagal diproses berkali-kali:

fun buatSubscriptionDenganDlt(
    namaSub: String,
    namaTopik: String,
    namaDlt: String,
    maksPercobaan: Int = 5
): String {
    // Pastikan DLT sudah ada
    AdminPubSub.buatTopik(namaDlt)

    val dltName = AdminPubSub.namaTopik(namaDlt)
    val subName = AdminPubSub.namaSubscription(namaSub)

    val deadLetterPolicy = DeadLetterPolicy.newBuilder()
        .setDeadLetterTopic(dltName)
        .setMaxDeliveryAttempts(maksPercobaan)  // coba maks N kali sebelum ke DLT
        .build()

    val subscriptionRequest = Subscription.newBuilder()
        .setName(subName)
        .setTopic(AdminPubSub.namaTopik(namaTopik))
        .setDeadLetterPolicy(deadLetterPolicy)
        .setAckDeadlineSeconds(30)
        .build()

    SubscriptionAdminClient.create().use { admin ->
        val sub = admin.createSubscription(subscriptionRequest)
        println("Subscription dengan DLT dibuat: ${sub.name}")
        println("Dead Letter Topic: $dltName (maks $maksPercobaan percobaan)")
        return sub.name
    }
}

Development dengan Pub/Sub Emulator #

Google menyediakan emulator lokal yang bisa dijalankan dengan Docker atau gcloud:

# docker-compose.yml
services:
  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
# Jalankan emulator
docker compose up -d

# Set environment variable untuk mengarahkan SDK ke emulator
export PUBSUB_EMULATOR_HOST=localhost:8085
// Di kode Kotlin, SDK otomatis mendeteksi PUBSUB_EMULATOR_HOST
// Jika env var tersebut di-set, semua call ke Pub/Sub diarahkan ke emulator

// Bisa juga override endpoint secara eksplisit
import com.google.api.gax.core.NoCredentialsProvider
import com.google.api.gax.grpc.GrpcTransportChannel
import com.google.api.gax.rpc.FixedTransportChannelProvider
import io.grpc.ManagedChannelBuilder

fun buatPublisherEmulator(projectId: String, topikId: String): Publisher {
    val channel = ManagedChannelBuilder
        .forTarget("localhost:8085")
        .usePlaintext()
        .build()

    val channelProvider = FixedTransportChannelProvider.create(
        GrpcTransportChannel.create(channel)
    )

    return Publisher.newBuilder(TopicName.of(projectId, topikId))
        .setChannelProvider(channelProvider)
        .setCredentialsProvider(NoCredentialsProvider.create())
        .build()
}

Pola Message Ordering #

Untuk menjamin urutan pesan per entitas (misalnya semua event pesanan yang sama terurut):

// Publisher — aktifkan ordering dan set ordering key
fun kirimDenganOrdering() {
    val publisher = Publisher.newBuilder(TopicName.of("my-project", "pesanan-events"))
        .setEnableMessageOrdering(true)
        .build()

    val events = listOf(
        EventPesanan("ORD-001", 1L, 100_000.0, "DIBUAT"),
        EventPesanan("ORD-001", 1L, 100_000.0, "DIBAYAR"),
        EventPesanan("ORD-001", 1L, 100_000.0, "DIKIRIM"),
        EventPesanan("ORD-001", 1L, 100_000.0, "SELESAI")
    )

    val json = Json { ignoreUnknownKeys = true }

    events.forEach { event ->
        val pesan = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(json.encodeToString(EventPesanan.serializer(), event)))
            .setOrderingKey(event.pesananId)  // semua event ORD-001 terurut
            .build()
        publisher.publish(pesan).get()
        println("Terkirim: ${event.pesananId}${event.status}")
    }

    publisher.shutdown()
    publisher.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
}

// Subscription juga harus diaktifkan untuk ordering
// enableMessageOrdering = true di settings subscription

Ringkasan #

  • Satu Topic, banyak Subscription — ini adalah model fan-out native Pub/Sub. Satu pesan yang diterbitkan ke Topic bisa dikonsumsi secara independen oleh banyak Subscription tanpa koordinasi producer.
  • Pull vs Push Subscription — gunakan Pull jika consumer kamu adalah layanan yang terus berjalan (microservice). Gunakan Push untuk Cloud Functions, Cloud Run, atau endpoint HTTP yang menerima request masuk.
  • Ack wajib, Nack untuk retryconsumer.ack() memberitahu Pub/Sub pesan berhasil dan bisa dihapus. consumer.nack() atau diam saja (ack deadline terlewat) akan membuat Pub/Sub mengirim ulang pesan.
  • Filter subscription untuk routing — gunakan filter pada Subscription untuk menerima hanya pesan dengan atribut tertentu: attributes.status = "BARU". Lebih efisien dari filtering di consumer.
  • Ordering key untuk urutan per entitas — aktifkan enableMessageOrdering = true di Publisher dan set orderingKey yang sama untuk semua pesan yang harus terurut. Pub/Sub menjamin urutan per ordering key.
  • Dead Letter Topic untuk pesan gagal — konfigurasi maxDeliveryAttempts dan deadLetterTopic. Pesan yang gagal diproses sebanyak N kali otomatis diteruskan ke DLT untuk penanganan manual.
  • Emulator untuk development — jalankan PUBSUB_EMULATOR_HOST=localhost:8085 dan SDK secara otomatis mengarahkan semua panggilan ke emulator lokal. Tidak perlu GCP project atau biaya.
  • Flow control untuk consumer — set maxOutstandingElementCount di Subscriber untuk membatasi berapa banyak pesan yang bisa ada di memori secara bersamaan. Mencegah consumer kehabisan memori saat pesan membanjir.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact