Kotlin Coroutines for managing CPU-bound and I/O-bound tasks

This is a follow-up to this recent post, where I explored Kotlin Coroutines in practice. In the previous post, I executed multiple suspend functions concurrently using the async() coroutine builder, all on a single thread driven by the event loop that the runBlocking() coroutine builder sets up. This approach is efficient for I/O-bound operations, but what about CPU-intensive tasks that could benefit from parallel execution on separate threads? And what about functions that combine heavy computation with I/O? Coroutines, coupled with a dispatcher backed by a thread pool, provide an elegant solution.

When a CPU-bound function is invoked twice through the async coroutine builder, execution remains sequential if the function contains no suspension points. The first coroutine launched by async() runs to completion before the second one starts, since both run on the event-loop thread. In the code below, Thread.sleep() stands in for the CPU-bound work: it blocks the thread and never suspends.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Stand-in for CPU-bound work: Thread.sleep blocks the thread and never suspends.
fun doCalculation(id: Int, duration: Int): Int {
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}

fun performCalculationsSequentially() {
    runBlocking {
        val time = measureTimeMillis {
            // Both coroutines inherit runBlocking's single-threaded event loop.
            val c1 = async { doCalculation(id = 1, duration = 3) }
            val c2 = async { doCalculation(id = 2, duration = 6) }
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished, ${results.joinToString(", ")}")
        }
        println("Time taken: $time")
    }
}

/*
Started, cpubound
calculation: 1, step: 1, thread: main
calculation: 1, step: 2, thread: main
calculation: 1, step: 3, thread: main
Finishing calculation: 1, step: 4, thread: main
calculation: 2, step: 1, thread: main
calculation: 2, step: 2, thread: main
calculation: 2, step: 3, thread: main
calculation: 2, step: 4, thread: main
calculation: 2, step: 5, thread: main
calculation: 2, step: 6, thread: main
Finishing calculation: 2, step: 7, thread: main
tasks finished, 1, 2
Time taken: 4591
*/
```

While event-loop-based coroutines excel at coordinating multiple suspending functions (akin to JavaScript’s async/await or Python’s asyncio), they fall short when dealing with non-suspending, CPU-bound operations: a function that never suspends never gives the event loop a chance to run anything else.
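For contrast, here is a minimal sketch of the cooperative case, where each step ends in a suspension point. The helper names doCooperativeCalculation and interleavedSteps are hypothetical and not part of the original code; yield() is used here simply as the smallest possible suspension point.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical variant of doCalculation that suspends after every step.
suspend fun doCooperativeCalculation(id: Int, steps: Int, log: MutableList<String>): Int {
    for (step in 1..steps) {
        log += "calculation $id, step $step"
        yield() // suspension point: lets other coroutines on the same thread run
    }
    return id
}

// Runs two cooperative calculations on runBlocking's single event-loop thread
// and returns the order in which their steps were executed.
fun interleavedSteps(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        val c1 = async { doCooperativeCalculation(id = 1, steps = 2, log = log) }
        val c2 = async { doCooperativeCalculation(id = 2, steps = 2, log = log) }
        listOf(c1, c2).awaitAll()
    }
    return log
}
```

Calling interleavedSteps() should return the steps in alternating order (calculation 1, calculation 2, calculation 1, calculation 2), even though everything runs on one thread. Each yield() hands the event loop over to the other coroutine, which is exactly what a blocking call such as Thread.sleep() never does.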

By passing Dispatchers.Default to the async coroutine builder (async(Dispatchers.Default)), we can use a thread pool for parallel execution. Dispatchers.Default is backed by a shared pool whose size defaults to the number of CPU cores, with a minimum of two threads. In the following code, each calculation runs on its own worker thread from that pool.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Same blocking stand-in for CPU-bound work as in the sequential example.
fun doCalculation(id: Int, duration: Int): Int {
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}

fun performCalculations3() {
    runBlocking {
        println("coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        val time = measureTimeMillis {
            // Dispatchers.Default moves each coroutine onto a worker thread of the shared pool.
            val c1 = async(Dispatchers.Default) {
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 1, duration = 3)
            }
            val c2 = async(Dispatchers.Default) {
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 2, duration = 6)
            }
            // The main coroutine suspends here, on the event loop, until both results arrive.
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished: ${results.joinToString(", ")}, coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        }
        println("Time taken: $time") // Time taken: 3079
    }
}

/*
coroutineContext [BlockingCoroutine{Active}@46f5f779, BlockingEventLoop@1c2c22f3], thread: main
inside async, coroutineContext [DeferredCoroutine{Active}@3a286e59, Dispatchers.Default]
inside async, coroutineContext [DeferredCoroutine{Active}@5e11571d, Dispatchers.Default]
calculation: 2, step: 1, thread: DefaultDispatcher-worker-2
calculation: 1, step: 1, thread: DefaultDispatcher-worker-1
calculation: 1, step: 2, thread: DefaultDispatcher-worker-1
calculation: 2, step: 2, thread: DefaultDispatcher-worker-2
calculation: 1, step: 3, thread: DefaultDispatcher-worker-1
calculation: 2, step: 3, thread: DefaultDispatcher-worker-2
Finishing calculation: 1, step: 4, thread: DefaultDispatcher-worker-1
calculation: 2, step: 4, thread: DefaultDispatcher-worker-2
calculation: 2, step: 5, thread: DefaultDispatcher-worker-2
calculation: 2, step: 6, thread: DefaultDispatcher-worker-2
Finishing calculation: 2, step: 7, thread: DefaultDispatcher-worker-2
Time taken: 3079
*/
```

In this scenario, the main coroutine runs on the event loop, while the two coroutines created with async(Dispatchers.Default) run on the thread pool. The parallel execution is evident from the execution time: about 3 seconds, the length of the longer calculation (6 steps * 500 milliseconds), compared to about 4.5 seconds for sequential execution (3 + 6 = 9 steps * 500 milliseconds).

Given the purely CPU-bound nature of our doCalculation function, one might question the need for coroutines and consider using a ThreadPoolExecutor directly. However, the async coroutine builder returns Deferred objects whose await() suspends the calling coroutine rather than blocking its thread, which is less straightforward to achieve with the Future objects a ThreadPoolExecutor hands back. Furthermore, the value of coroutines with thread-pool dispatchers becomes more apparent when dealing with suspending functions that involve both I/O operations and CPU-bound tasks (e.g., fetching and encrypting data from a web service).
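For comparison, here is a rough sketch of the same two calculations submitted to a plain executor from java.util.concurrent. The names blockingCalculation and calculateWithExecutor are hypothetical, the pool size of 2 is an arbitrary choice, and the steps are shortened to 100 milliseconds to keep the example quick.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for doCalculation, without the logging.
fun blockingCalculation(id: Int, steps: Int): Int {
    repeat(steps) { Thread.sleep(100) }
    return id
}

// Runs both calculations in parallel on a fixed pool and collects the results.
fun calculateWithExecutor(): List<Int> {
    val pool = Executors.newFixedThreadPool(2)
    try {
        val f1 = pool.submit(Callable { blockingCalculation(id = 1, steps = 3) })
        val f2 = pool.submit(Callable { blockingCalculation(id = 2, steps = 6) })
        // Future.get() blocks the calling thread until each result is ready.
        return listOf(f1.get(), f2.get())
    } finally {
        pool.shutdown() // the pool has to be shut down explicitly
    }
}
```

The work is parallel in both versions. The difference is on the waiting side: Future.get() ties up the calling thread, whereas awaitAll() only suspends the calling coroutine and leaves the event loop free for other coroutines.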

Moreover, it’s possible to switch a coroutine’s dispatcher dynamically using the [withContext](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html) function. Instead of creating a new coroutine, this function establishes a new CoroutineContext with the specified dispatcher. The code block passed to withContext executes within this new context, effectively changing the dispatcher for any suspend functions invoked within that block. The caller is suspended until the block completes and then receives its result. This allows for flexibility in managing coroutine execution: start on an event loop and switch to a thread pool only when necessary.
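A minimal sketch of such a switch, assuming a mixed workload along the lines of the fetch-and-encrypt example mentioned earlier. fetchPayload, Trace and traceDispatcherSwitch are hypothetical names; delay() merely simulates a suspending web-service call, and a SHA-256 digest stands in for the CPU-bound encryption step.

```kotlin
import java.security.MessageDigest
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a web-service call: delay() suspends without blocking the thread.
suspend fun fetchPayload(): ByteArray {
    delay(100)
    return "payload".toByteArray()
}

// Thread names observed before, inside and after the withContext block.
data class Trace(val before: String, val inside: String, val after: String, val digestSize: Int)

fun traceDispatcherSwitch(): Trace = runBlocking {
    val before = Thread.currentThread().name
    val payload = fetchPayload() // I/O-like part stays on the event loop
    // CPU-bound part: same coroutine, but executed on the Dispatchers.Default pool.
    val (inside, digest) = withContext(Dispatchers.Default) {
        Thread.currentThread().name to MessageDigest.getInstance("SHA-256").digest(payload)
    }
    val after = Thread.currentThread().name // back on the event-loop thread
    Trace(before, inside, after, digest.size)
}
```

Printing the result of traceDispatcherSwitch() from a main function should show a DefaultDispatcher-worker thread for the inside value and the same thread name for before and after: the coroutine hops to the pool for the digest and then resumes on the event loop.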

Licensed under CC BY-NC-SA 4.0
Last updated on Mar 22, 2024 00:10 +0100