AMQP Low-Level Client
5.3.1 | verified Tue May 12 | auth: no | python | install: verified | quickstart: stale
amqp is a low-level AMQP client library for Python, maintained by the Celery project. It is a modern fork of the `amqplib` library, providing essential functionality for interacting with AMQP brokers like RabbitMQ. The library supports AMQP 0-9-1 and is actively maintained, with regular releases (current version 5.3.1) and support for recent Python versions (>=3.6). It's often used in scenarios requiring fine-grained control over AMQP messaging.
pip install amqp
Common errors
error ModuleNotFoundError: No module named 'amqp'
cause The `amqp` package is not installed in the active Python environment.
fix
Install it with pip: `pip install amqp`.
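error ImportError: cannot import name 'Connection' from 'kombu'
cause `kombu` exposes `Connection` at the package top level, so this error usually points to a broken or partially initialized import: a local file or directory named `kombu` shadowing the installed package, a circular import, or a corrupted installation.
fix
Check `kombu.__file__` to confirm that the installed package is the one being imported, rename any shadowing local module, and reinstall `kombu` if needed. `from kombu.connection import Connection` imports the same class from its defining module.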
error ModuleNotFoundError: No module named 'celery.backends.amqp'
cause The `amqp` result backend was removed from Celery in version 5.0.
fix
Use a different result backend such as `rpc://`, or install a third-party package that provides the `amqp` backend.
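As an illustration of the `rpc://` option, a Celery configuration module might look like the sketch below. The file name `celeryconfig.py` and the broker URL are assumptions for a local RabbitMQ with default credentials; `broker_url` and `result_backend` are Celery's lowercase setting names.

```python
# celeryconfig.py (hypothetical module name)

# Assumed local RabbitMQ broker with the default guest account.
broker_url = "amqp://guest:guest@localhost:5672//"

# Replaces the removed amqp result backend: results are sent back to the
# caller as AMQP messages instead of being stored.
result_backend = "rpc://"
```

An application could load it with `app.config_from_object("celeryconfig")`.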
error ModuleNotFoundError: No module named 'basic_message'
cause `amqplib` was written for Python 2 and uses implicit relative imports (it imports its own `basic_message` module by bare name), which Python 3 does not support.
fix
Switch to the maintained fork, `py-amqp` (installed as `amqp`), or patch the `amqplib` source to use explicit imports.
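error ModuleNotFoundError: No module named 'kombu.five'
cause The `kombu.five` compatibility module was removed in kombu 5.0, so code that still imports it (for example, an older release of a dependent package) fails against a newer `kombu`.
fix
Upgrade the package that imports `kombu.five` to a release compatible with the installed `kombu`, or pin `kombu` to a 4.x release until that upgrade is possible. Keep all related dependencies on mutually compatible versions.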
Warnings
breaking amqp is a fork of `amqplib` and introduces significant API differences, including changes in method signatures and the underlying AMQP protocol version (0-9-1 in `amqp` vs. 0-8 in `amqplib`). Projects migrating from `amqplib` will require code changes.
fix Review the official `py-amqp` documentation for updated API usage and object structures (e.g., `Connection`, `Channel`, exception types). Pay attention to arguments removed or deprecated (e.g., `Channel.access_request`, `ticket`, `insist`).
breaking Python 3.6 and 3.7 support has been dropped in recent major versions. Ensure your environment meets the `requires_python` specification.
fix Upgrade your Python environment to 3.8 or higher. Refer to the `py-amqp` changelog for specific Python version compatibility.
breaking Several core exception classes were renamed for Pythonic consistency (e.g., `AMQPException` became `AMQPError`, `AMQPConnectionException` became `ConnectionError`, `AMQPChannelException` became `ChannelError`).
fix Update exception handling blocks to catch the new exception names, typically found under `amqp.exceptions`.
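To locate handlers that still use the old names, a small scan such as the following sketch can help. The helper name `find_old_exception_names` is hypothetical, and the mapping contains only the three renames listed above. Note that `amqp.exceptions.ConnectionError` is a different class from Python's built-in `ConnectionError`, so importing it explicitly avoids catching the wrong one.

```python
import re

# Old exception names and their replacements, as listed in the warning above.
RENAMED = {
    "AMQPException": "AMQPError",
    "AMQPConnectionException": "ConnectionError",
    "AMQPChannelException": "ChannelError",
}

# Match any old name as a whole word.
_PATTERN = re.compile(r"\b(" + "|".join(map(re.escape, RENAMED)) + r")\b")


def find_old_exception_names(source):
    """Return (line_number, old_name, new_name) for each old name in source."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for match in _PATTERN.finditer(line):
            old = match.group(1)
            hits.append((lineno, old, RENAMED[old]))
    return hits
```

Running it over each module's source text gives a list of lines to review before upgrading.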
gotcha Long-running connections require explicit heartbeat management. The library does not send heartbeats in the background: unless `Connection.heartbeat_tick(rate=2)` is called periodically (or `Connection.send_heartbeat()` is called manually), the connection may be dropped silently or hang, especially when network intermediaries terminate idle connections.
fix Call `connection.heartbeat_tick()` at regular intervals, roughly once per second, so that heartbeat frames are sent when due and missed broker heartbeats are detected. Keep calling `connection.drain_events()` in the main loop as well, ideally with a timeout, so that incoming frames are processed.
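A minimal sketch of such a loop is shown below. The function name `pump` and its `should_stop` callback are hypothetical; `connection` is assumed to be an `amqp.Connection` opened with a non-zero `heartbeat` value, and the sketch relies on `drain_events(timeout=...)` raising `socket.timeout` when nothing arrives in time.

```python
import socket


def pump(connection, should_stop, poll_interval=1.0):
    """Drive a connection until should_stop() returns True.

    `connection` is expected to behave like amqp.Connection created with
    a non-zero heartbeat, e.g. Connection(host=..., heartbeat=10).
    """
    while not should_stop():
        # Sends a heartbeat frame if one is due and raises if the broker
        # has stopped sending its own heartbeats.
        connection.heartbeat_tick()
        try:
            # Wait at most poll_interval seconds for an incoming frame.
            connection.drain_events(timeout=poll_interval)
        except socket.timeout:
            # Nothing arrived in this interval; loop so heartbeats continue.
            continue
```

Keeping `poll_interval` at about one second means `heartbeat_tick()` runs often enough for typical heartbeat settings.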
gotcha A regression in early 5.0.x versions could lead to `CHANNEL_ERROR/ChannelNotOpen` exceptions. This was fixed in a later patch release.
fix Upgrade to `amqp` version 5.0.9 or later to receive the fix for this channel error regression.
gotcha With RabbitMQ 4.x+, a custom `frame_max` set in client code below 8192 bytes (the new initial frame size, which acts as a lower bound) may cause connection issues. The recommended approach is either not to override `frame_max` or to set it to 131072 bytes (the default server value).
fix Remove explicit `frame_max` settings from client code, or set the value to at least 8192, preferably 131072, to align with RabbitMQ 4.x defaults and recommendations.
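The rule above can be expressed as a small guard, sketched here under the assumption that the result is passed as the `frame_max` keyword argument of `amqp.Connection`. The helper name `safe_frame_max` and the constant names are hypothetical; the two numbers are the ones quoted in the warning.

```python
MIN_FRAME_MAX = 8192            # lower bound quoted above for RabbitMQ 4.x
RECOMMENDED_FRAME_MAX = 131072  # default server value quoted above


def safe_frame_max(requested=None):
    """Return a frame_max to pass to the client, or None to not override it."""
    if requested is None:
        # No override: let client and server negotiate the value.
        return None
    if requested < MIN_FRAME_MAX:
        # Too small for RabbitMQ 4.x; fall back to the recommended value.
        return RECOMMENDED_FRAME_MAX
    return requested
```

For example, `Connection(host="localhost:5672", frame_max=safe_frame_max(4096))` would connect with 131072 instead of 4096.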
Install compatibility verified last tested: 2026-05-12
python  os / libc      status  wheel  install  import  disk
3.10    alpine (musl)          wheel  -        0.10s   18.2M
3.10    alpine (musl)          -      -        0.09s   18.2M
3.10    slim (glibc)           wheel  1.6s     0.06s   19M
3.10    slim (glibc)           -      -        0.06s   19M
3.11    alpine (musl)          wheel  -        0.17s   20.1M
3.11    alpine (musl)          -      -        0.18s   20.1M
3.11    slim (glibc)           wheel  1.6s     0.14s   21M
3.11    slim (glibc)           -      -        0.13s   21M
3.12    alpine (musl)          wheel  -        0.12s   12.0M
3.12    alpine (musl)          -      -        0.13s   12.0M
3.12    slim (glibc)           wheel  1.4s     0.13s   13M
3.12    slim (glibc)           -      -        0.12s   13M
3.13    alpine (musl)          wheel  -        0.12s   11.7M
3.13    alpine (musl)          -      -        0.12s   11.6M
3.13    slim (glibc)           wheel  1.6s     0.13s   12M
3.13    slim (glibc)           -      -        0.13s   12M
3.9     alpine (musl)          wheel  -        0.10s   17.7M
3.9     alpine (musl)          -      -        0.09s   17.7M
3.9     slim (glibc)           wheel  1.8s     0.07s   18M
3.9     slim (glibc)           -      -        0.07s   18M
Imports
- Connection
from amqp import Connection
- Channel
from amqp import Channel
- Message
wrong: from amqp import BasicMessage
correct: from amqp import Message
- AMQPError
wrong: from amqp import AMQPException
correct: from amqp.exceptions import AMQPError
Quickstart stale last tested: 2026-04-24
import os
import time
from urllib.parse import unquote, urlsplit

from amqp import Connection, Message

# Replace with your AMQP broker URL (e.g., 'amqp://guest:guest@localhost:5672/')
AMQP_URL = os.environ.get('AMQP_BROKER_URL', 'amqp://guest:guest@localhost:5672/')
QUEUE_NAME = 'my_test_queue'


def connection_kwargs(url):
    # amqp.Connection takes host, userid, password and virtual_host keyword
    # arguments rather than a URL, so split the URL into those parts.
    parts = urlsplit(url)
    return {
        'host': f"{parts.hostname or 'localhost'}:{parts.port or 5672}",
        'userid': unquote(parts.username or 'guest'),
        'password': unquote(parts.password or 'guest'),
        'virtual_host': unquote(parts.path[1:]) or '/',
    }


def publish_message(body, connection_url):
    # Entering the context manager opens the connection; leaving it closes it.
    with Connection(**connection_kwargs(connection_url)) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        message = Message(body=body.encode('utf-8'))
        # The default exchange ('') routes by queue name.
        channel.basic_publish(message, routing_key=QUEUE_NAME)
        print(f"[x] Sent '{body}'")


def consume_message(connection_url):
    with Connection(**connection_kwargs(connection_url)) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)

        def callback(message):
            body = message.body
            text = body.decode('utf-8') if isinstance(body, bytes) else body
            print(f"[x] Received '{text}'")
            channel.basic_ack(message.delivery_tag)

        channel.basic_consume(queue=QUEUE_NAME, callback=callback)
        print(f' [*] Waiting for messages on {QUEUE_NAME}. To exit press CTRL+C')
        try:
            while True:
                connection.drain_events()  # Blocks until an event arrives, then dispatches it
        except KeyboardInterrupt:
            print("Exiting consumer.")


if __name__ == "__main__":
    # Example usage: Publisher
    print("--- Publisher ---")
    publish_message("Hello, AMQP!", AMQP_URL)
    publish_message("Another message.", AMQP_URL)

    # Example usage: Consumer (normally run in a separate process)
    # For demonstration, pause briefly after publishing and then consume
    print("\n--- Consumer ---")
    time.sleep(2)  # Give the broker a moment before consuming
    consume_message(AMQP_URL)