taskiq-aio-pika
taskiq-aio-pika is a broker implementation for the Taskiq asynchronous task queue, enabling it to use RabbitMQ as a message broker. It leverages aio-pika for asynchronous AMQP communication, providing robust and scalable message passing. The current version is 0.6.0 and it typically follows the release cadence of the main Taskiq library.
Common errors
-
aio_pika.exceptions.AMQPConnectionError: [Errno 111] Connection refused
cause The RabbitMQ server is not running or is not accessible from the machine where the Taskiq worker is trying to connect.fixStart your RabbitMQ server. Verify the RabbitMQ host and port in your `AioPikaBroker` URL (e.g., `localhost:5672`). Check network connectivity and firewall settings. -
ImportError: cannot import name 'AioPikaBroker' from 'taskiq_aio_pika'
cause The `taskiq-aio-pika` package is not installed, or there is a typo in the import statement.fixEnsure the package is installed: `pip install taskiq-aio-pika`. Double-check the spelling of `AioPikaBroker` and `taskiq_aio_pika`. -
taskiq.exceptions.TaskIQException: Broker is not connected.
cause The `AioPikaBroker` failed to establish a connection to RabbitMQ after initialization, or the connection was lost.fixExamine the application logs for earlier `aio-pika` connection errors (e.g., `Connection refused`, `Authentication failed`). Verify RabbitMQ server status, credentials, and network reachability. This error often surfaces after underlying connection issues.
Warnings
- gotcha RabbitMQ server must be running and accessible at the specified URL. If RabbitMQ is not running or the connection URL (e.g., host, port, credentials) is incorrect, the broker will fail to connect.
- breaking Major version updates of `taskiq` (e.g., `0.x` to `0.y` with significant changes) may introduce breaking changes to its internal APIs or settings structure that `taskiq-aio-pika` relies on. This can lead to unexpected errors or require broker updates.
- gotcha Pydantic version conflicts can occur if other dependencies in your project require a different major version of Pydantic than what `taskiq` and `taskiq-aio-pika` expect. `taskiq-aio-pika` v0.6.0 is built for Pydantic v2.
Install
-
pip install taskiq-aio-pika
Imports
- AioPikaBroker
from taskiq_aio_pika import AioPikaBroker
Quickstart
import os
from taskiq import TaskiqSettings
from taskiq_aio_pika import AioPikaBroker
# Configure RabbitMQ connection using environment variable or a default
RABBITMQ_URL = os.environ.get(
"RABBITMQ_URL",
"amqp://guest:guest@localhost:5672/"
)
class AppSettings(TaskiqSettings):
"""Taskiq settings with AioPikaBroker for RabbitMQ."""
broker: AioPikaBroker = AioPikaBroker(
url=RABBITMQ_URL,
)
# Instantiate the settings
settings = AppSettings()
# Define a task using the broker's decorator
@settings.broker.task
async def process_data(data: str) -> str:
"""A simple task to process some data."""
print(f"Processing: {data}")
return f"Processed: {data.upper()}"
# To enqueue this task:
# import asyncio
# asyncio.run(process_data.kiq(data="hello world"))
# To run the worker (assuming this code is in `app_settings.py`):
# taskiq worker app_settings:settings