Multi Threading

Multi Threading #

Setiap aplikasi yang serius perlu melakukan lebih dari satu hal sekaligus: menangani request HTTP sambil memproses database, mengunduh file sambil memperbarui UI, atau menjalankan beberapa komputasi berat secara paralel. Di Kotlin ada tiga lapisan cara menangani ini: Thread (API Java paling dasar), ExecutorService (manajemen thread pool), dan Coroutines (pendekatan modern Kotlin yang lebih efisien). Ketiganya punya trade-off berbeda — memahami kapan dan mengapa memilih masing-masing adalah kunci menulis aplikasi concurrent yang benar dan performan. Artikel ini membahas ketiganya secara mendalam, dengan fokus pada Coroutines sebagai cara idiomatis Kotlin.

Masalah Concurrency yang Perlu Dipahami #

Sebelum membahas solusi, penting memahami masalah yang bisa muncul saat beberapa thread mengakses data yang sama secara bersamaan:

// Race condition — bug yang tidak deterministik dan susah di-debug
var counter = 0

fun increment() {
    repeat(10_000) {
        counter++  // TIDAK thread-safe! read-modify-write bukan atomik
    }
}

val t1 = Thread { increment() }
val t2 = Thread { increment() }
t1.start(); t2.start()
t1.join(); t2.join()

println(counter)  // Seharusnya 20000, tapi bisa lebih kecil karena race condition

Masalah umum dalam concurrency: race condition (hasil bergantung pada timing), deadlock (dua thread saling menunggu satu sama lain), dan starvation (satu thread tidak pernah mendapat giliran). Solusinya bisa berupa sinkronisasi eksplisit, struktur data thread-safe, atau menghindari shared mutable state sama sekali.


Thread — API Dasar Java #

Thread adalah unit eksekusi paling dasar di JVM. Setiap thread adalah objek OS yang membutuhkan sekitar 1MB memori stack dan ribuan nanodetik untuk dibuat.

// Cara 1: lambda langsung
val thread1 = Thread {
    println("Thread 1 berjalan di: ${Thread.currentThread().name}")
    Thread.sleep(1000)  // blokir thread ini selama 1 detik
    println("Thread 1 selesai")
}

// Cara 2: fungsi thread() dari Kotlin stdlib — lebih idiomatis
import kotlin.concurrent.thread

val thread2 = thread(name = "Worker-A", isDaemon = false) {
    println("Thread A berjalan")
}

thread1.start()
// thread2 sudah otomatis start karena fungsi thread() memulainya

// join() — tunggu thread selesai sebelum lanjut
thread1.join()
println("Thread 1 sudah selesai, lanjut ke kode berikutnya")

Sinkronisasi dengan synchronized #

Untuk melindungi data bersama dari race condition, gunakan blok synchronized:

var counter = 0
val lock = Any()  // objek sebagai kunci

val threads = (1..10).map {
    Thread {
        repeat(1000) {
            synchronized(lock) {
                counter++  // hanya satu thread yang bisa masuk blok ini sekaligus
            }
        }
    }
}

threads.forEach { it.start() }
threads.forEach { it.join() }
println(counter)  // selalu 10000 — aman dari race condition

AtomicInteger — Alternatif Lebih Efisien #

Untuk operasi sederhana seperti counter, AtomicInteger lebih efisien dari synchronized:

import java.util.concurrent.atomic.AtomicInteger

val atomicCounter = AtomicInteger(0)

val threads = (1..10).map {
    Thread {
        repeat(1000) {
            atomicCounter.incrementAndGet()  // atomik — tidak perlu lock
        }
    }
}

threads.forEach { it.start() }
threads.forEach { it.join() }
println(atomicCounter.get())  // selalu 10000

Keterbatasan Thread Langsung #

Thread langsung cocok untuk tugas sederhana, tapi punya masalah di skala besar. Membuat 10.000 thread untuk 10.000 request adalah bencana performa — setiap thread mengkonsumsi memori dan waktu OS untuk scheduling. Ini adalah alasan thread pool dan coroutine ada.


ExecutorService — Manajemen Thread Pool #

ExecutorService mengelola sekumpulan thread yang digunakan ulang — daripada membuat thread baru setiap kali ada tugas, tugas diantrikan dan dieksekusi oleh thread yang sudah ada di pool.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Fixed thread pool — jumlah thread tetap
val fixedPool = Executors.newFixedThreadPool(4)

