AMQPStorm
AMQPStorm is a thread-safe Python client library for RabbitMQ that covers both messaging and RabbitMQ management. The current version is 2.11.1, and the library is actively maintained with regular releases.
Warnings
- breaking: Starting with version 2.0.0, messages are delivered to consumer callbacks as `Message` objects by default. Earlier versions might have passed tuples or dictionaries instead, so callbacks written for them may need updating.
- gotcha: An open issue (#144), `Import Error due to API Change in pamqp>=3.0.0 (specification -> commands)`, affects `pamqp`, a core dependency. Importing the library alongside `pamqp` 3.0.0 or newer might fail.
- deprecated: `amqpstorm/message.py` currently calls `datetime.utcnow()`, which is deprecated as of Python 3.12 in favor of `datetime.now(timezone.utc)`. This is tracked as open issue #143.
- gotcha: Properly closing connections and channels is crucial to avoid resource leaks or hangs, especially in long-running applications or when handling errors.
- gotcha: When using SSL/TLS connections, incorrect `ssl_options` or `verify` settings can lead to insecure connections (e.g., no certificate verification) or failed connections (e.g., an incorrect CA bundle).
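For the SSL/TLS warning above, one way to avoid an accidentally unverified connection is to build the context with Python's standard `ssl` module, whose defaults require certificate and hostname verification. The following is a minimal sketch, assuming `Connection` accepts a prebuilt context through `ssl_options={'context': ..., 'server_hostname': ...}` together with `ssl=True`. The helper names `build_ssl_options` and `connect_with_tls`, and the use of port 5671, are illustrative assumptions and not part of AMQPStorm.

```python
import ssl


def build_ssl_options(server_hostname, cafile=None):
    """Return an ssl_options dict backed by a verifying SSLContext.

    Hypothetical helper for illustration; not part of AMQPStorm.
    """
    # create_default_context() enables CERT_REQUIRED and hostname checking.
    # Pass cafile to trust a private CA instead of the system trust store.
    context = ssl.create_default_context(cafile=cafile)
    return {'context': context, 'server_hostname': server_hostname}


def connect_with_tls(hostname, username, password, cafile=None):
    """Open a TLS connection; assumes the broker listens on port 5671."""
    # Imported here so build_ssl_options() works without the library installed.
    from amqpstorm import Connection

    return Connection(
        hostname, username, password,
        port=5671,
        ssl=True,
        ssl_options=build_ssl_options(hostname, cafile=cafile),
    )
```

Keeping the context construction in its own function makes it easy to check the verification settings without opening a connection.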
Install
- pip install amqpstorm
- pip install amqpstorm[management]
- pip install amqpstorm[pool]
Imports
- Connection
from amqpstorm import Connection
- UriConnection
from amqpstorm import UriConnection
- Message
from amqpstorm import Message
- management
from amqpstorm import management
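As an illustration of how `UriConnection` differs from `Connection`, the sketch below builds an AMQP URI and passes it to `UriConnection`. It assumes the URI form `amqp://user:password@host:port/vhost` with the virtual host and credentials percent-encoded (so the default vhost `/` becomes `%2F`). `build_amqp_uri` and `open_uri_connection` are hypothetical helpers, not library functions.

```python
from urllib.parse import quote


def build_amqp_uri(host, user, password, port=5672, vhost='/'):
    """Build an AMQP URI, percent-encoding credentials and the virtual host.

    Hypothetical helper for illustration; not part of AMQPStorm.
    """
    return 'amqp://{0}:{1}@{2}:{3}/{4}'.format(
        quote(user, safe=''),
        quote(password, safe=''),
        host,
        port,
        quote(vhost, safe=''),
    )


def open_uri_connection(uri):
    """Connect using a single URI string instead of separate arguments."""
    # Imported here so build_amqp_uri() works without the library installed.
    from amqpstorm import UriConnection

    return UriConnection(uri)
```

With the default arguments, `build_amqp_uri('localhost', 'guest', 'guest')` returns `amqp://guest:guest@localhost:5672/%2F`.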
Quickstart
import os

import amqpstorm

RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'guest')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS', 'guest')

try:
    # Establish a connection using a context manager for proper resource handling
    with amqpstorm.Connection(RABBITMQ_HOST, RABBITMQ_USER, RABBITMQ_PASS) as connection:
        # Open a channel using a context manager
        with connection.channel() as channel:
            # Declare a queue (idempotent operation)
            channel.queue.declare('my_queue')
            # Publish a simple message to 'my_queue'
            channel.basic.publish(body='Hello, RabbitMQ!', routing_key='my_queue')
            print(" [x] Sent 'Hello, RabbitMQ!'")
except amqpstorm.AMQPConnectionError as e:
    print(f"Error connecting to RabbitMQ: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
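
To complement the publishing example, the sketch below shows one possible consumer for the same queue. It relies on the version 2.0.0+ behavior that callbacks receive a `Message` object, and uses its `body` attribute and `ack()` method. The function names `on_message` and `consume` are illustrative choices, and the queue name `my_queue` is carried over from the Quickstart.

```python
def on_message(message):
    """Handle one delivery; `message` is expected to be an amqpstorm Message."""
    print(" [x] Received %r" % message.body)
    # Acknowledge so the broker can remove the message from the queue.
    message.ack()


def consume(host, user, password, queue='my_queue'):
    """Consume from `queue` until interrupted (illustrative helper)."""
    # Imported here so on_message() can be exercised without a broker.
    from amqpstorm import Connection

    with Connection(host, user, password) as connection:
        with connection.channel() as channel:
            channel.queue.declare(queue)
            # no_ack=False means each message must be acknowledged explicitly.
            channel.basic.consume(on_message, queue, no_ack=False)
            # Blocks and dispatches deliveries to on_message.
            channel.start_consuming()
```

Calling `consume('localhost', 'guest', 'guest')` against a local broker would block and print each message published to `my_queue`; the context managers close the channel and connection when the loop exits.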