aio-pika
aio-pika is an asyncio-native Python library that wraps `aiormq` in a high-level API for AMQP brokers such as RabbitMQ. It simplifies common messaging patterns and offers both basic and robust (auto-reconnecting) connections. The library is actively maintained, with frequent patch and minor releases.
Warnings
- breaking: aio-pika v9.4.0 dropped support for Python 3.7. Users running Python 3.7 must pin `aio-pika` to `<9.4.0`.
- deprecated: The `aio_pika.Channel.channel` property, which exposed the underlying `aiormq` channel, was deprecated in version 9.1.0.
- gotcha: For production applications, prefer `aio_pika.connect_robust()` over `aio_pika.connect()`. The robust connection reconnects automatically after a connection loss and restores its state, whereas a plain connection stays closed.
- gotcha: Before version 9.4.2, messages were not always `nack`ed (negatively acknowledged) when a consumer subscription was cancelled, which could leave messages lost or stuck.
- gotcha: Messages taken from a queue with `consume()` or `get()` must be explicitly acknowledged (`message.ack()`) or rejected (`message.nack()`, `message.reject()`), unless `no_ack=True` was set when consuming.
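The last warning can be made concrete with a consumer callback that settles each message by hand. This is a minimal sketch, not a recommended design: `handle_order`, the `orders` queue name, and the JSON payload are hypothetical, and the connection URL assumes a local broker with default credentials. The `aio_pika` import sits inside `main()` only so the handler can be tried without a broker.

```python
import asyncio
import json


async def handle_order(message) -> str:
    """Settle one message by hand and report which action was taken.

    `message` is expected to behave like aio-pika's IncomingMessage:
    a bytes `body` plus awaitable ack() and reject().
    """
    try:
        order = json.loads(message.body)
    except ValueError:
        # Malformed payload: a retry cannot help, so reject without requeueing.
        await message.reject(requeue=False)
        return "rejected"

    print(f"processing order: {order!r}")
    # Acknowledge only after the work has succeeded.
    await message.ack()
    return "acked"


async def main() -> None:
    # Imported here so handle_order can be exercised without aio-pika installed.
    from aio_pika import connect_robust

    # Assumed URL: local broker, default guest account.
    connection = await connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("orders")  # hypothetical queue name
        # no_ack defaults to False, so every message must be settled by the callback.
        await queue.consume(handle_order)
        await asyncio.Future()  # keep consuming until the task is cancelled
```

To try it against a running broker, call `asyncio.run(main())`. If the callback returned without calling `ack()` or `reject()`, the broker would keep the message unacknowledged until the channel closes.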
Install
pip install aio-pika
Imports
- connect
from aio_pika import connect
- connect_robust
from aio_pika import connect_robust
- Message
from aio_pika import Message
- IncomingMessage
from aio_pika import IncomingMessage
Quickstart
import asyncio

from aio_pika import connect_robust, Message, IncomingMessage


async def main():
    # Connect to RabbitMQ (replace with your actual connection string)
    connection_string = "amqp://guest:guest@localhost/"
    connection = await connect_robust(connection_string)

    async with connection:
        channel = await connection.channel()  # type: ignore

        # Declare a queue
        queue_name = "my_queue"
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Consumer function
        async def on_message_received(message: IncomingMessage):
            # process() acknowledges the message when the block exits
            # without an exception, so no explicit ack() call is needed here
            async with message.process():
                print(f" [x] Received {message.body.decode()}")

        # Start consuming messages
        print(f" [*] Waiting for messages on {queue_name}")
        await queue.consume(on_message_received, no_ack=False)

        # Publish a message
        message_body = b"Hello, aio-pika!"
        await channel.default_exchange.publish(
            Message(message_body),
            routing_key=queue_name,
        )
        print(f" [x] Sent '{message_body.decode()}'")

        # Keep the consumer running for a few seconds, then exit
        await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(main())
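Besides the callback style used in the Quickstart, a queue can also be read with `queue.iterator()`, which suits scripts that want a fixed number of messages and then stop. The sketch below assumes the same `my_queue` queue and local broker URL as the Quickstart; `collect_bodies` and its `limit` parameter are hypothetical names introduced for illustration.

```python
import asyncio
from typing import List


async def collect_bodies(queue, limit: int) -> List[bytes]:
    """Read up to `limit` messages from an aio-pika queue and return their bodies."""
    bodies: List[bytes] = []
    if limit <= 0:
        return bodies

    # The iterator yields messages as they arrive; leaving the
    # `async with` block closes it and stops the consumer.
    async with queue.iterator() as messages:
        async for message in messages:
            # process() acks the message when the block exits without error.
            async with message.process():
                bodies.append(message.body)
            if len(bodies) >= limit:
                break
    return bodies


async def main() -> None:
    # Imported here so collect_bodies can be exercised without a broker.
    from aio_pika import connect_robust

    # Assumed URL and queue name, matching the Quickstart.
    connection = await connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("my_queue", auto_delete=True)
        print(await collect_bodies(queue, limit=1))
```

Run it with `asyncio.run(main())` while another process publishes to `my_queue`. The loop blocks until `limit` messages have arrived, so wrap the call in `asyncio.wait_for()` if an upper bound on waiting time is needed.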