Multi Threading #
Setiap aplikasi yang serius perlu melakukan lebih dari satu hal sekaligus: menangani request HTTP sambil memproses database, mengunduh file sambil memperbarui UI, atau menjalankan beberapa komputasi berat secara paralel. Di Kotlin ada tiga lapisan cara menangani ini: Thread (API Java paling dasar), ExecutorService (manajemen thread pool), dan Coroutines (pendekatan modern Kotlin yang lebih efisien). Ketiganya punya trade-off berbeda — memahami kapan dan mengapa memilih masing-masing adalah kunci menulis aplikasi concurrent yang benar dan performan. Artikel ini membahas ketiganya secara mendalam, dengan fokus pada Coroutines sebagai cara idiomatis Kotlin.
Masalah Concurrency yang Perlu Dipahami #
Sebelum membahas solusi, penting memahami masalah yang bisa muncul saat beberapa thread mengakses data yang sama secara bersamaan:
// Race condition — bug yang tidak deterministik dan susah di-debug
var counter = 0
fun increment() {
repeat(10_000) {
counter++ // TIDAK thread-safe! read-modify-write bukan atomik
}
}
val t1 = Thread { increment() }
val t2 = Thread { increment() }
t1.start(); t2.start()
t1.join(); t2.join()
println(counter) // Seharusnya 20000, tapi bisa lebih kecil karena race condition
Masalah umum dalam concurrency: race condition (hasil bergantung pada timing), deadlock (dua thread saling menunggu satu sama lain), dan starvation (satu thread tidak pernah mendapat giliran). Solusinya bisa berupa sinkronisasi eksplisit, struktur data thread-safe, atau menghindari shared mutable state sama sekali.
Thread — API Dasar Java #
Thread adalah unit eksekusi paling dasar di JVM. Setiap thread adalah objek OS yang membutuhkan sekitar 1MB memori stack dan ribuan nanodetik untuk dibuat.
// Cara 1: lambda langsung
val thread1 = Thread {
println("Thread 1 berjalan di: ${Thread.currentThread().name}")
Thread.sleep(1000) // blokir thread ini selama 1 detik
println("Thread 1 selesai")
}
// Cara 2: fungsi thread() dari Kotlin stdlib — lebih idiomatis
import kotlin.concurrent.thread
val thread2 = thread(name = "Worker-A", isDaemon = false) {
println("Thread A berjalan")
}
thread1.start()
// thread2 sudah otomatis start karena fungsi thread() memulainya
// join() — tunggu thread selesai sebelum lanjut
thread1.join()
println("Thread 1 sudah selesai, lanjut ke kode berikutnya")
Sinkronisasi dengan synchronized
#
Untuk melindungi data bersama dari race condition, gunakan blok synchronized:
var counter = 0
val lock = Any() // objek sebagai kunci
val threads = (1..10).map {
Thread {
repeat(1000) {
synchronized(lock) {
counter++ // hanya satu thread yang bisa masuk blok ini sekaligus
}
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println(counter) // selalu 10000 — aman dari race condition
AtomicInteger — Alternatif Lebih Efisien #
Untuk operasi sederhana seperti counter, AtomicInteger lebih efisien dari synchronized:
import java.util.concurrent.atomic.AtomicInteger
val atomicCounter = AtomicInteger(0)
val threads = (1..10).map {
Thread {
repeat(1000) {
atomicCounter.incrementAndGet() // atomik — tidak perlu lock
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println(atomicCounter.get()) // selalu 10000
Keterbatasan Thread Langsung #
Thread langsung cocok untuk tugas sederhana, tapi punya masalah di skala besar. Membuat 10.000 thread untuk 10.000 request adalah bencana performa — setiap thread mengkonsumsi memori dan waktu OS untuk scheduling. Ini adalah alasan thread pool dan coroutine ada.
ExecutorService — Manajemen Thread Pool #
ExecutorService mengelola sekumpulan thread yang digunakan ulang — daripada membuat thread baru setiap kali ada tugas, tugas diantrikan dan dieksekusi oleh thread yang sudah ada di pool.
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
// Fixed thread pool — jumlah thread tetap
val fixedPool = Executors.newFixedThreadPool(4)
// Kirim tugas ke pool
repeat(10) { taskId ->
fixedPool.submit {
println("Tugas $taskId di thread: ${Thread.currentThread().name}")
Thread.sleep(500)
}
}
// Shutdown — tunggu semua tugas selesai, lalu tutup pool
fixedPool.shutdown()
fixedPool.awaitTermination(10, TimeUnit.SECONDS)
println("Semua tugas selesai")
Jenis-Jenis Thread Pool #
// Fixed pool — N thread tetap, tugas lebih banyak dari N akan diantri
val fixed = Executors.newFixedThreadPool(4)
// Cached pool — thread dibuat sesuai kebutuhan, dihapus jika idle > 60 detik
// Cocok untuk tugas banyak tapi singkat
val cached = Executors.newCachedThreadPool()
// Single thread executor — satu thread, tugas dieksekusi berurutan
val single = Executors.newSingleThreadExecutor()
// Scheduled pool — untuk tugas yang perlu dijadwal atau berulang
val scheduled = Executors.newScheduledThreadPool(2)
scheduled.scheduleAtFixedRate(
{ println("Tugas berulang: ${System.currentTimeMillis()}") },
0, // delay awal (detik)
5, // interval (detik)
TimeUnit.SECONDS
)
Future — Mendapatkan Hasil dari Thread Pool
#
import java.util.concurrent.Callable
val pool = Executors.newFixedThreadPool(2)
// submit Callable — mengembalikan Future<T>
val future1 = pool.submit(Callable {
Thread.sleep(1000)
"Hasil dari tugas 1"
})
val future2 = pool.submit(Callable {
Thread.sleep(500)
42
})
// get() memblokir sampai hasil tersedia
println(future1.get()) // "Hasil dari tugas 1"
println(future2.get()) // 42
pool.shutdown()
Kotlin Coroutines — Pendekatan Modern #
Coroutine adalah unit eksekusi yang jauh lebih ringan dari thread. Kamu bisa membuat ratusan ribu coroutine dalam satu program tanpa kehabisan memori, karena coroutine tidak memetakan 1:1 ke thread OS — mereka dijadwalkan oleh Kotlin runtime di atas sejumlah kecil thread.
Menambahkan Dependensi #
// build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")
// Untuk Android
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0")
}
runBlocking dan launch
#
import kotlinx.coroutines.*
fun main() = runBlocking { // membuat coroutine scope dan blokir thread utama
println("Mulai di: ${Thread.currentThread().name}")
// launch — fire-and-forget, mengembalikan Job
val job1 = launch {
delay(1000) // suspensi non-blocking (tidak memblokir thread!)
println("Job 1 selesai di: ${Thread.currentThread().name}")
}
val job2 = launch {
delay(500)
println("Job 2 selesai di: ${Thread.currentThread().name}")
}
println("Menunggu job selesai...")
job1.join() // tunggu job1
job2.join() // tunggu job2
println("Semua selesai!")
}
// Output:
// Mulai di: main
// Menunggu job selesai...
// Job 2 selesai di: main (setelah 500ms)
// Job 1 selesai di: main (setelah 1000ms)
// Semua selesai!
async dan await — Komputasi Paralel dengan Hasil
#
async mengembalikan Deferred<T> — seperti Future tapi untuk coroutine. Gunakan await() untuk mengambil hasilnya.
import kotlinx.coroutines.*
suspend fun ambilDataPengguna(id: Int): String {
delay(1000) // simulasi panggilan API
return "Pengguna #$id"
}
suspend fun ambilPesananPengguna(userId: Int): List<String> {
delay(800) // simulasi query database
return listOf("Pesanan A", "Pesanan B")
}
fun main() = runBlocking {
val mulai = System.currentTimeMillis()
// Jalankan dua operasi secara PARALEL dengan async
val deferredPengguna = async { ambilDataPengguna(42) }
val deferredPesanan = async { ambilPesananPengguna(42) }
// Tunggu keduanya selesai
val pengguna = deferredPengguna.await()
val pesanan = deferredPesanan.await()
val durasi = System.currentTimeMillis() - mulai
println("$pengguna: $pesanan")
println("Selesai dalam ${durasi}ms") // ~1000ms (paralel), bukan 1800ms (sekuensial)
}
Bandingkan dengan pendekatan sekuensial yang membuang waktu:
// ANTI-PATTERN: sekuensial padahal tidak ada dependensi antar operasi
val pengguna = ambilDataPengguna(42) // tunggu 1000ms
val pesanan = ambilPesananPengguna(42) // baru mulai, tunggu 800ms lagi
// Total: 1800ms — tidak efisien
Coroutine Dispatcher — Di Thread Mana Coroutine Berjalan? #
Dispatcher menentukan thread atau thread pool yang digunakan coroutine.
import kotlinx.coroutines.*
fun main() = runBlocking {
// Default — thread pool optimal untuk komputasi CPU-intensive
launch(Dispatchers.Default) {
println("Default: ${Thread.currentThread().name}")
// Cocok untuk: kalkulasi berat, sorting, parsing
}
// IO — thread pool besar untuk operasi I/O yang memblokir
launch(Dispatchers.IO) {
println("IO: ${Thread.currentThread().name}")
// Cocok untuk: baca/tulis file, HTTP, database blocking
}
// Main — hanya tersedia di Android/UI framework
// launch(Dispatchers.Main) { updateUI() }
// Unconfined — mulai di thread pemanggil, lanjut di thread suspension point
launch(Dispatchers.Unconfined) {
println("Unconfined start: ${Thread.currentThread().name}")
delay(100)
println("Unconfined after delay: ${Thread.currentThread().name}")
// Cocok untuk: testing, penggunaan sangat spesifik
}
// newSingleThreadContext — buat dispatcher dengan satu thread khusus
val customDispatcher = newSingleThreadContext("CustomThread")
launch(customDispatcher) {
println("Custom: ${Thread.currentThread().name}") // CustomThread
}
customDispatcher.close() // jangan lupa tutup untuk bebaskan resource
}
Berpindah Dispatcher dalam Coroutine #
suspend fun prosesData(data: String): String {
// Mulai di IO thread untuk baca dari disk
val rawData = withContext(Dispatchers.IO) {
bacaDariDisk(data)
}
// Pindah ke Default untuk komputasi berat
val hasilProses = withContext(Dispatchers.Default) {
prosesBerat(rawData)
}
return hasilProses
}
Mutex — Mencegah Race Condition di Coroutine #
Untuk melindungi data bersama di coroutine, gunakan Mutex (bukan synchronized yang memblokir thread):
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
val jobs = (1..1000).map {
launch(Dispatchers.Default) {
mutex.withLock {
counter++ // kritis — hanya satu coroutine sekaligus
}
}
}
jobs.forEach { it.join() }
println(counter) // selalu 1000
}
AtomicInteger Tetap Berlaku di Coroutine
#
import java.util.concurrent.atomic.AtomicInteger
val atomicCounter = AtomicInteger(0)
fun main() = runBlocking {
(1..1000).map {
launch(Dispatchers.Default) {
atomicCounter.incrementAndGet()
}
}.forEach { it.join() }
println(atomicCounter.get()) // selalu 1000, tanpa mutex
}
Channel — Komunikasi Antar Coroutine #
Channel adalah cara coroutine berkomunikasi satu sama lain — seperti queue thread-safe yang bisa di-suspend saat penuh atau kosong.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>()
// Producer — kirim data ke channel
launch {
for (i in 1..5) {
println("Mengirim: $i")
channel.send(i)
delay(100)
}
channel.close() // tandai channel selesai
}
// Consumer — terima data dari channel
launch {
for (nilai in channel) { // otomatis stop saat channel ditutup
println("Menerima: $nilai")
}
}
}
// Mengirim: 1
// Menerima: 1
// Mengirim: 2
// Menerima: 2
// ...
produce — Channel Producer DSL
#
fun CoroutineScope.angkaGenap(maks: Int): ReceiveChannel<Int> = produce {
var n = 2
while (n <= maks) {
send(n)
n += 2
}
}
fun main() = runBlocking {
val genap = angkaGenap(20)
for (n in genap) print("$n ")
println()
// 2 4 6 8 10 12 14 16 18 20
}
Flow — Stream Data Asinkron #
Flow adalah sekuens data asinkron yang bisa di-collect secara reaktif. Berbeda dari Channel, Flow bersifat cold — hanya menghasilkan data saat di-collect.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Membuat Flow
fun angkaFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(300) // simulasi data yang datang bertahap
emit(i) // kirimkan nilai ke collector
}
}
fun main() = runBlocking {
angkaFlow()
.filter { it % 2 == 0 } // filter seperti List
.map { it * it } // transformasi
.collect { nilai -> // mulai koleksi
println("Nilai: $nilai")
}
}
// Nilai: 4
// Nilai: 16
// Flow dari koleksi yang ada
listOf(1, 2, 3, 4, 5).asFlow()
.map { it * 2 }
.collect { print("$it ") }
// 2 4 6 8 10
Thread vs Coroutine — Kapan Memilih yang Mana #
flowchart TD
A{Kebutuhan\nConcurrency?} --> B{Jumlah unit\neksekusi?}
B -- Sedikit, puluhan --> C[Thread / ExecutorService]
B -- Banyak, ribuan+ --> D[Coroutine]
A --> E{Operasi yang\ndilakukan?}
E -- CPU-intensive\nparsing, kalkulasi --> F["Coroutine\nDispatchers.Default"]
E -- I/O blocking\nfile, HTTP, DB --> G["Coroutine\nDispatchers.IO"]
E -- Streaming data\nreaktif --> H["Flow"]
E -- Komunikasi\nantar task --> I["Channel"]| Aspek | Thread | ExecutorService | Coroutine |
|---|---|---|---|
| Berat | ~1MB stack per thread | Tergantung pool size | Sangat ringan (~beberapa KB) |
| Skala | Ratusan | Ratusan-ribuan | Ratusan ribu |
| Blocking | Memblokir thread | Memblokir thread | Suspensi non-blocking |
| Kode | Callback / join manual | Future.get() | await(), sequential style |
| Error handling | Sulit, uncaught exception | Sulit | Structured concurrency |
| Cocok untuk | Task OS-level, legacy | Backend sederhana | Semua kasus modern |
Structured Concurrency — Keunggulan Coroutine #
Salah satu keunggulan terbesar coroutine adalah structured concurrency — coroutine anak tidak bisa “kabur” dari scope induknya. Jika induk dibatalkan, semua coroutine anak ikut dibatalkan.
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default)
val job = scope.launch {
launch { // coroutine anak
delay(5000)
println("Ini tidak akan tercetak jika job dibatalkan")
}
launch { // coroutine anak kedua
delay(3000)
println("Ini juga tidak akan tercetak")
}
delay(10000)
}
delay(1000)
job.cancel() // membatalkan job DAN semua coroutine anak
job.join()
println("Semua coroutine sudah dibatalkan")
}
Ringkasan #
- Thread untuk kasus sederhana — gunakan
thread { }dari Kotlin stdlib untuk task sederhana yang tidak perlu skala. Selalujoin()sebelum mengakses hasilnya.- ExecutorService untuk thread pool — lebih efisien dari membuat thread baru setiap kali. Gunakan
newFixedThreadPooluntuk throughput konstan,newCachedThreadPooluntuk burst request.- Coroutine adalah standar modern Kotlin — jauh lebih ringan dari thread, mendukung suspensi non-blocking, dan lebih mudah dibaca karena kode asinkron ditulis seperti sinkron.
launchuntuk fire-and-forget,asyncuntuk nilai — gunakanlaunchketika tidak butuh hasil,async + await()ketika butuh nilai dari komputasi paralel.Dispatchers.Defaultuntuk CPU,Dispatchers.IOuntuk I/O — jangan jalankan operasi I/O blocking diDefault(memboroskan CPU thread), dan jangan komputasi berat diIO.Mutexbukansynchronizeddi coroutine —synchronizedmemblokir thread,Mutex.withLock {}hanya mensuspensi coroutine — jauh lebih efisien.Flowuntuk stream data — gunakanFlowdaripadaChanneluntuk data yang dihasilkan secara reaktif.Flowbersifat cold dan bisa di-compose denganfilter,map,flatMap.- Structured concurrency mencegah coroutine leak — batalkan scope induk untuk membatalkan semua coroutine anak secara otomatis. Hindari
GlobalScopekecuali benar-benar diperlukan.