OpenTelemetry AioKafka Instrumentation

0.62b0 · active · verified Fri Apr 10

This library provides OpenTelemetry instrumentation for the `aiokafka` client, enabling automatic tracing of Kafka producer and consumer operations within `asyncio` applications. As part of the `opentelemetry-python-contrib` project, it currently operates under a beta release schedule (0.x.x series) with frequent updates and bug fixes, aligning with the broader OpenTelemetry Python ecosystem development.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to instrument `aiokafka` producer and consumer using `opentelemetry-instrumentation-aiokafka`. It sets up a basic `TracerProvider` with a `ConsoleSpanExporter` to print traces to the console, then uses `AioKafkaInstrumentor().instrument()` to automatically trace `AIOKafkaProducer` and `AIOKafkaConsumer` operations. Ensure a Kafka broker is running and accessible at `localhost:9092` (or via the `KAFKA_BOOTSTRAP_SERVERS` environment variable).

import asyncio
import os

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.aiokafka import AioKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

# Configure OpenTelemetry TracerProvider
provider = TracerProvider()
processor = SimpleSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Instrument aiokafka
AioKafkaInstrumentor().instrument()

KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
KAFKA_TOPIC = "test_topic"

async def produce_message():
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
    await producer.start()
    try:
        with trace.get_tracer(__name__).start_as_current_span("send_test_message"):
            message = b"Hello OpenTelemetry from AioKafka!"
            print(f"Producing message: {message.decode()}")
            await producer.send_and_wait(KAFKA_TOPIC, message)
            print("Message produced.")
    finally:
        await producer.stop()

async def consume_messages():
    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id="my-otel-group",
        auto_offset_reset="earliest"
    )
    await consumer.start()
    try:
        print("Consumer started, waiting for messages...")
        # Consume at least one message
        async for msg in consumer:
            with trace.get_tracer(__name__).start_as_current_span("process_kafka_message"):
                print(f"Consumed: Topic={msg.topic}, Partition={msg.partition}, Offset={msg.offset}, Value={msg.value.decode()}")
            break # For quickstart, consume one and exit
    finally:
        await consumer.stop()

async def main():
    # Ensure Kafka is running at KAFKA_BOOTSTRAP_SERVERS before running.
    # You might need to create the topic 'test_topic' manually or configure Kafka for auto-creation.
    producer_task = asyncio.create_task(produce_message())
    consumer_task = asyncio.create_task(consume_messages())

    await asyncio.gather(producer_task, consumer_task)

if __name__ == "__main__":
    print(f"Using Kafka brokers: {KAFKA_BOOTSTRAP_SERVERS}")
    asyncio.run(main())

view raw JSON →