// Kirim tugas ke pool
repeat(10) { taskId ->
    fixedPool.submit {
        println("Tugas $taskId di thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
}

// Shutdown — tunggu semua tugas selesai, lalu tutup pool
fixedPool.shutdown()
fixedPool.awaitTermination(10, TimeUnit.SECONDS)
println("Semua tugas selesai")

Jenis-Jenis Thread Pool #

// Fixed pool — N thread tetap, tugas lebih banyak dari N akan diantri
val fixed = Executors.newFixedThreadPool(4)

// Cached pool — thread dibuat sesuai kebutuhan, dihapus jika idle > 60 detik
// Cocok untuk tugas banyak tapi singkat
val cached = Executors.newCachedThreadPool()

// Single thread executor — satu thread, tugas dieksekusi berurutan
val single = Executors.newSingleThreadExecutor()

// Scheduled pool — untuk tugas yang perlu dijadwal atau berulang
val scheduled = Executors.newScheduledThreadPool(2)
scheduled.scheduleAtFixedRate(
    { println("Tugas berulang: ${System.currentTimeMillis()}") },
    0,    // delay awal (detik)
    5,    // interval (detik)
    TimeUnit.SECONDS
)

Future — Mendapatkan Hasil dari Thread Pool #

import java.util.concurrent.Callable

val pool = Executors.newFixedThreadPool(2)

// submit Callable — mengembalikan Future<T>
val future1 = pool.submit(Callable {
    Thread.sleep(1000)
    "Hasil dari tugas 1"
})

val future2 = pool.submit(Callable {
    Thread.sleep(500)
    42
})

// get() memblokir sampai hasil tersedia
println(future1.get())  // "Hasil dari tugas 1"
println(future2.get())  // 42

pool.shutdown()

Kotlin Coroutines — Pendekatan Modern #

Coroutine adalah unit eksekusi yang jauh lebih ringan dari thread. Kamu bisa membuat ratusan ribu coroutine dalam satu program tanpa kehabisan memori, karena coroutine tidak memetakan 1:1 ke thread OS — mereka dijadwalkan oleh Kotlin runtime di atas sejumlah kecil thread.

Menambahkan Dependensi #

// build.gradle.kts
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0")

    // Untuk Android
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0")
}

runBlocking dan launch #

import kotlinx.coroutines.*

fun main() = runBlocking {  // membuat coroutine scope dan blokir thread utama
    println("Mulai di: ${Thread.currentThread().name}")

    // launch — fire-and-forget, mengembalikan Job
    val job1 = launch {
        delay(1000)  // suspensi non-blocking (tidak memblokir thread!)
        println("Job 1 selesai di: ${Thread.currentThread().name}")
    }

    val job2 = launch {
        delay(500)
        println("Job 2 selesai di: ${Thread.currentThread().name}")
    }

    println("Menunggu job selesai...")
    job1.join()  // tunggu job1
    job2.join()  // tunggu job2

    println("Semua selesai!")
}
// Output:
// Mulai di: main
// Menunggu job selesai...
// Job 2 selesai di: main  (setelah 500ms)
// Job 1 selesai di: main  (setelah 1000ms)
// Semua selesai!

async dan await — Komputasi Paralel dengan Hasil #

async mengembalikan Deferred<T> — seperti Future tapi untuk coroutine. Gunakan await() untuk mengambil hasilnya.

import kotlinx.coroutines.*

suspend fun ambilDataPengguna(id: Int): String {
    delay(1000)  // simulasi panggilan API
    return "Pengguna #$id"
}

suspend fun ambilPesananPengguna(userId: Int): List<String> {
    delay(800)   // simulasi query database
    return listOf("Pesanan A", "Pesanan B")
}

fun main() = runBlocking {
    val mulai = System.currentTimeMillis()

    // Jalankan dua operasi secara PARALEL dengan async
    val deferredPengguna = async { ambilDataPengguna(42) }
    val deferredPesanan  = async { ambilPesananPengguna(42) }

    // Tunggu keduanya selesai
    val pengguna = deferredPengguna.await()
    val pesanan  = deferredPesanan.await()

    val durasi = System.currentTimeMillis() - mulai
    println("$pengguna: $pesanan")
    println("Selesai dalam ${durasi}ms")  // ~1000ms (paralel), bukan 1800ms (sekuensial)
}

Bandingkan dengan pendekatan sekuensial yang membuang waktu:

// ANTI-PATTERN: sekuensial padahal tidak ada dependensi antar operasi
val pengguna = ambilDataPengguna(42)   // tunggu 1000ms
val pesanan  = ambilPesananPengguna(42) // baru mulai, tunggu 800ms lagi
// Total: 1800ms — tidak efisien

Coroutine Dispatcher — Di Thread Mana Coroutine Berjalan? #

Dispatcher menentukan thread atau thread pool yang digunakan coroutine.

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Default — thread pool optimal untuk komputasi CPU-intensive
    launch(Dispatchers.Default) {
        println("Default: ${Thread.currentThread().name}")
        // Cocok untuk: kalkulasi berat, sorting, parsing
    }

    // IO — thread pool besar untuk operasi I/O yang memblokir
    launch(Dispatchers.IO) {
        println("IO: ${Thread.currentThread().name}")
        // Cocok untuk: baca/tulis file, HTTP, database blocking
    }

    // Main — hanya tersedia di Android/UI framework
    // launch(Dispatchers.Main) { updateUI() }

    // Unconfined — mulai di thread pemanggil, lanjut di thread suspension point
    launch(Dispatchers.Unconfined) {
        println("Unconfined start: ${Thread.currentThread().name}")
        delay(100)
        println("Unconfined after delay: ${Thread.currentThread().name}")
        // Cocok untuk: testing, penggunaan sangat spesifik
    }

    // newSingleThreadContext — buat dispatcher dengan satu thread khusus
    val customDispatcher = newSingleThreadContext("CustomThread")
    launch(customDispatcher) {
        println("Custom: ${Thread.currentThread().name}")  // CustomThread
    }
    customDispatcher.close()  // jangan lupa tutup untuk bebaskan resource
}

