Kombu
Kombu is a messaging library for Python that provides a high-level interface to the AMQP protocol and supports several message brokers, including RabbitMQ, Redis, and Amazon SQS. It abstracts away the details of message passing so that developers can focus on application logic. The current version is 5.6.2. Kombu is actively developed, and its releases often align with those of its parent project, Celery.
Warnings
- breaking `datetime.datetime.utcnow()` is deprecated as of Python 3.12, and Kombu v5.6.0 replaced it with the timezone-aware `datetime.datetime.now(datetime.UTC)`. Code that still calls `utcnow()` will see deprecation warnings, and mixing its naive results with timezone-aware values may raise errors or behave unexpectedly.
- breaking Kombu v5.5.4 introduced a change where `redis.connection.ConnectionPool.get_connection` no longer accepts arguments. This can break applications using older `redis-py` versions or custom connection pool logic that passed arguments to this method.
- gotcha Kombu v5.6.0 introduced the `max_prefetch` parameter for `kombu.common.QoS` to prevent out-of-memory (OOM) crashes when queues are flooded with ETA/countdown tasks. It defaults to `None` (unlimited), so the protection only applies once a limit is set explicitly; with the default, OOM errors remain possible under heavy load.
- gotcha A `credential_provider` compatibility issue with `redis-py < 5.3.0` was fixed in Kombu v5.6.2. Users relying on `credential_provider` with older `redis-py` versions might have experienced issues.
- breaking As of Kombu v5.6.0, MongoDB transport URI options are normalized to lowercase and flattened (e.g., `replicaSet=test_rs` becomes `options['replicaset']`). This changes how options are accessed and might break existing configurations relying on case-sensitive or nested structures.
- breaking Kombu v5.0.0 and subsequent v5.x releases require the `amqp` library version to be `>5.0`. Older versions of Kombu v5.0.x were yanked from PyPI due to not enforcing this dependency correctly, leading to potential dependency conflicts and runtime errors.
- gotcha When working with Kombu `Connection` objects, especially when using connection pools, it's best practice to use `connection.release()` instead of `connection.close()`. `release()` returns the connection to the pool, while `close()` forcefully closes it, potentially disrupting other parts of your application that expect the pool to manage connections.
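The `utcnow()` warning above can be illustrated with the standard library alone. This is a minimal sketch, not Kombu code: it shows the timezone-aware replacement and why naive and aware values do not mix. The variable names are illustrative, and `datetime.timezone.utc` is used instead of the `datetime.UTC` alias because the alias only exists on Python 3.11 and later.

```python
import datetime

# Timezone-aware "now". datetime.timezone.utc works on every Python 3 version;
# datetime.UTC is an alias for it that requires Python 3.11+.
aware_now = datetime.datetime.now(datetime.timezone.utc)

# A naive value like the ones utcnow() returns. It is built by hand here so
# the example does not trigger a DeprecationWarning on Python 3.12+.
naive = datetime.datetime(2024, 1, 1, 12, 0)
aware = naive.replace(tzinfo=datetime.timezone.utc)

# Ordering comparisons between naive and aware datetimes raise TypeError.
try:
    naive < aware
    mixed_comparison_ok = True
except TypeError:
    mixed_comparison_ok = False

print(aware_now.tzinfo, mixed_comparison_ok)
```

If existing code stores naive UTC timestamps, attaching `tzinfo` with `replace()` as shown is one way to make them comparable with aware values.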
Install
- pip install kombu
- pip install 'kombu[redis,librabbitmq,sqs,yaml]'
Imports
- Connection
from kombu import Connection
- Producer
from kombu import Producer
- Consumer
from kombu import Consumer
- Exchange
from kombu import Exchange
- Queue
from kombu import Queue
- QoS
from kombu.common import QoS
- eventloop
from kombu.common import eventloop
Quickstart
import datetime
from kombu import Connection

BROKER_URL = 'redis://localhost:6379/0'  # Use os.environ.get('BROKER_URL', '...') in production

# --- Publisher ---
with Connection(BROKER_URL) as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    message_payload = f'helloworld, sent at {datetime.datetime.now()}'
    simple_queue.put(message_payload)
    print(f'Sent: {message_payload}')
    simple_queue.close()

# --- Consumer ---
with Connection(BROKER_URL) as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    try:
        # Wait up to 5 seconds for a message.
        message = simple_queue.get(block=True, timeout=5)
        print(f'Received: {message.payload}')
        message.ack()
    except simple_queue.Empty:  # get() raises the queue's Empty exception on timeout
        print('No messages received within timeout.')
    finally:
        simple_queue.close()