OpenTelemetry Aio-pika Instrumentation

0.62b0 · active · verified Sat Apr 11

This library, part of the OpenTelemetry Python Contrib project, provides automatic instrumentation for `aio-pika` to trace RabbitMQ applications. It helps in generating telemetry data like traces and spans for operations involving `aio-pika` message publishing and consuming. The current version is `0.62b0`, and it's actively developed as part of the broader OpenTelemetry Python ecosystem.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to instrument `aio-pika` to trace message publishing and consuming. It sets up a basic OpenTelemetry TracerProvider to export traces to the console and then instruments `aio-pika` globally. It requires a running RabbitMQ instance, which can be started with `docker run -p 5672:5672 rabbitmq`.

import asyncio
from aio_pika import Message, connect
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Configure OpenTelemetry TracerProvider
resource = Resource.create({"service.name": "aio-pika-example"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

# Instrument aio-pika
AioPikaInstrumentor().instrument()

async def main() -> None:
    print("Connecting to RabbitMQ... (Ensure RabbitMQ is running, e.g., docker run -p 5672:5672 rabbitmq)")
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("hello")

        # Publish a message
        print("Publishing message...")
        await channel.default_exchange.publish(
            Message(b"Hello World!"),
            routing_key=queue.name
        )
        print("Message published.")

        # Basic consume example (for demonstration, a real consumer would run longer)
        print("Consuming message...")
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(f"Received message: {message.body.decode()}")
                    break # Stop after one message for this quickstart

if __name__ == "__main__":
    asyncio.run(main())

view raw JSON →