aiormq: Asynchronous AMQP Client

raw JSON →
6.9.4 verified Fri Apr 24 auth: no python

aiormq is a pure Python asynchronous client library for the AMQP 0.9.1 protocol, commonly used for interacting with RabbitMQ. It provides a non-blocking interface leveraging `asyncio`. The current version is 6.9.4, and it maintains a steady release cadence with frequent patch and minor updates addressing bug fixes and performance improvements.

pip install aiormq
error ModuleNotFoundError: No module named 'aiormq'
cause The 'aiormq' package is not installed in the Python environment.
fix
Install the package using pip: 'pip install aiormq'.
error ImportError: cannot import name 'connect' from 'aiormq'
cause The 'connect' function does not exist in the 'aiormq' module.
fix
Use 'from aiormq import Connection' and then 'connection = await Connection.connect()'.
error AttributeError: module 'aiormq' has no attribute 'Channel'
cause The 'Channel' class is not directly accessible from the 'aiormq' module.
fix
Access the 'Channel' class through an established connection: 'channel = await connection.channel()'.
error TypeError: 'NoneType' object is not callable
cause Attempting to call a method on a 'None' object, possibly due to a failed connection.
fix
Ensure the connection is successfully established before calling methods: 'connection = await aiormq.connect()'.
error RuntimeError: Event loop is closed
cause The asyncio event loop has been closed before the asynchronous operation could complete.
fix
Ensure the event loop is running when performing asynchronous operations: 'asyncio.run(main())'.
breaking Python 3.7 support was dropped in aiormq version 6.8.0. If you are using Python 3.7, you must upgrade your Python version or pin `aiormq<6.8.0`.
fix Upgrade to Python 3.8+ or pin `aiormq` to a version less than 6.8.0 in your `requirements.txt`.
gotcha Properly closing connections and channels is crucial in asynchronous applications to prevent resource leaks and ensure graceful shutdown. Failing to `await connection.close()` and `await channel.close()` can lead to hung connections or unreleased resources.
fix Always wrap your AMQP operations in `try...finally` blocks to ensure `await connection.close()` and `await channel.close()` are called, even if exceptions occur. For channels, ensure they are closed before the connection.
gotcha While `aiormq` aims to handle AMQP URLs robustly, be mindful of special characters, complex VHosts, or non-standard query parameters in your connection string. Historically, there have been minor fixes related to URL parsing (e.g., slash unquoting in 6.8.1).
fix Ensure your AMQP connection URL is properly URL-encoded, especially for parts like VHosts or usernames/passwords that might contain special characters. Test complex URLs thoroughly.
runtime status import time mem disk
3.10-alpine 0.27s 8.6MB 21.1M
3.10-slim 0.20s 8.6MB 22M
3.11-alpine 0.40s 9.8MB 22.8M
3.11-slim 0.33s 9.8MB 24M
3.12-alpine 0.61s 9.8MB 14.7M
3.12-slim 0.54s 9.8MB 16M
3.13-alpine 0.60s 10.3MB 14.3M
3.13-slim 0.57s 10.3MB 15M
3.9-alpine 0.24s 8.3MB 21.1M
3.9-slim 0.21s 8.3MB 22M

This quickstart demonstrates how to connect to an AMQP broker (like RabbitMQ), declare a queue, publish a message, and consume messages asynchronously. It uses `os.environ.get` for the AMQP URL, making it easy to configure without hardcoding credentials. Remember to have a RabbitMQ instance running at `localhost:5672` or specify a different `AMQP_URL`.

import asyncio
import os
from aiormq import connect
from aiormq.abc import IncomingMessage

# Get AMQP URL from environment variable, default to local RabbitMQ
AMQP_URL = os.environ.get('AMQP_URL', 'amqp://guest:guest@localhost/')
QUEUE_NAME = 'aiormq_test_queue'

async def on_message(message: IncomingMessage):
    """Callback for consuming messages."""
    print(f"[x] Received: {message.body.decode()}")
    await message.ack() # Acknowledge the message

async def main():
    connection = None
    try:
        # Establish connection
        print(f"[*] Connecting to {AMQP_URL}...")
        connection = await connect(AMQP_URL)
        print("[*] Connection established.")

        # Create a channel
        channel = await connection.channel()
        print("[*] Channel created.")

        # Declare a queue (idempotent operation)
        await channel.queue_declare(QUEUE_NAME)
        print(f"[*] Queue '{QUEUE_NAME}' declared.")

        # Publish a message
        message_body = b"Hello, aiormq world!"
        await channel.basic_publish(
            exchange='',
            routing_key=QUEUE_NAME,
            body=message_body
        )
        print(f"[x] Published message: '{message_body.decode()}'")

        # Start consuming messages
        consumer_tag = await channel.basic_consume(QUEUE_NAME, on_message)
        print(f"[*] Consuming from '{QUEUE_NAME}'. Consumer tag: {consumer_tag}")

        # Keep the consumer running for a short period (e.g., 5 seconds)
        print("[*] Waiting for messages... Press Ctrl+C to exit")
        await asyncio.sleep(5)

        # Stop consuming
        await channel.basic_cancel(consumer_tag)
        print(f"[*] Consumer '{consumer_tag}' cancelled.")

    except Exception as e:
        print(f"[!] An error occurred: {e}")
    finally:
        if connection:
            print("[*] Closing connection...")
            await connection.close()
            print("[*] Connection closed.")

if __name__ == '__main__':
    asyncio.run(main())