aiormq: Asynchronous AMQP Client
raw JSON → 6.9.4 verified Fri Apr 24 auth: no python
aiormq is a pure Python asynchronous client library for the AMQP 0.9.1 protocol, commonly used for interacting with RabbitMQ. It provides a non-blocking interface leveraging `asyncio`. The current version is 6.9.4, and it maintains a steady release cadence with frequent patch and minor updates addressing bug fixes and performance improvements.
pip install aiormq Common errors
error ModuleNotFoundError: No module named 'aiormq' ↓
cause The 'aiormq' package is not installed in the Python environment.
fix
Install the package using pip: 'pip install aiormq'.
error ImportError: cannot import name 'connect' from 'aiormq' ↓
cause The 'connect' function does not exist in the 'aiormq' module.
fix
Use 'from aiormq import Connection' and then 'connection = await Connection.connect()'.
error AttributeError: module 'aiormq' has no attribute 'Channel' ↓
cause The 'Channel' class is not directly accessible from the 'aiormq' module.
fix
Access the 'Channel' class through an established connection: 'channel = await connection.channel()'.
error TypeError: 'NoneType' object is not callable ↓
cause Attempting to call a method on a 'None' object, possibly due to a failed connection.
fix
Ensure the connection is successfully established before calling methods: 'connection = await aiormq.connect()'.
error RuntimeError: Event loop is closed ↓
cause The asyncio event loop has been closed before the asynchronous operation could complete.
fix
Ensure the event loop is running when performing asynchronous operations: 'asyncio.run(main())'.
Warnings
breaking Python 3.7 support was dropped in aiormq version 6.8.0. If you are using Python 3.7, you must upgrade your Python version or pin `aiormq<6.8.0`. ↓
fix Upgrade to Python 3.8+ or pin `aiormq` to a version less than 6.8.0 in your `requirements.txt`.
gotcha Properly closing connections and channels is crucial in asynchronous applications to prevent resource leaks and ensure graceful shutdown. Failing to `await connection.close()` and `await channel.close()` can lead to hung connections or unreleased resources. ↓
fix Always wrap your AMQP operations in `try...finally` blocks to ensure `await connection.close()` and `await channel.close()` are called, even if exceptions occur. For channels, ensure they are closed before the connection.
gotcha While `aiormq` aims to handle AMQP URLs robustly, be mindful of special characters, complex VHosts, or non-standard query parameters in your connection string. Historically, there have been minor fixes related to URL parsing (e.g., slash unquoting in 6.8.1). ↓
fix Ensure your AMQP connection URL is properly URL-encoded, especially for parts like VHosts or usernames/passwords that might contain special characters. Test complex URLs thoroughly.
Install compatibility last tested: 2026-04-24
runtime status import time mem disk
3.10-alpine 0.27s 8.6MB 21.1M
3.10-slim 0.20s 8.6MB 22M
3.11-alpine 0.40s 9.8MB 22.8M
3.11-slim 0.33s 9.8MB 24M
3.12-alpine 0.61s 9.8MB 14.7M
3.12-slim 0.54s 9.8MB 16M
3.13-alpine 0.60s 10.3MB 14.3M
3.13-slim 0.57s 10.3MB 15M
3.9-alpine 0.24s 8.3MB 21.1M
3.9-slim 0.21s 8.3MB 22M
Imports
- connect
from aiormq import connect - Channel
from aiormq.abc import Channel - IncomingMessage
from aiormq.abc import IncomingMessage
Quickstart last tested: 2026-04-25
import asyncio
import os
from aiormq import connect
from aiormq.abc import IncomingMessage
# Get AMQP URL from environment variable, default to local RabbitMQ
AMQP_URL = os.environ.get('AMQP_URL', 'amqp://guest:guest@localhost/')
QUEUE_NAME = 'aiormq_test_queue'
async def on_message(message: IncomingMessage):
"""Callback for consuming messages."""
print(f"[x] Received: {message.body.decode()}")
await message.ack() # Acknowledge the message
async def main():
connection = None
try:
# Establish connection
print(f"[*] Connecting to {AMQP_URL}...")
connection = await connect(AMQP_URL)
print("[*] Connection established.")
# Create a channel
channel = await connection.channel()
print("[*] Channel created.")
# Declare a queue (idempotent operation)
await channel.queue_declare(QUEUE_NAME)
print(f"[*] Queue '{QUEUE_NAME}' declared.")
# Publish a message
message_body = b"Hello, aiormq world!"
await channel.basic_publish(
exchange='',
routing_key=QUEUE_NAME,
body=message_body
)
print(f"[x] Published message: '{message_body.decode()}'")
# Start consuming messages
consumer_tag = await channel.basic_consume(QUEUE_NAME, on_message)
print(f"[*] Consuming from '{QUEUE_NAME}'. Consumer tag: {consumer_tag}")
# Keep the consumer running for a short period (e.g., 5 seconds)
print("[*] Waiting for messages... Press Ctrl+C to exit")
await asyncio.sleep(5)
# Stop consuming
await channel.basic_cancel(consumer_tag)
print(f"[*] Consumer '{consumer_tag}' cancelled.")
except Exception as e:
print(f"[!] An error occurred: {e}")
finally:
if connection:
print("[*] Closing connection...")
await connection.close()
print("[*] Connection closed.")
if __name__ == '__main__':
asyncio.run(main())