OpenTelemetry Kafka-Python Instrumentation

0.62b0 · active · verified Fri Apr 10

This library provides OpenTelemetry tracing instrumentation for applications using the `kafka-python` client library. It enables automatic collection of trace data for Kafka produce and consume operations, facilitating observability in distributed systems. As part of the `opentelemetry-python-contrib` project, it maintains a regular release cadence, aligning with OpenTelemetry Python SDK updates.

Install
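
The instrumentation is published on PyPI as `opentelemetry-instrumentation-kafka-python`; installing it alongside the SDK and exporter used in the quickstart below looks like this:

```shell
# Instrumentation package (provides opentelemetry.instrumentation.kafka)
pip install opentelemetry-instrumentation-kafka-python

# OpenTelemetry SDK (TracerProvider, ConsoleSpanExporter) and the Kafka client
pip install opentelemetry-sdk kafka-python
```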

Quickstart

This quickstart demonstrates how to instrument `kafka-python` producer and consumer operations. It sets up a basic OpenTelemetry `TracerProvider` with a `ConsoleSpanExporter` to print traces to the console. The `KafkaInstrumentor().instrument()` call automatically instruments the `kafka-python` library. Requires a running Kafka broker (default `localhost:9092`).

import os
import time
from kafka import KafkaProducer, KafkaConsumer

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.kafka import KafkaInstrumentor

# Set up OpenTelemetry TracerProvider
provider = TracerProvider()
processor = SimpleSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Instrument Kafka
KafkaInstrumentor().instrument()

# --- Kafka Configuration (replace with your Kafka broker address) ---
KAFKA_BROKER = os.environ.get('KAFKA_BROKER', 'localhost:9092')
KAFKA_TOPIC = 'my_instrumented_topic'

def run_kafka_producer():
    print(f"[Producer] Connecting to Kafka at {KAFKA_BROKER}...")
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    message = b'Hello, OpenTelemetry Kafka!'
    print(f"[Producer] Sending message: {message.decode()}")
    future = producer.send(KAFKA_TOPIC, message)
    record_metadata = future.get(timeout=10)
    print(f"[Producer] Message sent to topic: {record_metadata.topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")
    producer.close()

def run_kafka_consumer():
    print(f"[Consumer] Connecting to Kafka at {KAFKA_BROKER}...")
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BROKER],
        group_id='my_otel_group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        consumer_timeout_ms=5000 # Stop after 5 seconds if no messages
    )
    print(f"[Consumer] Listening for messages on topic: {KAFKA_TOPIC}")
    try:
        for message in consumer:
            print(f"[Consumer] Received message: {message.value.decode()} from topic: {message.topic}, partition: {message.partition}, offset: {message.offset}")
            break # Consume one message and exit
    except Exception as e:
        print(f"[Consumer] Error: {e}")
    finally:
        consumer.close()
        print("[Consumer] Closed Kafka consumer.")

if __name__ == "__main__":
    print("Ensure a Kafka broker is running at 'localhost:9092' or set the KAFKA_BROKER environment variable.")
    time.sleep(2) # Give Kafka some time to start if just launched
    run_kafka_producer()
    time.sleep(1)
    run_kafka_consumer()
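
Under the hood, the instrumented producer propagates trace context by injecting a W3C `traceparent` entry into each record's headers (kafka-python represents headers as a list of `(key, bytes)` tuples), and the consumer side parses it so its span joins the same trace. The helpers below are a hand-rolled sketch of that header format for illustration only, with made-up IDs; they are not the propagator the library actually uses:

```python
def build_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> bytes:
    # W3C Trace Context: version "00", then trace-id, parent-id, trace-flags
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}".encode()

def parse_traceparent(header: bytes) -> dict:
    # Split the four dash-separated fields back out of the header value
    version, trace_id, span_id, flags = header.decode().split("-")
    return {"trace_id": trace_id, "parent_span_id": span_id,
            "sampled": flags == "01"}

# Example record headers as kafka-python would carry them (IDs are illustrative)
headers = [("traceparent",
            build_traceparent("0af7651916cd43dd8448eb211c80319c",
                              "b7ad6b7169203331"))]
ctx = parse_traceparent(dict(headers)["traceparent"])
print(ctx["trace_id"])  # → 0af7651916cd43dd8448eb211c80319c
```

With the instrumentor active, none of this is written by hand: the header is injected on `producer.send()` and extracted when the consumer iterates messages.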
