After working on my JavaScript AsyncExecutor, I realized I needed to implement a Python AsyncExecutor.
I had previously explored a very basic post the differences between JavaScript and Python’s async/await mechanisms. The Python approach, using async/await, native coroutines, tasks, and futures, seems more intricate than JavaScript’s async/await with promises. However, this might just be due to my greater familiarity with asynchronous JavaScript.
Implementing an AsyncExecutor is fairly straightforward. Unlike JavaScript, which primarily uses Promises, Python offers various awaitable objects: native coroutines, futures, and tasks (which are essentially futures). The AsyncExecutor accepts “awaitable functions” (functions returning any of these awaitable objects) along with their arguments. Upon submission, it returns a Future that will be resolved with the function’s result once it completes.
The _run_action() method manages the execution of these “awaitable functions” when a slot becomes available. Being a native coroutine itself, _run_action requires special handling when called from non-coroutine functions like submit() (used for the initial submissions) or _process_result() (used when slots free up). In these cases, asyncio.create_task(self._run_action(action)) is necessary, which contrasts with JavaScript’s immediate execution of async functions without explicit task creation or awaiting.
Here’s the Python code (also available on a gist):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| import asyncio
from dataclasses import dataclass
import random
from typing import Any
import json
@dataclass
class AsyncAction:
future: asyncio.Future
awaitable_fn: Any # function that returns an awaitable (coroutine, a task...)
args: list[Any]
kwargs: list[Any]
class AsyncExecutor:
def __init__(self, event_loop: asyncio.AbstractEventLoop, max_running_actions: int):
self.event_loop = event_loop
self.max_running_actions = max_running_actions
self.running_counter = 0
self.not_launched_actions = []
def submit(self, awaitable_fn, *args, **kwargs) -> asyncio.Future:
"""
receives a function to be executed when there's one available slot. That function returns and awaitable
"""
future = self.event_loop.create_future()
action = AsyncAction(future, awaitable_fn, args, kwargs)
if self.running_counter < self.max_running_actions:
self.running_counter += 1
# _run_action returns a coroutine, so if I'm not awaiting it need to run it as a task
#self._run_action(action)
asyncio.create_task(self._run_action(action))
else:
self.not_launched_actions.append(action)
return future
async def _run_action(self, action: AsyncAction):
result = await action.awaitable_fn(*(action.args), **(action.kwargs))
self._process_result(action, result)
def _process_result(self, action: AsyncAction, result: Any):
self.running_counter -= 1
action.future.set_result(result)
if len(self.not_launched_actions):
self.running_counter += 1
asyncio.create_task(self._run_action(self.not_launched_actions.pop(0)))
async def mock_download(url: str, delay: int):
print("starting mock download")
await asyncio.sleep(delay)
return url.upper()
def create_download_task(url: str, delay: int):
print(create_download_task.__name__)
return asyncio.get_running_loop().create_task(mock_download(url, delay))
async def main():
async_executor = AsyncExecutor(asyncio.get_running_loop(), 4)
futures = []
for i in range(0,10):
delay = random.randint(1, 4)
if i % 2 == 0:
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
else:
future = async_executor.submit(create_download_task, f"www.jesoutienslapolice.fr/post_{i}", delay)
future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
futures.append(future)
future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
futures.append(future)
print(f"{len(futures)} submitted")
results = await asyncio.gather(*futures)
print(f"all finished: {json.dumps(results, indent=4)}")
asyncio.run(main())
|
As demonstrated, asyncio.create_task plays a crucial role in Python’s asynchronous operations. This discussion provides further insight into its function.
Essentially, it submits a coroutine for execution “in the background,” allowing it to run concurrently with other tasks and the current task. The execution context switches between these tasks at await points. asyncio.create_task returns an awaitable “task” object, which serves as a handle to potentially cancel the coroutine’s execution.
Considered a fundamental element in asyncio, asyncio.create_task is analogous to starting a thread. Similarly, awaiting the task using await is akin to joining a thread.
in Python