AMQP Low-Level Client
amqp is a low-level AMQP client library for Python, maintained by the Celery project. It is a fork of the `amqplib` library and provides the core functionality for talking to AMQP brokers such as RabbitMQ. The library implements AMQP 0-9-1 and is actively maintained, with regular releases (current version 5.3.1); its package metadata declares `requires_python` >=3.6. It is typically used where fine-grained control over AMQP messaging is needed.
Warnings
- breaking amqp is a fork of `amqplib` and introduces significant API differences, including changes in method signatures and the underlying AMQP protocol version (0-9-1 in `amqp` vs. 0-8 in `amqplib`). Projects migrating from `amqplib` will require code changes.
- breaking Python 3.6 and 3.7 support has been dropped in recent major versions. Ensure your environment meets the `requires_python` specification.
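- breaking Several core exception classes were renamed for Pythonic consistency (e.g., `AMQPException` became `AMQPError`, `AMQPConnectionException` became `ConnectionError`, `AMQPChannelException` became `ChannelError`). Note that `amqp.exceptions.ConnectionError` is a different class from Python's built-in `ConnectionError`, so import it explicitly before catching it.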
- gotcha Long-running connections require explicit heartbeat management. If neither `Connection.heartbeat_tick(rate=2)` nor `Connection.send_heartbeat()` is called periodically, the connection may be dropped silently or hang, especially when network intermediaries terminate idle connections.
- gotcha A regression in early 5.0.x versions could lead to `CHANNEL_ERROR/ChannelNotOpen` exceptions. This was fixed in a later patch release.
- gotcha When using RabbitMQ 4.x+, a custom `frame_max` set in client code below 8192 bytes (the new minimum) may cause connection issues. Either do not override `frame_max`, or set it to 131072 bytes (the server default).
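The heartbeat gotcha above can be handled with a small polling loop. The following is a minimal sketch, not the library's prescribed pattern: `pump` and `main` are hypothetical names, the one-second poll interval and the `heartbeat=10` value are assumptions, and it relies on `drain_events(timeout=...)` raising `socket.timeout` when no frame arrives in time.

```python
import socket


def pump(connection, poll_seconds=1.0, max_iterations=None):
    """Drain broker events while ticking heartbeats (hypothetical helper).

    Runs forever when max_iterations is None; returns the iteration count otherwise.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        # Sends a heartbeat frame if one is due (no-op when heartbeats are disabled).
        connection.heartbeat_tick(rate=2)
        try:
            # Wait at most poll_seconds for a frame so the loop keeps ticking.
            connection.drain_events(timeout=poll_seconds)
        except socket.timeout:
            pass  # Nothing arrived in this interval; loop around and tick again.
        iterations += 1
    return iterations


def main():
    # Requires a reachable broker; host and credentials are placeholder assumptions.
    from amqp import Connection

    with Connection("localhost:5672", userid="guest", password="guest",
                    heartbeat=10) as connection:
        pump(connection)
```

Calling `main()` against a live broker keeps the connection serviced until interrupted. Passing `max_iterations` makes the loop bounded, which is convenient in tests or short-lived scripts.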
Install
pip install amqp
Imports
- Connection
from amqp import Connection
- Channel
from amqp import Channel
- Message
from amqp import Message
- AMQPError
from amqp.exceptions import AMQPError
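To illustrate how the renamed exception classes might be told apart in practice, here is a minimal sketch. `classify_failure` and `main` are hypothetical helpers, not part of the library; the sketch assumes that `Connection.connection_errors` and `Connection.channel_errors` are class-level tuples of exception types, and the host, credentials, and queue name are placeholders.

```python
def classify_failure(exc, connection_errors, channel_errors):
    """Return a coarse label for an exception raised by an AMQP call."""
    if isinstance(exc, channel_errors):
        return "channel"     # The channel was closed; the connection may still be usable.
    if isinstance(exc, connection_errors):
        return "connection"  # The whole connection has to be re-established.
    return "other"


def main():
    # Requires a reachable broker; all connection details are placeholder assumptions.
    from amqp import Connection, Message
    from amqp.exceptions import AMQPError

    try:
        with Connection("localhost:5672", userid="guest", password="guest") as conn:
            channel = conn.channel()
            channel.basic_publish(Message("ping"), routing_key="my_test_queue")
    except (AMQPError, OSError) as exc:
        kind = classify_failure(
            exc, Connection.connection_errors, Connection.channel_errors
        )
        print(f"publish failed ({kind} error): {exc!r}")
```

Passing the error tuples in as arguments keeps the classifier independent of the library, so it can be exercised with stand-in exception types.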
Quickstart
import os
import time
from urllib.parse import unquote, urlsplit

from amqp import Connection, Message

# Replace with your AMQP broker URL (e.g., 'amqp://guest:guest@localhost:5672/')
AMQP_URL = os.environ.get('AMQP_BROKER_URL', 'amqp://guest:guest@localhost:5672/')
QUEUE_NAME = 'my_test_queue'


def connect(url):
    # Connection() takes the host and credentials as separate arguments, not a URL,
    # so split the URL first. An empty path is treated here as the default vhost '/'.
    parts = urlsplit(url)
    return Connection(
        host=f"{parts.hostname or 'localhost'}:{parts.port or 5672}",
        userid=unquote(parts.username or 'guest'),
        password=unquote(parts.password or 'guest'),
        virtual_host=unquote(parts.path[1:]) or '/',
    )


def publish_message(body, connection_url):
    with connect(connection_url) as connection:
        channel = connection.channel()
        # auto_delete defaults to True in this library; disable it so the queue persists.
        channel.queue_declare(queue=QUEUE_NAME, durable=True, auto_delete=False)
        message = Message(body=body.encode('utf-8'))
        channel.basic_publish(message, routing_key=QUEUE_NAME)
        print(f"[x] Sent '{body}'")


def consume_message(connection_url):
    with connect(connection_url) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True, auto_delete=False)

        def callback(message):
            body = message.body
            # The body may already be str if the publisher set a content_encoding.
            if isinstance(body, bytes):
                body = body.decode('utf-8')
            print(f"[x] Received '{body}'")
            channel.basic_ack(message.delivery_tag)

        channel.basic_consume(queue=QUEUE_NAME, callback=callback)
        print(f' [*] Waiting for messages on {QUEUE_NAME}. To exit press CTRL+C')
        try:
            while True:
                # Blocks until the broker sends a frame, then dispatches callbacks.
                connection.drain_events()
        except KeyboardInterrupt:
            print("Exiting consumer.")


if __name__ == "__main__":
    # Example usage: Publisher
    print("--- Publisher ---")
    publish_message("Hello, AMQP!", AMQP_URL)
    publish_message("Another message.", AMQP_URL)

    # Example usage: Consumer (normally run in a separate process or thread).
    # For demonstration, wait briefly and then consume in the same process.
    print("\n--- Consumer ---")
    time.sleep(2)  # Give the broker time to route the published messages
    consume_message(AMQP_URL)