Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan antrian pesan terkelola sepenuhnya dari AWS — kamu tidak perlu mengelola server, konfigurasi, atau scaling. SQS sangat cocok untuk arsitektur berbasis AWS di mana komponen-komponen perlu berkomunikasi secara asinkron tanpa tight coupling. Ia hadir dalam dua varian: Standard Queue (throughput hampir tak terbatas, pengiriman at-least-once, urutan tidak dijamin) dan FIFO Queue (pengiriman tepat satu kali, urutan terjamin, tapi throughput lebih terbatas). Di Kotlin, kamu menggunakan AWS SDK for Java v2 yang mendukung coroutine melalui klien asinkron berbasis Netty. Artikel ini membahas konfigurasi, operasi CRUD pesan, pola konsumsi yang efisien, integrasi dengan SNS, dan cara development dengan LocalStack tanpa biaya AWS.
SQS Standard vs FIFO #
flowchart TD
A{Kebutuhan Utama?} --> B{Urutan pesan\npenting?}
B -- Ya --> C{Throughput\n> 3000/detik?}
C -- Ya --> D["Standard Queue\n+ urutkan di consumer\natau gunakan Kafka"]
C -- Tidak --> E["FIFO Queue\nMaks 3000 msg/s\nExact-once delivery"]
B -- Tidak --> F{Volume pesan\nsangat besar?}
F -- Ya --> G["Standard Queue\nThroughput tak terbatas\nAt-least-once"]
F -- Tidak --> G| Aspek | Standard Queue | FIFO Queue |
|---|---|---|
| Urutan | Best-effort (tidak dijamin) | Terjamin (FIFO) |
| Pengiriman | At-least-once (bisa duplikasi) | Exactly-once |
| Throughput | Hampir tak terbatas | 3.000 msg/detik (batch), 300 msg/detik |
| Nama | nama-queue | nama-queue.fifo (wajib diakhiri .fifo) |
| Deduplication | Tidak ada | Otomatis dengan Deduplication ID |
| Harga | Lebih murah | Sedikit lebih mahal |
| Cocok untuk | Log, notifikasi, task queue umum | Transaksi keuangan, pesanan |
Setup dan Dependensi #
// build.gradle.kts
dependencies {
// AWS SDK v2 — SQS
implementation("software.amazon.awssdk:sqs:2.25.27")
// AWS SDK v2 — STS (untuk assume role, opsional)
implementation("software.amazon.awssdk:sts:2.25.27")
// HTTP client untuk SDK v2 (pilih salah satu)
implementation("software.amazon.awssdk:netty-nio-client:2.25.27") // async
// atau:
// implementation("software.amazon.awssdk:apache-client:2.25.27") // sync
// kotlinx.serialization
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.6.3")
// Coroutine
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
}
Membuat Client SQS #
import software.amazon.awssdk.auth.credentials.*
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import java.net.URI
object KoneksiSqs {
// Client synchronous (blocking)
val client: SqsClient by lazy {
SqsClient.builder()
.region(Region.AP_SOUTHEAST_1) // Asia Pacific (Singapore)
.credentialsProvider(
DefaultCredentialsProvider.create()
// SDK mencari kredensial di:
// 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
// 2. ~/.aws/credentials
// 3. EC2 Instance Profile / ECS Task Role
// 4. IAM Role for Service Account (EKS)
)
.build()
}
// Client untuk LocalStack (development lokal tanpa AWS)
fun clientLokal(endpoint: String = "http://localhost:4566"): SqsClient {
return SqsClient.builder()
.region(Region.AP_SOUTHEAST_1)
.endpointOverride(URI.create(endpoint))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create("test", "test") // LocalStack menerima key apapun
)
)
.build()
}
// Client asinkron dengan coroutine
val asyncClient: SqsAsyncClient by lazy {
SqsAsyncClient.builder()
.region(Region.AP_SOUTHEAST_1)
.credentialsProvider(DefaultCredentialsProvider.create())
.build()
}
}
Membuat dan Mengelola Queue #
import software.amazon.awssdk.services.sqs.model.*
fun buatQueue(nama: String, isFifo: Boolean = false): String {
val client = KoneksiSqs.client
val namaFinal = if (isFifo && !nama.endsWith(".fifo")) "$nama.fifo" else nama
val atribut = mutableMapOf<QueueAttributeName, String>()
if (isFifo) {
atribut[QueueAttributeName.FIFO_QUEUE] = "true"
atribut[QueueAttributeName.CONTENT_BASED_DEDUPLICATION] = "true"
}
// Konfigurasi umum
atribut[QueueAttributeName.VISIBILITY_TIMEOUT] = "30" // 30 detik
atribut[QueueAttributeName.MESSAGE_RETENTION_PERIOD] = "345600" // 4 hari
atribut[QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS] = "20" // long polling
val respons = client.createQueue { req ->
req.queueName(namaFinal).attributes(atribut)
}
println("Queue dibuat: ${respons.queueUrl()}")
return respons.queueUrl()
}
fun ambilUrlQueue(nama: String): String {
return KoneksiSqs.client.getQueueUrl { req ->
req.queueName(nama)
}.queueUrl()
}
fun atributQueue(queueUrl: String): Map<QueueAttributeName, String> {
return KoneksiSqs.client.getQueueAttributes { req ->
req.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.ALL)
}.attributes()
}
fun hapusQueue(queueUrl: String) {
KoneksiSqs.client.deleteQueue { req -> req.queueUrl(queueUrl) }
println("Queue dihapus: $queueUrl")
}
Mengirim Pesan #
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
@Serializable
data class EventPesanan(
val pesananId: String,
val penggunaId: Long,
val total: Double,
val status: String,
val timestamp: Long = System.currentTimeMillis()
)
class ProducerSqs(private val queueUrl: String) {
private val client = KoneksiSqs.client
private val json = Json { ignoreUnknownKeys = true }
// Kirim pesan tunggal
fun kirim(event: EventPesanan): String {
val payload = json.encodeToString(EventPesanan.serializer(), event)
val request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(payload)
.delaySeconds(0) // kirim segera (bisa delay 0-900 detik)
.messageAttributes(mapOf(
"tipe" to MessageAttributeValue.builder()
.dataType("String")
.stringValue("EventPesanan")
.build(),
"versi" to MessageAttributeValue.builder()
.dataType("String")
.stringValue("1.0")
.build()
))
.build()
val respons = client.sendMessage(request)
println("Pesan terkirim — ID: ${respons.messageId()}, MD5: ${respons.md5OfMessageBody()}")
return respons.messageId()
}
// Kirim ke FIFO queue (butuh MessageGroupId dan opsional DeduplicationId)
fun kirimKeFifo(event: EventPesanan): String {
val payload = json.encodeToString(EventPesanan.serializer(), event)
val request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(payload)
.messageGroupId(event.penggunaId.toString()) // pesan per user terurut
.messageDeduplicationId(event.pesananId) // hindari duplikat (jika tidak pakai content-based)
.build()
return client.sendMessage(request).messageId()
}
// Kirim banyak pesan sekaligus — batch (maks 10 pesan per batch)
fun kirimBatch(events: List<EventPesanan>): Pair<Int, Int> {
var berhasil = 0
var gagal = 0
events.chunked(10).forEach { batch ->
val entries = batch.mapIndexed { idx, event ->
SendMessageBatchRequestEntry.builder()
.id("$idx") // ID unik dalam batch, bukan message ID
.messageBody(json.encodeToString(EventPesanan.serializer(), event))
.build()
}
val respons = client.sendMessageBatch { req ->
req.queueUrl(queueUrl).entries(entries)
}
berhasil += respons.successful().size
gagal += respons.failed().size
if (respons.failed().isNotEmpty()) {
respons.failed().forEach { f ->
println("Gagal kirim entry ${f.id()}: ${f.message()}")
}
}
}
println("Batch selesai — berhasil: $berhasil, gagal: $gagal")
return Pair(berhasil, gagal)
}
}
Menerima dan Menghapus Pesan #
class ConsumerSqs(private val queueUrl: String) {
private val client = KoneksiSqs.client
private val json = Json { ignoreUnknownKeys = true }
// Terima pesan sekali (untuk testing atau polling manual)
fun terimaPesan(maks: Int = 10): List<Message> {
val request = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(maks) // SQS maks 10 per request
.waitTimeSeconds(20) // long polling — tunggu sampai 20 detik jika queue kosong
.visibilityTimeout(30) // sembunyikan pesan 30 detik dari consumer lain
.messageAttributeNames("All")
.build()
return client.receiveMessage(request).messages()
}
// Hapus pesan setelah berhasil diproses
fun hapusPesan(receiptHandle: String) {
client.deleteMessage { req ->
req.queueUrl(queueUrl).receiptHandle(receiptHandle)
}
}
// Hapus banyak pesan sekaligus
fun hapusBatch(messages: List<Message>) {
messages.chunked(10).forEach { batch ->
val entries = batch.mapIndexed { idx, msg ->
DeleteMessageBatchRequestEntry.builder()
.id("$idx")
.receiptHandle(msg.receiptHandle())
.build()
}
client.deleteMessageBatch { req ->
req.queueUrl(queueUrl).entries(entries)
}
}
}
// Kembalikan pesan ke queue lebih cepat (jika tidak bisa diproses sekarang)
fun kembalikanPesan(receiptHandle: String, visibilityTimeoutBaru: Int = 0) {
client.changeMessageVisibility { req ->
req.queueUrl(queueUrl)
.receiptHandle(receiptHandle)
.visibilityTimeout(visibilityTimeoutBaru) // 0 = segera terlihat lagi
}
}
// Loop consumer — poll terus menerus
fun mulaiLoop(berjalan: () -> Boolean = { true }) {
println("Consumer SQS mulai polling: $queueUrl")
while (berjalan()) {
val pesan = terimaPesan(maks = 10)
if (pesan.isEmpty()) {
// Long polling sudah menunggu 20 detik, masih kosong — normal
continue
}
val berhasilDihapus = mutableListOf<Message>()
pesan.forEach { msg ->
try {
proseskanPesan(msg)
berhasilDihapus.add(msg)
} catch (e: Exception) {
println("Gagal proses ${msg.messageId()}: ${e.message}")
// Jangan hapus — biarkan visibility timeout habis
// → pesan akan terlihat lagi dan bisa dicoba ulang
// → setelah maxReceiveCount kali gagal → masuk Dead Letter Queue
}
}
// Hapus hanya yang berhasil diproses
if (berhasilDihapus.isNotEmpty()) {
hapusBatch(berhasilDihapus)
println("${berhasilDihapus.size} pesan berhasil diproses dan dihapus")
}
}
}
private fun proseskanPesan(msg: Message) {
val event = json.decodeFromString(EventPesanan.serializer(), msg.body())
val tipe = msg.messageAttributes()["tipe"]?.stringValue()
println("Proses pesan ${msg.messageId()}: pesanan ${event.pesananId} " +
"(tipe: $tipe, diterima: ${msg.attributes()[MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT]} kali)")
}
}
Dead Letter Queue (DLQ) #
DLQ menerima pesan yang gagal diproses setelah sejumlah percobaan (maxReceiveCount):
fun setupQueueDenganDlq(namaUtama: String, namaDlq: String): Pair<String, String> {
val client = KoneksiSqs.client
// 1. Buat DLQ terlebih dahulu
val urlDlq = buatQueue(namaDlq)
// Ambil ARN dari DLQ
val arnDlq = client.getQueueAttributes { req ->
req.queueUrl(urlDlq).attributeNames(QueueAttributeName.QUEUE_ARN)
}.attributes()[QueueAttributeName.QUEUE_ARN]!!
// 2. Buat queue utama dengan konfigurasi Redrive Policy
val redrivePolicy = """
{
"maxReceiveCount": "3",
"deadLetterTargetArn": "$arnDlq"
}
""".trimIndent()
val urlUtama = client.createQueue { req ->
req.queueName(namaUtama)
.attributes(mapOf(
QueueAttributeName.REDRIVE_POLICY to redrivePolicy,
QueueAttributeName.VISIBILITY_TIMEOUT to "30",
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS to "20"
))
}.queueUrl()
println("Queue utama: $urlUtama")
println("Dead Letter Queue: $urlDlq")
return Pair(urlUtama, urlDlq)
}
// Monitor DLQ — proses pesan gagal secara manual
fun monitorDlq(urlDlq: String) {
val consumer = ConsumerSqs(urlDlq)
val pesan = consumer.terimaPesan(maks = 10)
println("Pesan di DLQ: ${pesan.size}")
pesan.forEach { msg ->
val hitungTerima = msg.attributes()[MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT]
println(" MessageId: ${msg.messageId()}, diterima: $hitungTerima kali")
println(" Body: ${msg.body().take(100)}...")
}
}
Consumer dengan Coroutine #
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import software.amazon.awssdk.services.sqs.model.*
fun sqsFlow(
queueUrl: String,
scope: CoroutineScope
): Flow<Message> = flow {
val client = KoneksiSqs.client
while (scope.isActive) {
val pesan = withContext(Dispatchers.IO) {
client.receiveMessage { req ->
req.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(20)
}.messages()
}
pesan.forEach { emit(it) }
}
}
fun main() = runBlocking {
val queueUrl = ambilUrlQueue("pesanan-events")
val json = Json { ignoreUnknownKeys = true }
val client = KoneksiSqs.client
sqsFlow(queueUrl, this)
.map { msg ->
val event = json.decodeFromString(EventPesanan.serializer(), msg.body())
Pair(msg, event)
}
.filter { (_, event) -> event.status == "BARU" }
.collect { (msg, event) ->
println("Proses pesanan baru: ${event.pesananId}")
// Hapus setelah berhasil diproses
withContext(Dispatchers.IO) {
client.deleteMessage { req ->
req.queueUrl(queueUrl).receiptHandle(msg.receiptHandle())
}
}
}
}
Integrasi SNS + SQS (Fan-out Pattern) #
SNS (Simple Notification Service) dikombinasikan dengan SQS untuk mengirim satu pesan ke banyak queue sekaligus:
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.*
fun setupSnsSqsFanout() {
val sqsClient = KoneksiSqs.client
val snsClient = SnsClient.builder().region(Region.AP_SOUTHEAST_1).build()
// 1. Buat SNS topic
val topicArn = snsClient.createTopic { req ->
req.name("pesanan-events")
}.topicArn()
// 2. Buat beberapa SQS queue
val urlEmail = buatQueue("notif-email")
val urlAudit = buatQueue("audit-log")
val urlInventori = buatQueue("update-inventori")
// 3. Subscribe setiap queue ke SNS topic
listOf(urlEmail, urlAudit, urlInventori).forEach { queueUrl ->
val arnQueue = sqsClient.getQueueAttributes { req ->
req.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)
}.attributes()[QueueAttributeName.QUEUE_ARN]!!
snsClient.subscribe { req ->
req.topicArn(topicArn)
.protocol("sqs")
.endpoint(arnQueue)
}
println("Queue $queueUrl terdaftar ke SNS $topicArn")
}
// 4. Sekarang publish ke SNS → otomatis ke semua queue
snsClient.publish { req ->
req.topicArn(topicArn)
.message("""{"pesananId":"ORD-001","status":"BARU"}""")
.subject("PesananBaru")
}
println("Pesan di-fan-out ke semua queue via SNS")
}
Development dengan LocalStack #
LocalStack memungkinkan kamu menjalankan layanan AWS secara lokal menggunakan Docker, tanpa biaya:
# docker-compose.yml
services:
localstack:
image: localstack/localstack:latest
ports:
- "4566:4566"
environment:
- SERVICES=sqs,sns,s3
- DEFAULT_REGION=ap-southeast-1
# Jalankan LocalStack
docker compose up -d
# Buat queue via CLI AWS (arahkan ke LocalStack)
aws --endpoint-url=http://localhost:4566 \
--region ap-southeast-1 \
sqs create-queue --queue-name pesanan-events
# Lihat daftar queue
aws --endpoint-url=http://localhost:4566 \
--region ap-southeast-1 \
sqs list-queues
// Di kode Kotlin, gunakan client dengan endpoint override
val clientLokal = KoneksiSqs.clientLokal("http://localhost:4566")
val queueUrl = clientLokal.createQueue { req -> req.queueName("pesanan-events") }.queueUrl()
println("Queue lokal: $queueUrl")
// Output: http://localhost:4566/000000000000/pesanan-events
Tips Biaya dan Performa #
// 1. Selalu gunakan Long Polling (waitTimeSeconds = 20)
// Short polling: tagihan per request API, termasuk request kosong
// Long polling: tunggu hingga 20 detik sebelum return kosong
// Hemat hingga 80% biaya API calls untuk queue yang jarang terisi
// 2. Batch sebisa mungkin
// sendMessageBatch: maks 10 pesan dalam satu API call
// receiveMessage: maks 10 pesan dalam satu API call
// deleteMessageBatch: maks 10 pesan dalam satu API call
// 3. Visibility timeout harus lebih lama dari waktu pemrosesan
// Jika pemrosesan butuh 5 menit, set visibility timeout 6-7 menit
// Jika terlalu pendek → pesan muncul lagi sebelum selesai diproses
// 4. Hitung ukuran pesan
// SQS maksimum 256KB per pesan
// Untuk payload besar → simpan di S3, kirim referensi S3 di SQS
// 5. Monitoring via CloudWatch
// Metric penting: ApproximateNumberOfMessagesVisible (antrian pesan)
// ApproximateAgeOfOldestMessage (pesan tertua)
// NumberOfMessagesSent, NumberOfMessagesDeleted
Ringkasan #
- Standard Queue untuk volume besar, FIFO untuk urutan — Standard Queue menawarkan throughput hampir tak terbatas tapi tidak menjamin urutan dan bisa menghasilkan duplikat. FIFO Queue menjamin urutan dan exactly-once delivery tapi throughput terbatas 3.000 pesan/detik.
- Long polling wajib — set
waitTimeSeconds=20di setiapreceiveMessage. Ini menghindari ribuan API call yang sia-sia ke queue kosong dan menghemat biaya secara signifikan.- Hapus pesan hanya setelah berhasil diproses — jangan hapus pesan saat diterima. Hapus hanya setelah pemrosesan berhasil. Pesan yang gagal akan otomatis terlihat lagi setelah visibility timeout habis.
- Dead Letter Queue wajib dikonfigurasi — set
maxReceiveCount(biasanya 3-5) dan arahkan ke DLQ. Ini mencegah pesan “berbahaya” (poison message) memblokir queue selamanya.- Visibility timeout harus lebih panjang dari waktu pemrosesan — jika pemrosesan bisa butuh waktu lama, set visibility timeout lebih panjang atau perpanjang secara dinamis dengan
changeMessageVisibility.- Batch untuk efisiensi biaya — gunakan
sendMessageBatch,deleteMessageBatchuntuk mengurangi jumlah API call. SQS ditagih per API call, bukan per pesan.- SNS + SQS untuk fan-out — untuk mengirim satu event ke banyak consumer independen, gunakan SNS di depan beberapa SQS queue. Ini lebih bersih dari mengirim ke banyak queue secara terpisah dari producer.
- LocalStack untuk development — gunakan LocalStack dengan Docker untuk testing lokal tanpa biaya AWS. Endpoint override di SDK cukup untuk mengarahkan semua panggilan ke LocalStack.