OpenTelemetry Confluent Kafka Instrumentation
This library provides OpenTelemetry instrumentation for the `confluent-kafka` Python client, enabling tracing of messages produced to and consumed from Kafka. It is part of the `opentelemetry-python-contrib` project and is published as a beta (`b0`) release, so it is still under active development and its API may change.
Warnings
- breaking In versions prior to `1.25.0/0.46b0`, the `Producer.poll()` and `Producer.flush()` methods, when instrumented, did not return values. As of `1.25.0/0.46b0`, these methods now return values as expected, which may affect code relying on their prior non-returning behavior when instrumented.
- gotcha This instrumentation library is currently in a beta (b0) release state. This means its API and behavior may change in future versions, and it is not yet considered stable for production environments where API stability is critical.
- gotcha Versions of `opentelemetry-instrumentation-confluent-kafka` prior to `0.62b0` might have stricter or implicit upper bounds on the `confluent-kafka` dependency, potentially causing compatibility issues with newer `confluent-kafka` versions. As of `0.62b0`, the upper bound has been loosened to `<3.0.0`.
- gotcha Simply installing and calling `instrument()` on an OpenTelemetry instrumentation library is not enough to generate traces. You must also configure an OpenTelemetry `TracerProvider`, `SpanProcessor`, and an `Exporter` (e.g., `OTLPSpanExporter` or `ConsoleSpanExporter`) for traces to be collected and sent.
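To make the first warning concrete, here is a minimal sketch of code that tolerates both behaviors of an instrumented `flush()`. `StubProducer` and `NonReturningProxy` are hypothetical stand-ins written for this illustration, not classes from `confluent-kafka` or the instrumentation package. The only fact relied on is that the real `Producer.flush()` returns the number of messages still queued.

```python
class StubProducer:
    """Hypothetical stand-in for confluent_kafka.Producer (no broker needed)."""

    def __init__(self, pending):
        self._pending = pending

    def flush(self, timeout=-1):
        # The real client returns the number of messages still in the queue.
        return self._pending


class NonReturningProxy:
    """Hypothetical wrapper that mimics the older instrumented behavior."""

    def __init__(self, producer):
        self._producer = producer

    def flush(self, timeout=-1):
        # The result is dropped, so callers receive None.
        self._producer.flush(timeout)


def undelivered_after_flush(producer, timeout=5.0):
    """Return the queued-message count, or None if the wrapper hid it."""
    remaining = producer.flush(timeout)
    if remaining is None:
        return None
    return int(remaining)


print(undelivered_after_flush(StubProducer(pending=2)))
print(undelivered_after_flush(NonReturningProxy(StubProducer(pending=2))))
```

Code that must run against both older and newer versions can treat `None` as "unknown" rather than as "nothing left to deliver".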
Install
pip install opentelemetry-instrumentation-confluent-kafka
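After installing, it can be useful to confirm that the installed `confluent-kafka` version falls under the `<3.0.0` upper bound mentioned in the warnings. The sketch below uses only the standard library. The helper names (`major_version`, `within_upper_bound`, `check_installed`) are made up for this example, and comparing only the major version is a simplification of full version-specifier matching.

```python
import re
from importlib import metadata


def major_version(version_string):
    """Extract the leading major number from a version such as '2.4.0'."""
    match = re.match(r"\d+", version_string)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version_string!r}")
    return int(match.group())


def within_upper_bound(version_string, upper_major=3):
    """True if the version is below '<upper_major>.0.0' (assumed bound: 3)."""
    return major_version(version_string) < upper_major


def check_installed(dist_name="confluent-kafka"):
    """Return True/False for the installed version, or None if not installed."""
    try:
        installed = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None
    return within_upper_bound(installed)


print(check_installed())
```

A result of `None` means the distribution was not found in the current environment, which usually points to the wrong virtual environment being active.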
Imports
- ConfluentKafkaInstrumentor
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
Quickstart
import os

import confluent_kafka
from confluent_kafka import KafkaException
from opentelemetry import trace
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Configure the OpenTelemetry SDK (ConsoleSpanExporter is for demonstration only)
resource = Resource.create({"service.name": "kafka-app"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(tracer_provider)

# Instrument the confluent_kafka library
ConfluentKafkaInstrumentor().instrument()

# Get a tracer
tracer = trace.get_tracer(__name__)

# Kafka configuration (replace with your Kafka broker details)
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
topic = 'my_instrumented_topic'


def produce_message():
    producer_conf = {'bootstrap.servers': bootstrap_servers}
    # Look up Producer on the module at call time so that any class patched in
    # by instrument() is the one that gets used.
    producer = confluent_kafka.Producer(producer_conf)
    with tracer.start_as_current_span("produce-kafka-message") as span:
        try:
            producer.produce(topic, key='key', value='Hello, OpenTelemetry Kafka!',
                             callback=lambda err, msg: None)
            # flush() returns the number of messages still queued; on older
            # instrumented versions it may return None (see Warnings).
            remaining = producer.flush(timeout=5)
            if remaining:
                print(f"{remaining} message(s) not delivered before the timeout")
            else:
                print("Produced: Hello, OpenTelemetry Kafka!")
        except Exception as e:
            span.record_exception(e)
            print(f"Producer error: {e}")


def consume_message():
    consumer_conf = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest'
    }
    # Same module-level lookup as for the producer.
    consumer = confluent_kafka.Consumer(consumer_conf)
    with tracer.start_as_current_span("consume-kafka-message") as span:
        try:
            consumer.subscribe([topic])
            # Allow time for the consumer group join before giving up
            msg = consumer.poll(timeout=10.0)
            if msg is None:
                print("No message received.")
            elif msg.error():
                raise KafkaException(msg.error())
            else:
                print(f"Consumed: {msg.value().decode('utf-8')}")
        except KafkaException as e:
            span.record_exception(e)
            print(f"Consumer error: {e}")
        except Exception as e:
            span.record_exception(e)
            print(f"An unexpected error occurred: {e}")
        finally:
            # Always release the consumer, including on the error paths
            consumer.close()


if __name__ == '__main__':
    # Example of setting an environment variable for Kafka brokers if needed
    # os.environ['KAFKA_BOOTSTRAP_SERVERS'] = 'your_kafka_broker:9092'
    produce_message()
    consume_message()