{"id":5513,"library":"taskiq-redis","title":"Taskiq Redis Integration","description":"Taskiq-redis is a plugin for the `taskiq` asynchronous distributed task queue, providing Redis-based brokers and result backends. It enables tasks to be processed and their results stored using various Redis primitives, including Lists, Pub/Sub channels, and Streams. The library is actively maintained, with a current version of 1.2.2, and sees frequent updates to address issues and introduce new features.","status":"active","version":"1.2.2","language":"en","source_language":"en","source_url":"https://github.com/taskiq-python/taskiq-redis","tags":["task queue","redis","async","broker","result backend","distributed tasks","scheduler"],"install":[{"cmd":"pip install taskiq taskiq-redis","lang":"bash","label":"Install core library and Redis integration"}],"dependencies":[{"reason":"Core task queue library; taskiq-redis is a plugin for it.","package":"taskiq","optional":false},{"reason":"Underlying Redis client library; taskiq-redis v1.2.0+ requires Redis v7+.","package":"redis>=7","optional":false}],"imports":[{"symbol":"ListQueueBroker","correct":"from taskiq_redis import ListQueueBroker"},{"symbol":"RedisAsyncResultBackend","correct":"from taskiq_redis import RedisAsyncResultBackend"},{"note":"The Stream broker is specifically named 'RedisStreamBroker'.","wrong":"from taskiq_redis import StreamBroker","symbol":"RedisStreamBroker","correct":"from taskiq_redis import RedisStreamBroker"},{"note":"RedisScheduleSource is deprecated and inefficient for high-volume use; ListRedisScheduleSource is the recommended replacement.","wrong":"from taskiq_redis import RedisScheduleSource","symbol":"ListRedisScheduleSource","correct":"from taskiq_redis import ListRedisScheduleSource"}],"quickstart":{"code":"import asyncio\nimport os\nfrom taskiq import TaskiqScheduler\nfrom taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, ListRedisScheduleSource\n\nREDIS_URL = os.environ.get('REDIS_URL', 
'redis://localhost:6379')\n\n# 1. Create a RedisAsyncResultBackend to store task results.\n# IMPORTANT: Always set result_ex_time or result_px_time to prevent unbounded Redis memory usage.\nredis_result_backend = RedisAsyncResultBackend(\n    redis_url=REDIS_URL,\n    result_ex_time=3600  # Results expire after 1 hour\n)\n\n# 2. Create a broker instance (e.g., ListQueueBroker for reliable single-consumer processing).\n# Pass the result backend to the broker.\nbroker = ListQueueBroker(\n    url=REDIS_URL,\n    result_backend=redis_result_backend\n)\n\n# 3. Define a task using the broker's decorator.\n@broker.task\nasync def my_simple_task(value: str) -> str:\n    print(f\"Executing task with value: {value}\")\n    await asyncio.sleep(1)  # Simulate async work\n    return f\"Processed: {value.upper()}\"\n\n# 4. (Optional) Create a scheduler if you need scheduled tasks.\nscheduler = TaskiqScheduler(\n    broker=broker,\n    sources=[ListRedisScheduleSource(url=REDIS_URL)]\n)\n\nasync def main():\n    # Start up the broker (and scheduler if used).\n    await broker.startup()\n    if 'TASKIQ_RUN_SCHEDULER' in os.environ:\n        await scheduler.startup()\n\n    # 5. Send a task to the broker.\n    task = await my_simple_task.kiq(\"hello taskiq\")\n    print(f\"Task sent with ID: {task.task_id}\")\n\n    # 6. Wait for the result.\n    result = await task.wait_result(timeout=10)\n    if result.is_err:\n        print(f\"Task failed: {result.error}\")\n    else:\n        print(f\"Task result: {result.return_value}\")\n\n    # Shut down the broker (and scheduler).\n    if 'TASKIQ_RUN_SCHEDULER' in os.environ:\n        await scheduler.shutdown()\n    await broker.shutdown()\n\nif __name__ == \"__main__\":\n    # To run this example:\n    # 1. Ensure a Redis server is running (e.g., via Docker: docker run --name some-redis -p 6379:6379 -d redis)\n    # 2. Save this code as 'my_app.py'.\n    # 3. In one terminal, start the worker: taskiq worker my_app:broker\n    # 4. 
In another terminal, run the script to send tasks: python my_app.py\n    # 5. To enable scheduler, set TASKIQ_RUN_SCHEDULER=1 before running the script and worker:\n    #    TASKIQ_RUN_SCHEDULER=1 python my_app.py\n    #    TASKIQ_RUN_SCHEDULER=1 taskiq scheduler my_app:scheduler\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates how to set up a `taskiq-redis` broker and result backend, define a task, send it, and retrieve its result. It highlights the importance of setting an expiration time for results to manage Redis memory usage. It also includes an optional scheduler setup for periodic tasks."},"warnings":[{"fix":"Always configure either `result_ex_time` (seconds) or `result_px_time` (milliseconds) when initializing `RedisAsyncResultBackend`. For example: `RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=3600)`.","message":"Failing to set `result_ex_time` or `result_px_time` in `RedisAsyncResultBackend` will cause task results to persist indefinitely in Redis, leading to unbounded memory growth and potential performance issues.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Migrate to `ListRedisScheduleSource` for scheduling. This source is designed for more efficient dynamic scheduling by storing schedules in lists, reducing the overhead of retrieving them.","message":"The `RedisScheduleSource` is inefficient for high-volume or dynamic schedules as it performs a full `SCAN` of Redis keys, leading to slow performance. 
It has been deprecated.","severity":"deprecated","affected_versions":"<= 1.0.7 (deprecated in 1.0.7, effectively removed in later versions, replaced by ListRedisScheduleSource)"},{"fix":"If you need messages to be processed exactly once by a single worker and require message durability, use `ListQueueBroker` (for a simple FIFO queue) or `RedisStreamBroker` (for more advanced stream processing with consumer groups and acknowledgements).","message":"Using `PubSubBroker` (instead of `ListQueueBroker` or `RedisStreamBroker`) delivers messages to *all* subscribed workers, rather than distributing them to a single worker. It also does not support acknowledgements, meaning messages can be lost if a worker fails during processing.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure your project's `redis` library dependency is updated to version 7 or higher. Check your `requirements.txt` or `pyproject.toml` and upgrade `redis` if necessary.","message":"Version 1.2.0 of `taskiq-redis` updated its internal `redis` dependency, requiring `redis-py` version 7 or newer. This might cause compatibility issues if your project uses an older version of `redis-py`.","severity":"breaking","affected_versions":">= 1.2.0"},{"fix":"While `taskiq-redis` does not directly expose a way to inject an existing connection pool into `RedisStreamBroker`'s constructor, you might consider manually interacting with Redis Streams (`XADD`) using your existing client if strict resource sharing is critical, or accept the broker's dedicated connection pool.","message":"When using `RedisStreamBroker`, instantiating it creates a new Redis connection pool. 
This can lead to inefficient resource usage if your application already has an existing `redis.asyncio.Redis` client and you wish to reuse its connection pool.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Upgrade to `taskiq-redis` version 1.2.2 or newer, which includes a fix that adds a timeout to the `RedisStreamBroker`'s `xautoclaim` lock.","message":"Older versions of `RedisStreamBroker` (prior to 1.2.2) could experience infinite locking issues with the `xautoclaim` lock, potentially preventing tasks from being processed.","severity":"gotcha","affected_versions":"< 1.2.2"},{"fix":"Careful consideration of Redis Cluster key distribution is needed. For scheduled tasks, manual hash tag assignment might be required, though this could lead to hot-spotting. For `RedisStreamBroker` specifically, ensure consumer groups are properly initialized across the cluster or consider alternative brokers for clustered environments if issues persist.","message":"Using `taskiq-redis` schedule sources (e.g., `RedisScheduleSource`, potentially `ListRedisScheduleSource` in certain configurations) with a Redis Cluster can lead to `NOGROUP` or `mget` errors due to tasks not being stored in the same key slot across cluster nodes.","severity":"gotcha","affected_versions":"All versions, when used with Redis Cluster"}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}