asyncio.TaskGroup, Runner, and Timeout Backport
The `taskgroup` library provides a backport of the `asyncio.TaskGroup`, `asyncio.Runner`, and `asyncio.timeout` functionalities from Python 3.12.8 to older Python versions, specifically Python 3.8, 3.9, 3.10, and 3.11. This allows developers to leverage the structured concurrency model introduced in Python 3.11+ for `asyncio` in environments that are not yet able to upgrade to the latest Python releases.
Warnings
- gotcha This library is a backport. While it aims for faithful replication, subtle behavioral differences or edge cases might exist compared to the native `asyncio.TaskGroup`, `Runner`, or `timeout` in Python 3.11+. Always test thoroughly.
- breaking The `asyncio.TaskGroup` requires Python 3.11 or higher. Using this backport on Python 3.11+ might lead to conflicts or unexpected behavior if both the native and backported versions are accidentally imported or used, although the provided quickstart handles this with a `sys.version_info` check.
- gotcha When an exception occurs in a task within an `asyncio.TaskGroup`, all other running tasks in that group are cancelled, and all non-cancellation exceptions are propagated as an `ExceptionGroup`. Proper error handling using `except*` (for `ExceptionGroup`) is crucial for robust applications. This behavior is fundamental to structured concurrency.
Install
-
pip install taskgroup
Imports
- TaskGroup
from taskgroup import TaskGroup
- Runner
from taskgroup import Runner
- timeout
from taskgroup import timeout
- run
from taskgroup import run
Quickstart
import sys
import asyncio
# Conditionally import TaskGroup, run, and timeout based on Python version
if sys.version_info >= (3, 11):
from asyncio import TaskGroup, run, timeout
else:
from taskgroup import TaskGroup, run, timeout
async def task_printer(name, delay):
print(f"Task {name}: Starting in {delay} seconds...")
await asyncio.sleep(delay)
print(f"Task {name}: Finished.")
return f"Result from {name}"
async def main_task_group():
print("Main: Creating tasks...")
async with TaskGroup() as group:
task1 = group.create_task(task_printer("A", 2))
task2 = group.create_task(task_printer("B", 1))
task3 = group.create_task(task_printer("C", 3))
print("Main: All tasks in group finished.")
print(f"Task A result: {task1.result()}")
print(f"Task B result: {task2.result()}")
print(f"Task C result: {task3.result()}")
async def main_runner():
print("\n--- Using Runner ---")
async def inner_coro():
await task_printer("D", 0.5)
await task_printer("E", 1.5)
with Runner() as runner:
runner.run(inner_coro())
print("Runner finished.")
async def main_timeout():
print("\n--- Using Timeout ---")
async def long_running_task():
await asyncio.sleep(5)
print("Long running task completed (should not happen if timeout works).")
try:
async with timeout(2):
await long_running_task()
except TimeoutError:
print("Task timed out as expected.")
if __name__ == "__main__":
print(f"Running with {'asyncio' if sys.version_info >= (3, 11) else 'taskgroup backport'}.")
run(main_task_group())
run(main_runner())
run(main_timeout())