AMQP Low-Level Client

5.3.1 · active · verified Sun Mar 29

amqp is a low-level AMQP client library for Python, maintained by the Celery project. It is a fork of the `amqplib` library and provides the core functionality for talking to AMQP brokers such as RabbitMQ. The library implements AMQP 0-9-1, is actively maintained (current version 5.3.1), and supports Python 3.6 and later. It suits cases that need fine-grained control over AMQP messaging.

Warnings

Install

pip install amqp

Imports

from amqp import Connection, Message

Quickstart

This quickstart shows how to establish a connection, declare a queue, publish a message, and consume it with an explicit acknowledgment. It uses a basic publisher-consumer pattern built around `Connection.drain_events()`, which blocks until the broker sends a frame and then dispatches any delivery to the registered callback. `drain_events()` does not send heartbeats on its own: if a heartbeat interval is configured, `Connection.heartbeat_tick()` must also be called periodically. The broker URL is read from the `AMQP_BROKER_URL` environment variable and defaults to `amqp://guest:guest@localhost:5672/`.

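import os
import time
from urllib.parse import unquote, urlparse

from amqp import Connection, Message

# Replace with your AMQP broker URL (e.g., 'amqp://guest:guest@localhost:5672/')
AMQP_URL = os.environ.get('AMQP_BROKER_URL', 'amqp://guest:guest@localhost:5672/')
QUEUE_NAME = 'my_test_queue'

def open_connection(connection_url):
    # Connection expects 'host:port' plus separate credentials, not a URL, so split the URL first.
    parts = urlparse(connection_url)
    return Connection(
        host=f"{parts.hostname or 'localhost'}:{parts.port or 5672}",
        userid=unquote(parts.username or 'guest'),
        password=unquote(parts.password or 'guest'),
        virtual_host=unquote(parts.path[1:]) or '/',
    )

def publish_message(body, connection_url):
    # The with block opens the connection on entry and closes it on exit.
    with open_connection(connection_url) as connection:
        channel = connection.channel()
        # queue_declare defaults to auto_delete=True; turn it off so the queue outlives its consumers.
        channel.queue_declare(queue=QUEUE_NAME, durable=True, auto_delete=False)
        # The message class is exported as amqp.Message (there is no amqp.BasicMessage).
        message = Message(body=body.encode('utf-8'))
        channel.basic_publish(message, routing_key=QUEUE_NAME)
        print(f"[x] Sent '{body}'")

def consume_message(connection_url):
    with open_connection(connection_url) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True, auto_delete=False)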

        def callback(message):
            print(f"[x] Received '{message.body.decode()}'")
            channel.basic_ack(message.delivery_tag)

        channel.basic_consume(queue=QUEUE_NAME, callback=callback)
        print(f' [*] Waiting for messages on {QUEUE_NAME}. To exit press CTRL+C')
        try:
            while True:
                connection.drain_events()  # Blocks until a frame arrives, then runs the callback
        except KeyboardInterrupt:
            print("Exiting consumer.")

if __name__ == "__main__":
    # Example usage: Publisher
    print("--- Publisher ---")
    publish_message("Hello, AMQP!", AMQP_URL)
    publish_message("Another message.", AMQP_URL)

    # Example usage: Consumer (run in a separate process/thread or after publisher finishes)
    # For demonstration, we'll wait a bit and then consume
    print("\n--- Consumer ---")
    time.sleep(2) # Give publisher time to send messages
    consume_message(AMQP_URL)
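
The consumer loop in the quickstart blocks in `drain_events()` until a frame arrives, so it never gets a chance to send heartbeats. As a minimal sketch, assuming the connection was opened with a non-zero `heartbeat` argument, the loop below polls with a timeout and calls `heartbeat_tick()` between polls. The helper name `drain_with_heartbeat` and its `should_stop` and `poll_interval` parameters are illustrative and not part of the library; `connection` is any object exposing `drain_events(timeout=...)` and `heartbeat_tick()`, such as an open `amqp.Connection`.

```python
import socket


def drain_with_heartbeat(connection, should_stop, poll_interval=1.0):
    """Process broker events until should_stop() returns True.

    Hypothetical helper: `connection` is assumed to behave like an open
    amqp.Connection created with a non-zero `heartbeat` value.
    Returns the number of polls that timed out without an incoming frame.
    """
    idle_polls = 0
    while not should_stop():
        try:
            # Wait up to poll_interval seconds for one incoming frame.
            connection.drain_events(timeout=poll_interval)
        except socket.timeout:
            # Nothing arrived in time; expected on a quiet queue.
            idle_polls += 1
        # Send a heartbeat if one is due; this raises if the broker
        # has been silent for too long.
        connection.heartbeat_tick()
    return idle_polls
```

In the quickstart this could stand in for the `while True` loop, for example `drain_with_heartbeat(connection, lambda: False)` inside the existing `try`/`except KeyboardInterrupt` block, with the connection created using something like `heartbeat=10`.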
