Pika Python AMQP Client
Pika is a Python AMQP client library that provides both synchronous (BlockingConnection) and asynchronous (e.g., AsyncioConnection) interfaces for interacting with AMQP 0-9-1 brokers like RabbitMQ. The library is actively maintained, with regular patch and minor releases (currently at 1.3.2, with 1.4.0 in beta), addressing bug fixes, performance improvements, and Python compatibility.
Warnings
- breaking Pika 1.0.0 introduced significant breaking changes, especially around connection parameters and callback signatures. `ConnectionParameters` no longer accepts `credentials` directly; `pika.PlainCredentials` (or similar) must be used. Also, the `on_message_callback` signature changed.
- gotcha Mixing blocking (BlockingConnection) and non-blocking (AsyncioConnection, TornadoConnection) I/O models in the same application without proper isolation can lead to deadlocks, unexpected behavior, or resource exhaustion. Choose one I/O model and stick to it, or use separate processes for different models.
- gotcha Failure to explicitly close channels and connections can lead to resource leaks (e.g., open file descriptors), hung processes, and issues with the RabbitMQ broker. Pika 1.0.0 and later require explicit closure for `BlockingConnection`.
- gotcha Pika does not automatically handle connection or channel recovery by default. Network outages, broker restarts, or protocol errors will cause `pika.exceptions.ConnectionClosed` or `pika.exceptions.ChannelClosed`.
Install
-
pip install pika
Imports
- BlockingConnection
from pika import BlockingConnection
- ConnectionParameters
from pika import ConnectionParameters
- PlainCredentials
from pika import PlainCredentials
- AsyncioConnection
from pika.adapters.asyncio_connection import AsyncioConnection
Quickstart
import pika
import os
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
# Get RabbitMQ host from environment or default to localhost
rabbitmq_host = os.environ.get('RABBITMQ_HOST', 'localhost')
connection = None
channel = None
try:
# Establish connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host))
channel = connection.channel()
# Declare a queue
channel.queue_declare(queue='hello')
# Publish a message
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
# Start consuming (example consumer setup)
# channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
# print(' [*] Waiting for messages. To exit press CTRL+C')
# channel.start_consuming()
finally:
# Ensure channel and connection are closed
if channel and channel.is_open:
channel.close()
if connection and connection.is_open:
connection.close()