Taskiq: Asynchronous Distributed Task Queue

0.12.1 · active · verified Sun Apr 12

Taskiq is an asynchronous distributed task queue for Python, inspired by projects like Celery and Dramatiq. It supports both synchronous and asynchronous functions and integrates with popular async frameworks like FastAPI and AioHTTP. Taskiq is actively maintained with frequent releases, currently at version 0.12.1.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to define and execute an asynchronous task using Taskiq's `InMemoryBroker`. It covers defining a broker, decorating a function as a task, sending the task using `.kiq()`, and retrieving its result with `.wait_result()`. For distributed usage, an external worker process would be started to consume tasks from a real broker (like Redis or RabbitMQ).

import asyncio
from taskiq import InMemoryBroker

# 1. Create a broker instance.
# InMemoryBroker is for local development only. For production, use taskiq-redis, taskiq-aio-pika, etc.
broker = InMemoryBroker()

# 2. Define a task using the broker's decorator.
@broker.task
async def add_numbers(a: int, b: int) -> int:
    print(f"Executing add_numbers({a}, {b})")
    await asyncio.sleep(0.1) # Simulate some async work
    return a + b

async def main():
    # 3. Startup the broker. This is crucial for proper functioning.
    await broker.startup()

    # 4. Send the task to the broker.
    task = await add_numbers.kiq(1, 2)

    # 5. Wait for the result.
    result = await task.wait_result(timeout=5)

    if result.is_err:
        print(f"Task failed: {result.error}")
    else:
        print(f"Task result: {result.return_value}")
    
    # 6. Shutdown the broker.
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

# To run with a worker (for distributed brokers, e.g., Redis):
# 1. Save the above code as 'my_app.py'.
# 2. Start an external broker (e.g., Redis).
# 3. Run the worker from your terminal:
#    taskiq worker my_app:broker
# 4. Run the Python script (my_app.py) to send tasks.

view raw JSON →