aiormq: Asynchronous AMQP Client
aiormq is a pure Python asynchronous client library for the AMQP 0.9.1 protocol, commonly used for interacting with RabbitMQ. It provides a non-blocking interface built on `asyncio`. The current version is 6.9.4; releases are frequent and are mostly patch and minor updates with bug fixes and performance improvements.
Warnings
- breaking: Python 3.7 support was dropped in aiormq version 6.8.0. If you are using Python 3.7, you must upgrade your Python version or pin `aiormq<6.8.0`.
- gotcha: Properly closing connections and channels is crucial in asynchronous applications to prevent resource leaks and ensure graceful shutdown. Failing to `await connection.close()` and `await channel.close()` can lead to hung connections or unreleased resources.
- gotcha: While `aiormq` aims to handle AMQP URLs robustly, be mindful of special characters, complex VHosts, or non-standard query parameters in your connection string. Historically, there have been minor fixes related to URL parsing (e.g., slash unquoting in 6.8.1).
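One way to sidestep the special-character problem is to percent-encode each URL component before assembling the connection string. The sketch below uses only the standard library; `build_amqp_url` is a hypothetical helper, not part of aiormq, and the vhost encoding follows the usual RabbitMQ URI convention (`/prod` becomes `%2Fprod`). How a given aiormq release decodes the vhost may differ, so check the result against the version you run.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host, port, vhost):
    """Assemble an AMQP URL with reserved characters percent-encoded.

    Hypothetical helper for illustration; aiormq does not ship it.
    """
    # safe="" makes quote() encode "/", "@" and ":" as well, so they cannot
    # be mistaken for URL delimiters inside credentials or the vhost name.
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )
```

The returned string can then be passed to `connect()` in place of a hand-written URL.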
Install
pip install aiormq
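If one install script has to serve both Python 3.7 and newer interpreters, the pin from the Python 3.7 warning can be chosen at runtime. A minimal sketch, assuming only the version boundary stated in that warning; `aiormq_requirement` is a hypothetical helper name.

```python
import sys


def aiormq_requirement(version_info=sys.version_info):
    """Return a pip requirement string for aiormq suited to the interpreter.

    Hypothetical helper: Python 3.7 gets the pin aiormq<6.8.0,
    newer interpreters get the unpinned package name.
    """
    # Compare only (major, minor); the patch level does not matter here.
    if tuple(version_info[:2]) < (3, 8):
        return "aiormq<6.8.0"
    return "aiormq"
```

The return value is what would be handed to `pip install`.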
Imports
- connect
from aiormq import connect
- Channel
from aiormq import Channel
- DeliveredMessage
from aiormq.abc import DeliveredMessage
Quickstart
import asyncio
import os

from aiormq import connect
from aiormq.abc import DeliveredMessage

# Get AMQP URL from environment variable, default to local RabbitMQ
AMQP_URL = os.environ.get('AMQP_URL', 'amqp://guest:guest@localhost/')
QUEUE_NAME = 'aiormq_test_queue'


async def on_message(message: DeliveredMessage):
    """Callback for consuming messages."""
    print(f"[x] Received: {message.body.decode()}")
    # aiormq messages are acknowledged through the channel that delivered them
    await message.channel.basic_ack(message.delivery.delivery_tag)


async def main():
    connection = None
    try:
        # Establish connection
        print(f"[*] Connecting to {AMQP_URL}...")
        connection = await connect(AMQP_URL)
        print("[*] Connection established.")

        # Create a channel
        channel = await connection.channel()
        print("[*] Channel created.")

        # Declare a queue (idempotent operation)
        await channel.queue_declare(QUEUE_NAME)
        print(f"[*] Queue '{QUEUE_NAME}' declared.")

        # Publish a message
        message_body = b"Hello, aiormq world!"
        await channel.basic_publish(
            message_body,
            exchange='',
            routing_key=QUEUE_NAME,
        )
        print(f"[x] Published message: '{message_body.decode()}'")

        # Start consuming messages; basic_consume returns a ConsumeOk frame
        consume_ok = await channel.basic_consume(QUEUE_NAME, on_message)
        consumer_tag = consume_ok.consumer_tag
        print(f"[*] Consuming from '{QUEUE_NAME}'. Consumer tag: {consumer_tag}")

        # Keep the consumer running for a short period (e.g., 5 seconds)
        print("[*] Waiting for messages for 5 seconds...")
        await asyncio.sleep(5)

        # Stop consuming
        await channel.basic_cancel(consumer_tag)
        print(f"[*] Consumer '{consumer_tag}' cancelled.")
    except Exception as e:
        print(f"[!] An error occurred: {e}")
    finally:
        if connection:
            print("[*] Closing connection...")
            await connection.close()
            print("[*] Connection closed.")


if __name__ == '__main__':
    asyncio.run(main())
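In aiormq, a consumer callback receives a `DeliveredMessage`, and acknowledgement goes through the channel that delivered it (`basic_ack` / `basic_nack` with the delivery tag). The sketch below wraps that in an ack-on-success, nack-on-failure pattern. `make_handler` and `process` are hypothetical names for illustration, and dropping failed messages with `requeue=False` is an assumed policy, not a library default.

```python
def make_handler(process):
    """Wrap `process` (a coroutine taking the message body) in an
    ack/nack consumer callback. Hypothetical helper for illustration."""

    async def on_message(message):
        # `message` is assumed to look like aiormq.abc.DeliveredMessage:
        # it exposes .body, .channel and .delivery.delivery_tag.
        tag = message.delivery.delivery_tag
        try:
            await process(message.body)
        except Exception:
            # Assumed policy: do not requeue, so a message that always
            # fails is not redelivered in an endless loop.
            await message.channel.basic_nack(tag, requeue=False)
        else:
            # Acknowledge only after processing finished without error.
            await message.channel.basic_ack(tag)

    return on_message
```

The resulting callback is passed to `channel.basic_consume(queue_name, make_handler(process))`. Acknowledging only after `process` returns means a crash mid-processing leaves the message unacknowledged, so the broker can redeliver it.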