Explanation of a basic example using Kotlin Coroutines

In my recent [post](previous post), I discussed how Kotlin’s approach to asynchronous programming (particularly with coroutines) differs from what I’m accustomed to in JavaScript and Python (and even C#). Now that I grasp the fundamentals of how suspend, continuation passing style, and compiler magic intertwine, it’s time to put theory into practice.

Kotlin’s asynchronous capabilities are remarkably flexible compared to JavaScript and Python, especially regarding dispatchers. These allow code execution in various contexts like event loops or thread pools. Notably, Kotlin offers at least two distinct thread pools: Dispatchers.Default for CPU-intensive tasks (using threads equivalent to CPU cores) and Dispatchers.IO for I/O-bound operations. For a deeper dive into dispatchers, refer to [this resource](this excellent article).

Let’s examine a common asynchronous scenario: initiating multiple non-blocking I/O operations (simulated HTTP requests fetching blog posts), awaiting their completion, and processing the results.

Here’s a Python implementation:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
`delays = {
    "A1": 3,
    "B1": 1,
    "C1": 0.1,
}

async def get_post(id: str) -> str:
    print(f"getting post: {id}")
    if not id in delays:
        await asyncio.sleep(0.5)
        raise Exception(f"Missing post: {id}")   
     
    await asyncio.sleep(delays[id])
    return f"POST: [[{id}]]"

# async def retrieve_posts_sequentially():
#     print("started")
#     post_ids = ["A1", "B1", "C1"]
#     retrieved_posts = []
#     for id in post_ids:
#         retrieved_posts.append(await get_post(id))
#     print("all posts retrieved")
#     for post in retrieved_posts:
#         print(f"post: {post}")

async def retrieve_posts():
    print("started")
    post_ids = ["A1", "B1", "C1", "D1"]
    requested_posts = [asyncio.create_task(get_post(id)) for id in post_ids]
    retrieved_posts = await asyncio.gather(*requested_posts, return_exceptions= True)
    print("all posts retrieved")
    for post in retrieved_posts:
        print(f"post: {post}")


asyncio.run(retrieve_posts())` 

We begin by creating an event loop using asyncio.run() to execute our asynchronous code. Our async function (coroutine function in Python) named get_post simulates fetching a post. Invoking it produces a coroutine object without immediately running the code. Execution requires either awaiting the coroutine (e.g., await get_post()) or launching it via create_task. Awaiting suspends the calling function until the coroutine finishes, leading to sequential execution (as demonstrated in the commented retrieve_posts_sequentially() function). Conversely, create_task launches the coroutine without awaiting, enabling concurrent execution within the event loop thread. This is illustrated in the retrieve_posts() function, where we launch all requests and then use await asyncio.gather() to wait for their completion.

Now, let’s translate this to Kotlin:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
 `val idsToDelays = mapOf(
    "A1" to 1000,
    "B1" to 2000,
    "C1" to 500,
)

suspend fun getPost(id: String): String {
    println("getPost $id start, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    delay(idsToDelays[id] ?: 500)
    println("getPost $id end, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    return if (idsToDelays[id] !== null) "POST: [[${id}]]"
        else throw Exception("missing ID")
		
val postIds = listOf("A1", "B1", "C1")

//runBlocking {
//	println("context: $coroutineContext")
//	postIds.forEach { getPost(it) }
//}
	
runBlocking {
	// same as the previous test, but now causing an exception and handling it
	val futures = postIds.map { async {
		//this is a try-catch expression, so we don't need to write "return"
		try {
			getPost(it)
		}
		catch (ex: Exception) {
			ex.message
		}
	}}

	val posts = futures.awaitAll()
	posts.forEach(::println)
}

/*
getPost A1 start, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 start, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 start, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 start, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 end, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 end, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost A1 end, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 end, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
POST: [[A1]]
POST: [[B1]]
POST: [[C1]]
missing ID
Finished, current thread: main
*/` 

We use the suspend keyword (no async keyword in Kotlin) to mark our asynchronous function (getPost). To execute suspend functions, we utilize a coroutine, created here using the runBlocking() function (a coroutine builder). Without a CoroutineContext, runBlocking executes the code in an event loop within the current thread. Directly calling getPost (as shown in the commented block) would result in sequential execution, similar to the commented Python code. To achieve concurrency, we employ the async() function (another coroutine builder), creating a new coroutine for each getPost call. These coroutines run concurrently, and async() returns a Deferred object (akin to Python’s Task-Future) that completes alongside its coroutine. We use awaitAll() to wait for all Deferred objects to complete. Note that without a specific dispatcher, async() inherits the parent coroutine’s dispatcher (the event loop dispatcher in this case).

In essence:

  • asyncio.run() in Python is analogous to runBlocking() in Kotlin: both initiate an event loop.
  • asyncio.createTask() mirrors async(): both launch coroutines.
  • asyncio.gather(list[Task]) corresponds to List[Deferred].awaitAll(): both await the completion of multiple tasks/coroutines.

For completeness, let’s present the JavaScript equivalent:

1
2
3
4
 `let postIds = ["A1", "B1", "C1"];
let postPromises = postIds.map(getPost);
let retrivedPosts = await Promise.all(postPromises);
retrivedPosts.forEach(console.log);` 

In JavaScript, calling an async function directly returns a Promise. There’s no explicit await needed for execution. To avoid suspension, we gather the returned Promises from getPost calls. Then, we use await Promise.all() to wait for all promises to resolve. The JavaScript runtime inherently manages the event loop.

Licensed under CC BY-NC-SA 4.0
Last updated on Jul 18, 2023 12:15 +0100