Berpindah Dispatcher dalam Coroutine #

suspend fun prosesData(data: String): String {
    // Mulai di IO thread untuk baca dari disk
    val rawData = withContext(Dispatchers.IO) {
        bacaDariDisk(data)
    }

    // Pindah ke Default untuk komputasi berat
    val hasilProses = withContext(Dispatchers.Default) {
        prosesBerat(rawData)
    }

    return hasilProses
}

Mutex — Mencegah Race Condition di Coroutine #

Untuk melindungi data bersama di coroutine, gunakan Mutex (bukan synchronized yang memblokir thread):

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    val jobs = (1..1000).map {
        launch(Dispatchers.Default) {
            mutex.withLock {
                counter++  // kritis — hanya satu coroutine sekaligus
            }
        }
    }
    jobs.forEach { it.join() }
    println(counter)  // selalu 1000
}

AtomicInteger Tetap Berlaku di Coroutine #

import java.util.concurrent.atomic.AtomicInteger

val atomicCounter = AtomicInteger(0)

fun main() = runBlocking {
    (1..1000).map {
        launch(Dispatchers.Default) {
            atomicCounter.incrementAndGet()
        }
    }.forEach { it.join() }

    println(atomicCounter.get())  // selalu 1000, tanpa mutex
}

Channel — Komunikasi Antar Coroutine #

Channel adalah cara coroutine berkomunikasi satu sama lain — seperti queue thread-safe yang bisa di-suspend saat penuh atau kosong.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()

    // Producer — kirim data ke channel
    launch {
        for (i in 1..5) {
            println("Mengirim: $i")
            channel.send(i)
            delay(100)
        }
        channel.close()  // tandai channel selesai
    }

    // Consumer — terima data dari channel
    launch {
        for (nilai in channel) {  // otomatis stop saat channel ditutup
            println("Menerima: $nilai")
        }
    }
}
// Mengirim: 1
// Menerima: 1
// Mengirim: 2
// Menerima: 2
// ...

produce — Channel Producer DSL #

fun CoroutineScope.angkaGenap(maks: Int): ReceiveChannel<Int> = produce {
    var n = 2
    while (n <= maks) {
        send(n)
        n += 2
    }
}

fun main() = runBlocking {
    val genap = angkaGenap(20)
    for (n in genap) print("$n ")
    println()
    // 2 4 6 8 10 12 14 16 18 20
}

Flow — Stream Data Asinkron #

