aiormq: Asynchronous AMQP Client

6.9.4 · active · verified Thu Apr 09

aiormq is a pure Python asynchronous client library for the AMQP 0.9.1 protocol, commonly used for interacting with RabbitMQ. It provides a non-blocking interface leveraging `asyncio`. The current version is 6.9.4, and it maintains a steady release cadence with frequent patch and minor updates addressing bug fixes and performance improvements.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to connect to an AMQP broker (such as RabbitMQ), declare a queue, publish a message, and consume messages asynchronously. The broker URL is read from the `AMQP_URL` environment variable via `os.environ.get`, so real credentials can be supplied without editing the code; when the variable is unset, the script falls back to `amqp://guest:guest@localhost/`. Make sure a RabbitMQ instance is running at `localhost:5672`, or set `AMQP_URL` to point at a different broker.

import asyncio
import os

from aiormq import connect
from aiormq.abc import DeliveredMessage

# Broker location: the AMQP_URL environment variable takes precedence; when it
# is unset, fall back to a broker on localhost using guest/guest credentials.
AMQP_URL = os.environ.get(
    "AMQP_URL",
    "amqp://guest:guest@localhost/",
)

# Name of the queue this example declares, publishes to and consumes from.
QUEUE_NAME = "aiormq_test_queue"

async def on_message(message: "DeliveredMessage"):
    """Print the body of a delivered message, then acknowledge it.

    Args:
        message: The delivery passed to the consumer callback. Its ``body``
            is decoded with the default codec (UTF-8) for printing.

    The acknowledgement is sent on the channel the message arrived on, using
    the delivery tag carried by the message: aiormq's ``DeliveredMessage``
    does not provide an ``ack()`` method of its own. The annotation is quoted
    so that it is not evaluated when the function is defined.
    """
    print(f"[x] Received: {message.body.decode()}")
    # Acknowledge explicitly via the owning channel and the delivery tag.
    await message.channel.basic_ack(message.delivery.delivery_tag)

async def main():
    """Run the demo round trip: connect, declare, publish, consume, clean up.

    Connects to the broker at ``AMQP_URL``, opens a channel, declares
    ``QUEUE_NAME``, publishes one message to it with ``exchange=''`` and the
    queue name as routing key, consumes with ``on_message`` for five seconds,
    and then cancels the consumer. Any ``Exception`` raised along the way is
    printed instead of propagated, and the connection is closed in every case
    once it has been opened.
    """
    connection = None
    try:
        # Establish connection
        # NOTE: the URL is printed verbatim, including any credentials in it.
        print(f"[*] Connecting to {AMQP_URL}...")
        connection = await connect(AMQP_URL)
        print("[*] Connection established.")

        # Create a channel
        channel = await connection.channel()
        print("[*] Channel created.")

        # Declare a queue (idempotent operation)
        await channel.queue_declare(QUEUE_NAME)
        print(f"[*] Queue '{QUEUE_NAME}' declared.")

        # Publish a message
        message_body = b"Hello, aiormq world!"
        await channel.basic_publish(
            exchange='',
            routing_key=QUEUE_NAME,
            body=message_body,
        )
        print(f"[x] Published message: '{message_body.decode()}'")

        # Start consuming messages. basic_consume returns the broker's
        # Basic.ConsumeOk frame rather than a bare string; the tag that
        # basic_cancel expects is that frame's ``consumer_tag`` attribute.
        consume_ok = await channel.basic_consume(QUEUE_NAME, on_message)
        consumer_tag = consume_ok.consumer_tag
        print(f"[*] Consuming from '{QUEUE_NAME}'. Consumer tag: {consumer_tag}")

        # Keep the consumer running for a short period (e.g., 5 seconds)
        print("[*] Waiting for messages... Press Ctrl+C to exit")
        await asyncio.sleep(5)

        # Stop consuming
        await channel.basic_cancel(consumer_tag)
        print(f"[*] Consumer '{consumer_tag}' cancelled.")

    except Exception as e:
        # Top-level boundary of the demo: report the failure and fall through
        # to the cleanup below.
        print(f"[!] An error occurred: {e}")
    finally:
        # Only close what was actually opened; ``connection`` stays None when
        # connect() itself failed.
        if connection is not None:
            print("[*] Closing connection...")
            await connection.close()
            print("[*] Connection closed.")

# Script entry point: run the example on a fresh event loop when this file is
# executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())

view raw JSON →