{"id":3426,"library":"bullmq","title":"BullMQ for Python","description":"BullMQ for Python is the official Python port of the popular Node.js message queue, designed for reliable background job processing backed by Redis. It leverages `asyncio` for efficient, concurrent task execution and is interoperable with its Node.js counterpart because both share the same Lua scripts. Currently at version 2.20.3, the library is under active development, with frequent bug fixes, feature enhancements, and occasional major releases that may introduce breaking changes.","status":"active","version":"2.20.3","language":"en","source_language":"en","source_url":"https://github.com/taskforcesh/bullmq/tree/master/python","tags":["message queue","background jobs","redis","asyncio","distributed tasks","queue"],"install":[{"cmd":"pip install bullmq","lang":"bash","label":"Install BullMQ"}],"dependencies":[{"reason":"Python Redis client for communication with the Redis server.","package":"redis","optional":false},{"reason":"MessagePack serialization for efficient data handling.","package":"msgpack","optional":false},{"reason":"Semantic versioning utilities.","package":"semver","optional":false},{"reason":"External dependency: a running Redis 5.0+ server (6.2+ recommended) is required for BullMQ to function.","package":"Redis","optional":false}],"imports":[{"symbol":"Queue","correct":"from bullmq import Queue"},{"symbol":"Worker","correct":"from bullmq import Worker"},{"symbol":"QueueEvents","correct":"from bullmq import QueueEvents"},{"symbol":"FlowProducer","correct":"from bullmq import FlowProducer"}],"quickstart":{"code":"import asyncio\nimport os\nfrom bullmq import Queue, Worker\n\n# Use a connection URL from the environment, falling back to a local Redis.\nREDIS_URL = os.environ.get(\"REDIS_URL\", \"redis://localhost:6379\")\n\nasync def add_job_to_queue():\n    # Pass connection details under the 'connection' key of the options dict.\n    queue = Queue(\"myQueue\", {\"connection\": REDIS_URL})\n    print(f\"Adding job to queue 'myQueue' on {REDIS_URL}\")\n    job = await queue.add(\"myJobName\", {\"foo\": \"bar\"})\n    print(f\"Job added with ID: {job.id}, Data: {job.data}\")\n    await queue.close()\n\nasync def process_job(job, job_token):\n    # The processor must accept both 'job' and 'job_token' positional arguments.\n    print(f\"Processing job {job.id} with data: {job.data}\")\n    # Simulate async work\n    await asyncio.sleep(1)\n    return {\"status\": \"completed\", \"original_data\": job.data}\n\nasync def start_worker():\n    print(f\"Starting worker for queue 'myQueue' on {REDIS_URL}\")\n    # Pass connection details under the 'connection' key of the options dict.\n    worker = Worker(\"myQueue\", process_job, {\"connection\": REDIS_URL})\n\n    # Optionally listen for worker events\n    worker.on(\"completed\", lambda job, result: print(f\"Job {job.id} completed with result: {result}\"))\n    worker.on(\"failed\", lambda job, err: print(f\"Job {job.id} failed with error: {err}\"))\n\n    print(\"Worker started. Press Ctrl+C to stop.\")\n    try:\n        # Keep the worker alive for demonstration purposes.\n        while True:\n            await asyncio.sleep(3600)\n    except asyncio.CancelledError:\n        pass\n    finally:\n        print(\"Shutting down worker...\")\n        await worker.close()\n\nasync def main():\n    # This example assumes a Redis server is reachable at REDIS_URL, e.g.:\n    # docker run -d -p 6379:6379 redis:latest\n    # In production, the producer and worker would run in separate processes or services.\n    await asyncio.gather(add_job_to_queue(), start_worker())\n\nif __name__ == \"__main__\":\n    try:\n        asyncio.run(main())\n    except KeyboardInterrupt:\n        print(\"Application stopped by user.\")\n","lang":"python","description":"This quickstart demonstrates how to add jobs to a BullMQ queue and process them with a worker, using `asyncio` for asynchronous operations. Ensure a Redis server is running (e.g., via `docker run -d -p 6379:6379 redis:latest`) and configure the `REDIS_URL` environment variable or provide explicit connection details. In production, the queue producer and worker would typically run in separate processes or services."},"warnings":[{"fix":"Update `Queue` and `Worker` instantiation to pass connection details within the `connection` key of the options dictionary. Review application logic if it directly interacts with internal Redis keys related to worker markers.","message":"BullMQ v2.0.0 introduced breaking changes. Specifically, Redis connection parameters must now be provided under the `connection` key of the options dictionary passed to the `Queue` and `Worker` constructors (e.g., `{'connection': {'host': 'localhost', 'port': 6379}}`). Additionally, worker markers now use a dedicated Redis key instead of a special job ID, which affects internal state management.","severity":"breaking","affected_versions":">=2.0.0"},{"fix":"Reduce worker concurrency, optimize job processing code, ensure stable network connectivity to Redis, consider increasing `lockDuration` in the worker options, and check job state before attempting to remove jobs. Ensure the Redis `maxmemory-policy` is set to `noeviction`.","message":"Workers may encounter 'Missing lock for job X.moveToFinished' errors. This usually means a job lost its lock during processing. Common causes include high CPU usage on the worker preventing lock renewal, loss of connectivity to Redis, or the job being forcibly removed.","severity":"gotcha","affected_versions":"All"},{"fix":"Always define your worker's `process` function with `async def process(job, job_token): ...`.","message":"The `process` function for a BullMQ `Worker` must accept two positional arguments, `job` and `job_token`, even if `job_token` is not used for manual job manipulation. Omitting `job_token` causes a `TypeError`.","severity":"gotcha","affected_versions":"All (especially in v2.x documentation examples)"},{"fix":"When creating a custom Redis connection object for BullMQ, set `decode_responses=True` if you need decoded string responses.","message":"redis-py (the underlying client for BullMQ Python) returns binary responses by default. If you use a custom Redis client configuration and expect string responses, pass `decode_responses=True` to the Redis client constructor.","severity":"gotcha","affected_versions":"All"},{"fix":"Always validate and sanitize input, especially connection parameters, queue names, and job data. Ensure all values passed to BullMQ or its underlying Redis commands are strings or integers. Use `os.environ.get('KEY', '')` or raise an error if a critical environment variable is missing.","message":"Passing undefined, empty, or non-string values (e.g., objects or arrays) from environment variables or other dynamic inputs to BullMQ methods can lead to `ERR Error running script ... Lua redis() command arguments must be strings or integers` errors.","severity":"gotcha","affected_versions":"All"},{"fix":"Ensure that any failure condition within your `process` function explicitly raises an `Exception` (e.g., `raise ValueError('Invalid data')`). Configure the `attempts` and `backoff` options when adding jobs to the queue to enable automatic retries.","message":"For robust error handling and retries, your worker's processor should raise a Python `Exception` (or a subclass) when a job fails. BullMQ relies on catching these exceptions to mark jobs as failed and to apply retry logic based on the `attempts` and `backoff` options.","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}