OpenTelemetry Confluent Kafka Instrumentation

0.62b0 · active · verified Sat Apr 11

This library provides OpenTelemetry instrumentation for the `confluent-kafka` Python client, enabling tracing of messages produced and consumed via Kafka. It is part of the `opentelemetry-python-contrib` project and is published as a beta release (for example `0.62b0`), so it is still under active development and its API may change between versions.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to enable OpenTelemetry tracing for Confluent Kafka producers and consumers. It sets up a minimal OpenTelemetry SDK with a console exporter, then uses `ConfluentKafkaInstrumentor().instrument()` to automatically instrument the `confluent-kafka` library. Finally, it shows a simple producer sending a message and a consumer receiving it, with traces visible in the console output.

import os
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer, KafkaException

# Configure the OpenTelemetry SDK. Demonstration only: spans go to a
# ConsoleSpanExporter, and every span is tagged with service.name="kafka-app".
resource = Resource.create({"service.name": "kafka-app"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
# Register the provider globally before instrumenting and before any tracer is requested.
trace.set_tracer_provider(tracer_provider)

# Instrument the confluent_kafka library.
# NOTE(review): Producer and Consumer were already imported by name above, before
# this call. If instrument() works by swapping the classes on the confluent_kafka
# module, those earlier-bound names may stay uninstrumented -- TODO confirm.
ConfluentKafkaInstrumentor().instrument()

# Tracer used for the application-level spans created in produce_message()
# and consume_message().
tracer = trace.get_tracer(__name__)

# Kafka configuration (replace with your Kafka broker details).
# The broker list is read once, here, from KAFKA_BOOTSTRAP_SERVERS and falls
# back to 'localhost:9092'; changing the environment variable later in the
# process does not affect this value.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
topic = 'my_instrumented_topic'

def produce_message():
    """Send one demo message to ``topic`` inside a "produce-kafka-message" span.

    Success is only reported once delivery has been confirmed: the delivery
    callback collects any broker-reported error, and the value returned by
    ``flush()`` is checked for messages that were still queued when the
    timeout expired.

    Any failure is recorded on the span (exception event plus ERROR status)
    and printed; nothing is raised to the caller.
    """
    producer = Producer({'bootstrap.servers': bootstrap_servers})
    delivery_errors = []

    def _on_delivery(err, msg):
        # Called once per message while flush() serves callbacks;
        # err is None when the message was delivered.
        if err is not None:
            delivery_errors.append(err)

    with tracer.start_as_current_span("produce-kafka-message") as span:
        try:
            producer.produce(
                topic,
                key='key',
                value='Hello, OpenTelemetry Kafka!',
                callback=_on_delivery,
            )
            # flush() returns how many messages are still waiting to be delivered.
            undelivered = producer.flush(timeout=5)
            if delivery_errors:
                raise KafkaException(delivery_errors[0])
            if undelivered:
                raise RuntimeError(
                    f"{undelivered} message(s) still undelivered after flush timeout"
                )
            print("Produced: Hello, OpenTelemetry Kafka!")
        except Exception as e:
            # The exception is handled here, so the span must be marked as
            # failed explicitly; record_exception() alone only adds an event.
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            print(f"Producer error: {e}")

def consume_message():
    """Receive one demo message from ``topic`` inside a "consume-kafka-message" span.

    The consumer polls up to ``max_polls`` times (1 second each) for a message,
    because a freshly subscribed consumer can return ``None`` from its first
    polls while partitions are still being assigned. The consumer is always
    closed, including when polling or decoding fails.

    Any failure is recorded on the span (exception event plus ERROR status)
    and printed; nothing is raised to the caller.
    """
    consumer = Consumer({
        'bootstrap.servers': bootstrap_servers,
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest',
    })
    max_polls = 10  # upper bound on waiting: roughly 10 seconds in total

    with tracer.start_as_current_span("consume-kafka-message") as span:
        try:
            try:
                consumer.subscribe([topic])
                msg = None
                for _ in range(max_polls):
                    msg = consumer.poll(timeout=1.0)
                    if msg is not None:
                        break
                if msg is None:
                    print("No message received.")
                elif msg.error():
                    raise KafkaException(msg.error())
                else:
                    print(f"Consumed: {msg.value().decode('utf-8')}")
            finally:
                # Close on every path. Kept inside the outer try so that an
                # error raised by close() is still reported by the handlers below.
                consumer.close()
        except KafkaException as e:
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            print(f"Consumer error: {e}")
        except Exception as e:
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            print(f"An unexpected error occurred: {e}")

if __name__ == '__main__':
    # Run the demo: produce one message, then try to consume it.
    # NOTE: bootstrap_servers is read from KAFKA_BOOTSTRAP_SERVERS at module
    # level, before this block runs, so assigning os.environ[...] here would
    # have no effect. Set the variable in the environment before starting the
    # script instead, e.g. KAFKA_BOOTSTRAP_SERVERS=your_kafka_broker:9092
    produce_message()
    consume_message()

view raw JSON →