Dramatiq
Dramatiq is a fast, robust, and performant Python 3 background task processing library. It allows you to defer functions to run in the background, typically using message brokers like Redis or RabbitMQ. Currently at version 2.1.0, it maintains an active development cycle with frequent minor releases and occasional major versions introducing breaking changes.
Warnings
- breaking In v2.0.0, the `ResultMiddleware` constructor now requires a `backend` argument. Previously, it would implicitly try to infer one or default.
- breaking In v2.0.0, the `StubBroker.join()` `fail_fast` parameter's default value changed from `False` to `True`. This means `join()` will now raise an exception immediately on task failure.
- gotcha Using Gevent with free-threaded Python (e.g., Python 3.13+) is not recommended by Dramatiq and can lead to unexpected behavior. Dramatiq will issue a warning if this combination is detected.
- gotcha Dramatiq relies on a global broker instance configured via `dramatiq.set_broker()`. In multi-application environments, tests, or concurrent contexts, careful management is needed to ensure the correct broker is active for each operation.
- gotcha Specific broker backends (e.g., Redis, RabbitMQ) and other features (e.g., Prometheus metrics) are installed via optional 'extras' (e.g., `pip install dramatiq[redis]`). Forgetting these will result in `ModuleNotFoundError` or similar import errors.
Install
-
pip install dramatiq -
pip install dramatiq[redis] -
pip install dramatiq[rabbitmq]
Imports
- actor
from dramatiq import actor
- set_broker
from dramatiq import set_broker
- Broker
from dramatiq import Broker
- RedisBroker
from dramatiq.brokers.redis import RedisBroker
- RabbitmqBroker
from dramatiq.brokers.rabbitmq import RabbitmqBroker
- StubBroker
from dramatiq.brokers.stub import StubBroker
- ResultMiddleware
from dramatiq.middleware import ResultMiddleware
Quickstart
import dramatiq
from dramatiq.brokers.stub import StubBroker
import time
import os
# Configure the stub broker for local testing and synchronous processing
# Note: StubBroker processes tasks directly without a separate worker process.
# For real applications, use RedisBroker, RabbitmqBroker, etc., with 'pip install dramatiq[broker]'.
broker = StubBroker()
dramatiq.set_broker(broker)
@dramatiq.actor
def my_task(name):
print(f"[Task] Starting task for {name}...")
time.sleep(0.01) # Simulate some work
print(f"[Task] Task for {name} completed.")
return f"Hello, {name}!"
# Send a message to the broker
print("Sending message...")
message = my_task.send("World")
print(f"Sent message with ID: {message.message_id}")
# With StubBroker, you can explicitly process pending messages
# In a real application, a 'dramatiq worker' process would handle this.
broker.join(drop_messages=True) # Process all pending messages
print("All stub broker messages processed.")
# If ResultMiddleware and a backend were configured, you could retrieve results:
# try:
# result = message.get_result(block=True, timeout=1)
# print(f"Task result: {result}")
# except Exception as e:
# print(f"Could not get result: {e}")