OpenTelemetry AioKafka Instrumentation
This library provides OpenTelemetry instrumentation for the `aiokafka` client, enabling automatic tracing of Kafka producer and consumer operations within `asyncio` applications. As part of the `opentelemetry-python-contrib` project, it currently operates under a beta release schedule (0.x.x series) with frequent updates and bug fixes, aligning with the broader OpenTelemetry Python ecosystem development.
Warnings
- breaking This instrumentation library is in beta (`0.x.x` versioning with a `b` suffix), indicating that its API and behavior may change without strict adherence to semantic versioning. Always pin your version and review release notes for potential breaking changes between minor versions.
- gotcha Ensure the underlying `aiokafka` library version is compatible with the instrumentation. Past issues (e.g., with `0.52b0` and `aiokafka~=0.12.0`) have shown compatibility problems leading to runtime errors, particularly with `AIOKafkaProducer.send()` arguments.
- gotcha The `instrument()` method must be called early in your application's lifecycle, before `aiokafka` `AIOKafkaProducer` or `AIOKafkaConsumer` instances are created. If `instrument()` is called too late, existing `aiokafka` objects will not be instrumented.
- gotcha For the instrumentation to generate visible traces, a Kafka broker must be running and accessible at the specified `bootstrap_servers`. The provided quickstart assumes `localhost:9092`.
- gotcha The quickstart uses a `ConsoleSpanExporter`, which prints traces to standard output. For production environments, you will need to configure a more robust exporter (e.g., OTLP Exporter) to send traces to an observability backend like Jaeger, Zipkin, or an OpenTelemetry Collector.
Install
-
pip install opentelemetry-instrumentation-aiokafka opentelemetry-sdk aiokafka
Imports
- AioKafkaInstrumentor
from opentelemetry.instrumentation.aiokafka import AioKafkaInstrumentor
Quickstart
import asyncio
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.aiokafka import AioKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
# Configure OpenTelemetry TracerProvider
provider = TracerProvider()
processor = SimpleSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Instrument aiokafka
AioKafkaInstrumentor().instrument()
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
KAFKA_TOPIC = "test_topic"
async def produce_message():
producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
with trace.get_tracer(__name__).start_as_current_span("send_test_message"):
message = b"Hello OpenTelemetry from AioKafka!"
print(f"Producing message: {message.decode()}")
await producer.send_and_wait(KAFKA_TOPIC, message)
print("Message produced.")
finally:
await producer.stop()
async def consume_messages():
consumer = AIOKafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id="my-otel-group",
auto_offset_reset="earliest"
)
await consumer.start()
try:
print("Consumer started, waiting for messages...")
# Consume at least one message
async for msg in consumer:
with trace.get_tracer(__name__).start_as_current_span("process_kafka_message"):
print(f"Consumed: Topic={msg.topic}, Partition={msg.partition}, Offset={msg.offset}, Value={msg.value.decode()}")
break # For quickstart, consume one and exit
finally:
await consumer.stop()
async def main():
# Ensure Kafka is running at KAFKA_BOOTSTRAP_SERVERS before running.
# You might need to create the topic 'test_topic' manually or configure Kafka for auto-creation.
producer_task = asyncio.create_task(produce_message())
consumer_task = asyncio.create_task(consume_messages())
await asyncio.gather(producer_task, consumer_task)
if __name__ == "__main__":
print(f"Using Kafka brokers: {KAFKA_BOOTSTRAP_SERVERS}")
asyncio.run(main())