Flow adalah sekuens data asinkron yang bisa di-collect secara reaktif. Berbeda dari Channel, Flow bersifat cold — hanya menghasilkan data saat di-collect.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Membuat Flow
fun angkaFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(300)  // simulasi data yang datang bertahap
        emit(i)     // kirimkan nilai ke collector
    }
}

fun main() = runBlocking {
    angkaFlow()
        .filter { it % 2 == 0 }     // filter seperti List
        .map { it * it }             // transformasi
        .collect { nilai ->          // mulai koleksi
            println("Nilai: $nilai")
        }
}
// Nilai: 4
// Nilai: 16

// Flow dari koleksi yang ada
listOf(1, 2, 3, 4, 5).asFlow()
    .map { it * 2 }
    .collect { print("$it ") }
// 2 4 6 8 10

Thread vs Coroutine — Kapan Memilih yang Mana #

flowchart TD
    A{Kebutuhan\nConcurrency?} --> B{Jumlah unit\neksekusi?}
    B -- Sedikit, puluhan --> C[Thread / ExecutorService]
    B -- Banyak, ribuan+ --> D[Coroutine]
    A --> E{Operasi yang\ndilakukan?}
    E -- CPU-intensive\nparsing, kalkulasi --> F["Coroutine\nDispatchers.Default"]
    E -- I/O blocking\nfile, HTTP, DB --> G["Coroutine\nDispatchers.IO"]
    E -- Streaming data\nreaktif --> H["Flow"]
    E -- Komunikasi\nantar task --> I["Channel"]
AspekThreadExecutorServiceCoroutine
Berat~1MB stack per threadTergantung pool sizeSangat ringan (~beberapa KB)
SkalaRatusanRatusan-ribuanRatusan ribu
BlockingMemblokir threadMemblokir threadSuspensi non-blocking
KodeCallback / join manualFuture.get()await(), sequential style
Error handlingSulit, uncaught exceptionSulitStructured concurrency
Cocok untukTask OS-level, legacyBackend sederhanaSemua kasus modern

Structured Concurrency — Keunggulan Coroutine #

Salah satu keunggulan terbesar coroutine adalah structured concurrency — coroutine anak tidak bisa “kabur” dari scope induknya. Jika induk dibatalkan, semua coroutine anak ikut dibatalkan.

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)

    val job = scope.launch {
        launch {  // coroutine anak
            delay(5000)
            println("Ini tidak akan tercetak jika job dibatalkan")
        }
        launch {  // coroutine anak kedua
            delay(3000)
            println("Ini juga tidak akan tercetak")
        }
        delay(10000)
    }

    delay(1000)
    job.cancel()  // membatalkan job DAN semua coroutine anak
    job.join()
    println("Semua coroutine sudah dibatalkan")
}

Ringkasan #

  • Thread untuk kasus sederhana — gunakan thread { } dari Kotlin stdlib untuk task sederhana yang tidak perlu skala. Selalu join() sebelum mengakses hasilnya.
  • ExecutorService untuk thread pool — lebih efisien dari membuat thread baru setiap kali. Gunakan newFixedThreadPool untuk throughput konstan, newCachedThreadPool untuk burst request.
  • Coroutine adalah standar modern Kotlin — jauh lebih ringan dari thread, mendukung suspensi non-blocking, dan lebih mudah dibaca karena kode asinkron ditulis seperti sinkron.
  • launch untuk fire-and-forget, async untuk nilai — gunakan launch ketika tidak butuh hasil, async + await() ketika butuh nilai dari komputasi paralel.
  • Dispatchers.Default untuk CPU, Dispatchers.IO untuk I/O — jangan jalankan operasi I/O blocking di Default (memboroskan CPU thread), dan jangan komputasi berat di IO.
  • Mutex bukan synchronized di coroutinesynchronized memblokir thread, Mutex.withLock {} hanya mensuspensi coroutine — jauh lebih efisien.
  • Flow untuk stream data — gunakan Flow daripada Channel untuk data yang dihasilkan secara reaktif. Flow bersifat cold dan bisa di-compose dengan filter, map, flatMap.
  • Structured concurrency mencegah coroutine leak — batalkan scope induk untuk membatalkan semua coroutine anak secara otomatis. Hindari GlobalScope kecuali benar-benar diperlukan.

← Sebelumnya: Build Tools   Berikutnya: I/O →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact