Taskiq: Asynchronous Distributed Task Queue
Taskiq is an asynchronous distributed task queue for Python, inspired by projects like Celery and Dramatiq. It supports both synchronous and asynchronous functions and integrates with popular async frameworks like FastAPI and AioHTTP. Taskiq is actively maintained with frequent releases, currently at version 0.12.1.
Warnings
- breaking Support for Python 3.9 was dropped in Taskiq version 0.12.0. Users on Python 3.9 must either upgrade their Python version or stay on an older Taskiq release.
- gotcha The `InMemoryBroker` is designed for local development and testing only. It does not send messages over a network and cannot be used for distributed task execution in production environments.
- gotcha Calling `broker.startup()` and `broker.shutdown()` is essential for all brokers. Failing to call `startup()` can lead to undefined behavior or tasks not being processed, while `shutdown()` ensures resources are properly released.
- gotcha In Taskiq `0.12.0`, `TaskiqAdminMiddleware` did not correctly handle tasks with `dataclasses` in their arguments or return values, leading to serialization issues.
- gotcha When using `taskiq worker --fs-discover` (file system discover) with the default `--tasks-pattern 'task.py'`, Taskiq may attempt to import `task.py` files from third-party libraries installed in your virtual environment, leading to `ImportError` or unexpected behavior.
- gotcha The `ZeroMQBroker` is suitable only for projects with a single worker process. If multiple workers connect to a `ZeroMQBroker`, each task will be executed N times (where N is the number of workers) because of ZMQ's publish-subscribe architecture.
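One way to sidestep the `--fs-discover` gotcha above is to scope the worker's `--tasks-pattern` flag to your own package. The module path and glob below are placeholders; adjust them to your project layout:

```shell
# Scope file-system discovery to your own package so Taskiq does not
# import stray task.py modules from site-packages.
# "my_app:broker" and the pattern are example values.
taskiq worker my_app:broker --fs-discover --tasks-pattern "my_app/**/task.py"
```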
Install
- pip install taskiq
- pip install taskiq-redis
- pip install "taskiq[reload]"
Imports
- InMemoryBroker
from taskiq import InMemoryBroker
- TaskiqScheduler
from taskiq import TaskiqScheduler
- TaskiqAdminMiddleware
from taskiq.middlewares.taskiq_admin_middleware import TaskiqAdminMiddleware
- RedisStreamBroker
from taskiq_redis import RedisStreamBroker
- RedisAsyncResultBackend
from taskiq_redis import RedisAsyncResultBackend
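The two `taskiq_redis` imports above are typically wired together so that workers consume tasks from a Redis stream and clients can await results. A minimal sketch (the Redis URL is a placeholder; adjust host/port/db for your deployment):

```python
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

# Results are stored in Redis so callers can await them from any process.
result_backend = RedisAsyncResultBackend(redis_url="redis://localhost:6379")

# The broker delivers tasks over a Redis stream; attach the result backend
# so task.wait_result() works across processes.
broker = RedisStreamBroker(url="redis://localhost:6379").with_result_backend(
    result_backend,
)
```

Unlike `InMemoryBroker`, this broker supports true distributed execution: run `taskiq worker my_app:broker` in one process and send tasks from another.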
Quickstart
import asyncio

from taskiq import InMemoryBroker

# 1. Create a broker instance.
# InMemoryBroker is for local development only. For production, use taskiq-redis, taskiq-aio-pika, etc.
broker = InMemoryBroker()

# 2. Define a task using the broker's decorator.
@broker.task
async def add_numbers(a: int, b: int) -> int:
    print(f"Executing add_numbers({a}, {b})")
    await asyncio.sleep(0.1)  # Simulate some async work
    return a + b

async def main():
    # 3. Start up the broker. This is crucial for proper functioning.
    await broker.startup()

    # 4. Send the task to the broker.
    task = await add_numbers.kiq(1, 2)

    # 5. Wait for the result.
    result = await task.wait_result(timeout=5)
    if result.is_err:
        print(f"Task failed: {result.error}")
    else:
        print(f"Task result: {result.return_value}")

    # 6. Shut down the broker.
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
# To run with a worker (for distributed brokers, e.g., Redis):
# 1. Save the above code as 'my_app.py'.
# 2. Start an external broker (e.g., Redis).
# 3. Run the worker from your terminal:
# taskiq worker my_app:broker
# 4. Run the Python script (my_app.py) to send tasks.
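The `TaskiqScheduler` imported earlier can drive periodic tasks via the `schedule` label. A minimal sketch, assuming `my_app.py` defines `broker` as in the Quickstart; the cron expression, task body, and module paths are placeholders:

```python
from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

from my_app import broker  # the broker defined in the Quickstart

# Tasks opt in to scheduling through the `schedule` label on the decorator.
@broker.task(schedule=[{"cron": "*/5 * * * *"}])  # every 5 minutes
async def heartbeat() -> None:
    print("still alive")

# LabelScheduleSource reads schedules from task labels on this broker.
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)

# Run the scheduler alongside your workers:
#   taskiq scheduler my_scheduler:scheduler
```

The scheduler process only enqueues tasks at the scheduled times; at least one worker process must be running to actually execute them.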