Taskiq Redis Integration

1.2.2 · active · verified Mon Apr 13

Taskiq-redis is a plugin for the `taskiq` asynchronous distributed task queue, providing Redis-based brokers and result backends. It enables tasks to be processed and their results stored using Redis's various data structures, including Lists, Pub/Sub, and Streams. The library is actively maintained, with a current version of 1.2.2, and sees frequent updates to address issues and introduce new features.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to set up a `taskiq-redis` broker and result backend, define a task, send it, and retrieve its result. It highlights the importance of setting an expiration time for results to manage Redis memory usage. It also includes an optional scheduler setup for periodic tasks.

import asyncio
import os
from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, ListRedisScheduleSource

REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379')

# 1. Create a RedisAsyncResultBackend to store task results.
# IMPORTANT: Always set result_ex_time or result_px_time to prevent unbounded Redis memory usage.
redis_result_backend = RedisAsyncResultBackend(
    redis_url=REDIS_URL,
    result_ex_time=3600 # Results expire after 1 hour
)

# 2. Create a broker instance (e.g., ListQueueBroker for reliable single-consumer processing).
# Pass the result backend to the broker.
broker = ListQueueBroker(
    url=REDIS_URL,
    result_backend=redis_result_backend
)

# 3. Define a task using the broker's decorator.
@broker.task
async def my_simple_task(value: str) -> str:
    print(f"Executing task with value: {value}")
    await asyncio.sleep(1) # Simulate async work
    return f"Processed: {value.upper()}"

# 4. (Optional) Create a scheduler if you need scheduled tasks.
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[ListRedisScheduleSource(url=REDIS_URL)]
)

async def main():
    # Startup the broker (and scheduler if used).
    await broker.startup()
    if 'TASKIQ_RUN_SCHEDULER' in os.environ:
        await scheduler.startup()

    # 5. Send a task to the broker.
    task = await my_simple_task.kiq("hello taskiq")
    print(f"Task sent with ID: {task.task_id}")

    # 6. Wait for the result.
    result = await task.wait_result(timeout=10)
    if result.is_err:
        print(f"Task failed: {result.error}")
    else:
        print(f"Task result: {result.return_value}")

    # Shutdown the broker (and scheduler).
    if 'TASKIQ_RUN_SCHEDULER' in os.environ:
        await scheduler.shutdown()
    await broker.shutdown()

if __name__ == "__main__":
    # To run this example:
    # 1. Ensure a Redis server is running (e.g., via Docker: docker run --name some-redis -p 6379:6379 -d redis)
    # 2. Save this code as 'my_app.py'.
    # 3. In one terminal, start the worker: taskiq worker my_app:broker
    # 4. In another terminal, run the script to send tasks: python my_app.py
    # 5. To enable scheduler, set TASKIQ_RUN_SCHEDULER=1 before running the script and worker:
    #    TASKIQ_RUN_SCHEDULER=1 python my_app.py
    #    TASKIQ_RUN_SCHEDULER=1 taskiq scheduler my_app:scheduler
    asyncio.run(main())

view raw JSON →