BullMQ for Python
BullMQ for Python is the official Python port of the popular Node.js message queue, designed for reliable background job processing backed by Redis. It leverages `asyncio` for efficient, concurrent task execution and is interoperable with its Node.js counterpart because both share the same Lua scripts. The library is under active development (currently at version 2.20.3), with frequent bug fixes and feature additions; new major versions may introduce breaking changes.
Warnings
- breaking BullMQ v2.0.0 introduced breaking changes. Specifically, Redis connection parameters must now be provided as part of the `options` dictionary for `Queue` and `Worker` constructors (e.g., `connection={'host': 'localhost', 'port': 6379}`). Additionally, worker markers now use a dedicated key in Redis instead of a special job ID, which impacts internal state management.
- gotcha Workers may encounter 'Missing lock for job X.moveToFinished' errors. This usually means a job lost its lock during processing. Common causes include high CPU usage on the worker preventing lock renewal, loss of communication with Redis, or the job being forcefully removed.
- gotcha The `process` function for a BullMQ `Worker` must accept two positional arguments, `job` and `job_token`, even if `job_token` is never used for manual job manipulation. Omitting `job_token` causes a `TypeError` when the worker invokes the processor.
- gotcha Redis-py (the underlying client for BullMQ Python) returns binary responses by default. If you are using a custom Redis client configuration and expect string responses, you must pass `decode_responses=True` to the Redis client constructor.
- gotcha Passing `None`, empty, or non-string/non-numeric values (e.g., dicts or lists) from environment variables or other dynamic inputs into BullMQ methods can lead to `ERR Error running script ... Lua redis() command arguments must be strings or integers` errors.
- gotcha For robust error handling and retries, your worker's processor function should always raise Python `Exception` objects (or subclasses thereof) when a job fails. BullMQ relies on catching these exceptions to mark jobs as failed and apply retry logic based on `attempts` and `backoff` options.
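The exception-based failure contract in the last warning can be sketched as follows. This is a minimal illustration, not the library's own example: `process_payment`, the `amount` field, and the job names are hypothetical, while the `attempts`/`backoff` options in the comment follow BullMQ's standard job options.

```python
import asyncio

# Hypothetical processor: raising any Exception subclass is how BullMQ
# learns a job failed; returning normally marks it completed.
async def process_payment(job, job_token):
    # 'amount' is an assumed field on the job payload, for illustration only.
    amount = job.data.get("amount")
    if amount is None or amount <= 0:
        # Raising triggers BullMQ's failure path; with 'attempts' and
        # 'backoff' set on the job, BullMQ will schedule retries.
        raise ValueError(f"Invalid amount in job {job.id}: {amount}")
    await asyncio.sleep(0)  # stand-in for real async work
    return {"charged": amount}

# When enqueueing, retry behavior is configured via job options, e.g.:
# await queue.add("payment", {"amount": 42},
#                 {"attempts": 3, "backoff": {"type": "exponential", "delay": 1000}})
```

Swallowing exceptions inside the processor (a bare `try/except` that returns normally) defeats the retry mechanism, since BullMQ then sees the job as successful.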
Install
-
pip install bullmq
Imports
- Queue
from bullmq import Queue
- Worker
from bullmq import Worker
- QueueEvents
from bullmq import QueueEvents
- FlowProducer
from bullmq import FlowProducer
Quickstart
import asyncio
import os
from bullmq import Queue, Worker

REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379')

async def add_job_to_queue():
    # Connect to Redis. Pass 'connection' as a dictionary in options.
    queue = Queue("myQueue", connection={"host": "localhost", "port": 6379})
    print(f"Adding job to queue 'myQueue' on {REDIS_URL}")
    job = await queue.add("myJobName", {"foo": "bar"})
    print(f"Job added with ID: {job.id}, Data: {job.data}")
    await queue.close()

async def process_job(job, job_token):
    print(f"Processing job {job.id} with data: {job.data}")
    # Simulate async work
    await asyncio.sleep(1)
    return {"status": "completed", "original_data": job.data}

async def start_worker():
    print(f"Starting worker for queue 'myQueue' on {REDIS_URL}")
    # Connect to Redis. Pass 'connection' as a dictionary in options.
    worker = Worker("myQueue", process_job, connection={"host": "localhost", "port": 6379})

    # You can listen for worker events (optional)
    worker.on("completed", lambda job, result: print(f"Job {job.id} completed with result: {result}"))
    worker.on("failed", lambda job, err: print(f"Job {job.id} failed with error: {err}"))

    print("Worker started. Press Ctrl+C to stop.")
    try:
        # Keep the worker alive for demonstration; in a real application
        # the worker would run for the lifetime of the service.
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass
    finally:
        print("Shutting down worker...")
        await worker.close()

async def main():
    # This example assumes a Redis server is running at localhost:6379,
    # e.g.: docker run -d -p 6379:6379 redis:latest
    # For real applications, use environment variables for connection details.
    # Producer and worker run concurrently here for demonstration; in a real
    # application they would typically run in separate processes/services.
    await asyncio.gather(add_job_to_queue(), start_worker())

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Application stopped by user.")
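The Quickstart reads `REDIS_URL` from the environment but still connects with a hardcoded host/port dict. A small helper can bridge the two. Note this is a hypothetical sketch: `connection_from_url` is not part of the bullmq API, and it assumes the `connection` option accepts a host/port/password dict as shown in the Quickstart.

```python
from urllib.parse import urlparse

def connection_from_url(redis_url: str) -> dict:
    # Parse a redis:// URL into the dict form used for the Queue/Worker
    # 'connection' option (host and port, plus password when present).
    parsed = urlparse(redis_url)
    connection = {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
    }
    if parsed.password:
        connection["password"] = parsed.password
    return connection

# Usage with the Quickstart's REDIS_URL, e.g.:
# queue = Queue("myQueue", connection=connection_from_url(REDIS_URL))
```

This keeps the connection details in one environment variable instead of duplicating host/port literals across producer and worker code.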