aio-pika

9.6.2 · active · verified Thu Apr 09

aio-pika is an asyncio-native Python library that wraps `aiormq` in a high-level API for AMQP 0-9-1 brokers such as RabbitMQ. It covers common messaging patterns and offers two connection types: basic connections and robust connections that reconnect automatically. The library is actively maintained, with frequent patch and minor releases.

Warnings

Install

pip install aio-pika

Imports

Quickstart

This quickstart establishes a robust connection, declares a queue, publishes a message, and consumes it with aio-pika. It uses `connect_robust`, which reconnects automatically and is the usual choice for production, and it acknowledges each message through `message.process()`, which acks when the block exits cleanly. It assumes a RabbitMQ instance is running on `localhost` with the default `guest` credentials.

import asyncio
from aio_pika import connect_robust, Message, IncomingMessage

async def main():
    # Connect to RabbitMQ (replace with your actual connection string)
    connection_string = "amqp://guest:guest@localhost/"
    connection = await connect_robust(connection_string)

    async with connection:
        channel = await connection.channel()

        # Declare a queue
        queue_name = "my_queue"
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Consumer function
        async def on_message_received(message: IncomingMessage):
            async with message.process():
                print(f" [x] Received {message.body.decode()}")
                # message.process() acks when this block exits cleanly
                # and rejects the message if the block raises

        # Start consuming messages
        print(f" [*] Waiting for messages on {queue_name}. To exit press CTRL+C")
        await queue.consume(on_message_received, no_ack=False)

        # Publish a message
        message_body = b"Hello, aio-pika!"
        await channel.default_exchange.publish(
            Message(message_body),
            routing_key=queue_name
        )
        print(f" [x] Sent '{message_body.decode()}'")

        # Keep the consumer running for a bit
        await asyncio.sleep(5)

if __name__ == "__main__":
    asyncio.run(main())
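
The quickstart acknowledges through the `message.process()` context manager. For cases where the ack or reject decision should be made by hand, the following is a minimal sketch rather than a definitive implementation. It assumes the payload is JSON; `AckableMessage` and `handle_order` are hypothetical names introduced for illustration. The handler relies only on `body`, `ack()`, and `reject(requeue=...)`, which aio-pika's incoming messages provide.

```python
import asyncio
import json
from typing import Protocol


class AckableMessage(Protocol):
    """Hypothetical structural type: the subset of an incoming message used below."""

    body: bytes

    async def ack(self) -> None: ...

    async def reject(self, requeue: bool = False) -> None: ...


async def handle_order(message: AckableMessage) -> bool:
    """Ack the message if its body is valid JSON, otherwise reject it.

    Returns True when the message was acknowledged.
    """
    try:
        # Assumption: the publisher sends UTF-8 encoded JSON
        payload = json.loads(message.body)
    except ValueError:
        # Covers both invalid UTF-8 and malformed JSON.
        # requeue=False avoids redelivering a message that can never parse.
        await message.reject(requeue=False)
        return False

    print(f" [x] Processed {payload}")
    await message.ack()
    return True
```

In the quickstart, a handler like this could be passed to `queue.consume(handle_order, no_ack=False)` in place of `on_message_received`. It should not be wrapped in `message.process()` as well, since the message would then be acknowledged twice.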
