OpenTelemetry Aio-pika Instrumentation
This library, part of the OpenTelemetry Python Contrib project, automatically instruments `aio-pika` so that RabbitMQ applications can be traced. It creates spans for `aio-pika` message publishing and consuming operations. The current version is `0.62b0`, and the library is actively developed as part of the broader OpenTelemetry Python ecosystem.
Warnings
- breaking: Older versions of `opentelemetry-instrumentation-aio-pika` are incompatible with `aio-pika` versions 9.1+. A refactoring in `aio-pika` 9.1 changed `channel.connection` to `channel._connection`, leading to `AttributeError` in unpatched instrumentation.
- gotcha: The instrumentation is currently in beta (`0.62b0`). While generally stable, minor breaking changes to the API or semantic conventions, especially for in-development signals, may occur before a stable `1.0.0` release.
- gotcha: This instrumentation currently provides tracing (spans) but does NOT provide metrics for `aio-pika` operations.
- gotcha: In complex asynchronous message consumption scenarios with `aio-pika`, especially when message processing involves further asynchronous tasks or handlers that detach from the original consumer callback, OpenTelemetry context propagation might be lost. This can result in broken traces where child spans are not linked to the originating message consumption span.
Install
pip install opentelemetry-instrumentation-aio-pika
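Because of the `aio-pika` 9.1+ incompatibility noted in the warnings, it can help to check which versions are actually installed. The following is a small sketch using only the standard library; the helper names (`installed_version`, `major_minor`, `aio_pika_needs_patched_instrumentation`) are hypothetical, and the check only flags that `aio-pika` is at or above 9.1. It does not know which instrumentation release contains the fix, so treat a `True` result as a prompt to upgrade the instrumentation, not as proof of a problem.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def major_minor(version_string: str):
    """Parse the leading 'X.Y' of a version string into a tuple of ints."""
    match = re.match(r"(\d+)\.(\d+)", version_string)
    if match is None:
        raise ValueError(f"unrecognized version: {version_string!r}")
    return int(match.group(1)), int(match.group(2))


def aio_pika_needs_patched_instrumentation(aio_pika_version: str) -> bool:
    """True when aio-pika is 9.1 or newer (the range named in the warning)."""
    return major_minor(aio_pika_version) >= (9, 1)


if __name__ == "__main__":
    aio_pika_version = installed_version("aio-pika")
    instr_version = installed_version("opentelemetry-instrumentation-aio-pika")
    print("aio-pika:", aio_pika_version)
    print("opentelemetry-instrumentation-aio-pika:", instr_version)
    if aio_pika_version and aio_pika_needs_patched_instrumentation(aio_pika_version):
        print("aio-pika is 9.1+; make sure the instrumentation is a recent release.")
```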
Imports
- AioPikaInstrumentor
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
Quickstart
import asyncio

from aio_pika import Message, connect
from opentelemetry import trace
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Configure OpenTelemetry TracerProvider
resource = Resource.create({"service.name": "aio-pika-example"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

# Instrument aio-pika
AioPikaInstrumentor().instrument()


async def main() -> None:
    print("Connecting to RabbitMQ... (Ensure RabbitMQ is running, e.g., docker run -p 5672:5672 rabbitmq)")
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("hello")

        # Publish a message
        print("Publishing message...")
        await channel.default_exchange.publish(
            Message(b"Hello World!"),
            routing_key=queue.name,
        )
        print("Message published.")

        # Basic consume example (for demonstration, a real consumer would run longer)
        print("Consuming message...")
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # message.process() acknowledges the message when the block exits
                async with message.process():
                    print(f"Received message: {message.body.decode()}")
                break  # Stop after one message for this quickstart


if __name__ == "__main__":
    asyncio.run(main())