BullMQ for Python

2.20.3 · active · verified Sat Apr 11

BullMQ for Python is the official Python port of the popular Node.js message queue library, designed for reliable background job processing backed by Redis. It leverages `asyncio` for efficient, concurrent task execution and is interoperable with its Node.js counterpart because both share the same Redis Lua scripts. Currently at version 2.20.3, the library is under active development with frequent bug fixes and feature enhancements; new major versions may introduce breaking changes.

Install

pip install bullmq

Imports

from bullmq import Queue, Worker

Quickstart

This quickstart demonstrates how to add jobs to a BullMQ queue and process them with a worker. It uses `asyncio` for asynchronous operations. Ensure a Redis server is running (e.g., via `docker run -d -p 6379:6379 redis:latest`) and configure the `REDIS_URL` environment variable or provide explicit connection details. In a production environment, the queue producer and worker would typically run in separate processes or services.

import asyncio
import os
from bullmq import Queue, Worker

REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379')

async def add_job_to_queue():
    # Connect to Redis; connection details are passed via the 'connection' option.
    # This example hardcodes localhost; in production, derive them from REDIS_URL.
    queue = Queue("myQueue", connection={"host": "localhost", "port": 6379})
    print(f"Adding job to queue 'myQueue' on {REDIS_URL}")
    job = await queue.add("myJobName", {"foo": "bar"})
    print(f"Job added with ID: {job.id}, Data: {job.data}")
    await queue.close()

async def process_job(job, job_token):
    print(f"Processing job {job.id} with data: {job.data}")
    # Simulate async work
    await asyncio.sleep(1)
    return {"status": "completed", "original_data": job.data}

async def start_worker():
    print(f"Starting worker for queue 'myQueue' on {REDIS_URL}")
    # Connect to Redis; connection details are passed via the 'connection' option.
    worker = Worker("myQueue", process_job, connection={"host": "localhost", "port": 6379})

    # You can listen for worker events (optional)
    worker.on("completed", lambda job, result: print(f"Job {job.id} completed with result: {result}"))
    worker.on("failed", lambda job, err: print(f"Job {job.id} failed with error: {err}"))

    print("Worker started. Press Ctrl+C to stop.")
    # Keep the worker running (e.g., for a long time in a real application)
    try:
        while True: # Keep worker alive for demonstration
            await asyncio.sleep(3600) # Sleep for a long time
    except asyncio.CancelledError:
        pass
    finally:
        print("Shutting down worker...")
        await worker.close()

async def main():
    # This example assumes a Redis server is running at localhost:6379
    # For real applications, use environment variables for connection details.
    # docker run -d -p 6379:6379 redis:latest

    # Run adding a job and starting a worker concurrently
    # For a real application, these would typically run in separate processes/services.
    await asyncio.gather(add_job_to_queue(), start_worker())

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Application stopped by user.")
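The quickstart keeps the worker alive with a long `asyncio.sleep` loop, which is fine for a demo. A common alternative is to wait on an `asyncio.Event` that a signal handler (or another task) sets when shutdown is requested, so the worker can be closed deterministically. The sketch below uses only the standard library and a stand-in `fake_worker` coroutine (a hypothetical placeholder, not the BullMQ `Worker`) to stay self-contained; in a real application you would construct the BullMQ `Worker` in its place, set the event from a SIGTERM/SIGINT handler, and call `await worker.close()` once the event fires.

```python
import asyncio

async def run_until_stopped():
    # Shutdown flag; in a real app a signal handler would call stop.set().
    stop = asyncio.Event()

    async def fake_worker():
        # Placeholder for job processing: count loop iterations until
        # shutdown is requested. A BullMQ Worker would run here instead.
        processed = 0
        while not stop.is_set():
            await asyncio.sleep(0.01)
            processed += 1
        return processed

    worker_task = asyncio.create_task(fake_worker())
    # Simulate an external shutdown request (e.g. SIGTERM) shortly after start.
    asyncio.get_running_loop().call_later(0.05, stop.set)
    await stop.wait()
    # Wait for the worker to finish its current iteration and clean up.
    return await worker_task

result = asyncio.run(run_until_stopped())
print(f"Worker processed {result} iterations before shutdown.")
```

Waiting on an event instead of sleeping in a loop means shutdown begins as soon as it is requested, rather than after the current sleep interval expires.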
