Taskiq Redis Integration
Taskiq-redis is a plugin for the `taskiq` asynchronous distributed task queue, providing Redis-based brokers and result backends. It enables tasks to be queued, processed, and their results stored using several Redis features, including Lists, Pub/Sub, and Streams. The library is actively maintained, with a current version of 1.2.2, and sees frequent updates to address issues and introduce new features.
Warnings
- gotcha Failing to set `result_ex_time` or `result_px_time` in `RedisAsyncResultBackend` will cause task results to persist indefinitely in Redis, leading to unbounded memory growth and potential performance issues.
- deprecated The `RedisScheduleSource` is inefficient for high-volume or dynamic schedules because it performs a full `SCAN` of Redis keys, which is slow on large keyspaces. It has been deprecated in favor of `ListRedisScheduleSource`.
- gotcha Using `PubSubBroker` (instead of `ListQueueBroker` or `RedisStreamBroker`) delivers messages to *all* subscribed workers, rather than distributing them to a single worker. It also does not support acknowledgements, meaning messages can be lost if a worker fails during processing.
- breaking Version 1.2.0 of `taskiq-redis` updated its internal `redis` dependency, requiring `redis-py` version 7 or newer. This might cause compatibility issues if your project uses an older version of `redis-py`.
- gotcha Each `RedisStreamBroker` instance creates its own Redis connection pool. If your application already holds a `redis.asyncio.Redis` client, that client's pool is not reused, which can waste connections.
- gotcha Older versions of `RedisStreamBroker` (prior to 1.2.2) could experience infinite locking issues with the `xautoclaim` lock, potentially preventing tasks from being processed.
- gotcha Using `taskiq-redis` schedule sources (e.g., `RedisScheduleSource`, potentially `ListRedisScheduleSource` in certain configurations) with a Redis Cluster can lead to `NOGROUP` or `mget` errors, because the keys a schedule touches do not all hash to the same slot across cluster nodes.
Install
-
pip install taskiq taskiq-redis
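If upgrading `redis-py` is not an option (see the 1.2.0 breaking change above), pinning below 1.2.0 is a workaround:

```shell
# Keep taskiq-redis on the 1.1.x line if your project requires an older redis-py.
pip install taskiq "taskiq-redis<1.2.0"
```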
Imports
- ListQueueBroker
from taskiq_redis import ListQueueBroker
- RedisAsyncResultBackend
from taskiq_redis import RedisAsyncResultBackend
- RedisStreamBroker
from taskiq_redis import RedisStreamBroker
- ListRedisScheduleSource
from taskiq_redis import ListRedisScheduleSource
Quickstart
import asyncio
import os

from taskiq import TaskiqScheduler
from taskiq_redis import ListQueueBroker, ListRedisScheduleSource, RedisAsyncResultBackend

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

# 1. Create a RedisAsyncResultBackend to store task results.
# IMPORTANT: Always set result_ex_time or result_px_time to prevent unbounded Redis memory usage.
redis_result_backend = RedisAsyncResultBackend(
    redis_url=REDIS_URL,
    result_ex_time=3600,  # Results expire after 1 hour.
)

# 2. Create a broker instance (e.g., ListQueueBroker for reliable single-consumer processing)
# and pass it the result backend.
broker = ListQueueBroker(
    url=REDIS_URL,
    result_backend=redis_result_backend,
)

# 3. Define a task using the broker's decorator.
@broker.task
async def my_simple_task(value: str) -> str:
    print(f"Executing task with value: {value}")
    await asyncio.sleep(1)  # Simulate async work.
    return f"Processed: {value.upper()}"

# 4. (Optional) Create a scheduler if you need scheduled tasks.
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[ListRedisScheduleSource(url=REDIS_URL)],
)

async def main() -> None:
    # Start up the broker (and the scheduler, if used).
    await broker.startup()
    if "TASKIQ_RUN_SCHEDULER" in os.environ:
        await scheduler.startup()

    # 5. Send a task to the broker.
    task = await my_simple_task.kiq("hello taskiq")
    print(f"Task sent with ID: {task.task_id}")

    # 6. Wait for the result.
    result = await task.wait_result(timeout=10)
    if result.is_err:
        print(f"Task failed: {result.error}")
    else:
        print(f"Task result: {result.return_value}")

    # Shut down the broker (and the scheduler).
    if "TASKIQ_RUN_SCHEDULER" in os.environ:
        await scheduler.shutdown()
    await broker.shutdown()

if __name__ == "__main__":
    # To run this example:
    # 1. Ensure a Redis server is running (e.g., via Docker:
    #    docker run --name some-redis -p 6379:6379 -d redis).
    # 2. Save this code as "my_app.py".
    # 3. In one terminal, start the worker: taskiq worker my_app:broker
    # 4. In another terminal, run the script to send tasks: python my_app.py
    # 5. To enable the scheduler, set TASKIQ_RUN_SCHEDULER=1 before running the script and worker:
    #    TASKIQ_RUN_SCHEDULER=1 python my_app.py
    #    TASKIQ_RUN_SCHEDULER=1 taskiq scheduler my_app:scheduler
    asyncio.run(main())