OpenTelemetry Kafka-Python Instrumentation
This library provides OpenTelemetry tracing instrumentation for applications using the `kafka-python` client library. It enables automatic collection of trace data for Kafka produce and consume operations, facilitating observability in distributed systems. As part of the `opentelemetry-python-contrib` project, it maintains a regular release cadence, aligning with OpenTelemetry Python SDK updates.
Warnings
- gotcha This instrumentation library (`opentelemetry-instrumentation-kafka-python`) is designed for the synchronous `kafka-python` client. It does NOT support `aiokafka` (asynchronous Kafka client). For `aiokafka` instrumentation, use `opentelemetry-instrumentation-aiokafka` instead.
- breaking Starting with `opentelemetry-python-contrib` v1.32.0/0.53b0, the internal dependency resolution logic for instrumentations changed. The `KafkaInstrumentor` now explicitly assumes `kafka-python` is installed and will raise an error if it's not present, rather than passively skipping instrumentation. Dependency checks are primarily for version constraints.
- gotcha The `opentelemetry-instrumentation-kafka-python` package declares `kafka-python` as an 'optional dependency' in its `pyproject.toml` (specifically `instruments-any`). However, for the instrumentation to function, `kafka-python` *must* be installed by the user.
- gotcha Deploying OpenTelemetry in applications using pre-forking servers (e.g., Gunicorn with multiple workers) can lead to inconsistencies or deadlocks related to background threads and locks in the OpenTelemetry SDK's `PeriodicExportingMetricReader`. This can affect trace and metric collection in Kafka consumers running in such environments.
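The two dependency warnings above suggest checking for `kafka-python` before calling the instrumentor, so a missing client surfaces as a clear decision rather than an error at startup. The following is a minimal sketch of such a guard, not part of the library itself. The helper names `module_available` and `instrument_kafka_if_available` are hypothetical, and the check assumes the client is importable under the top-level module name `kafka`.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(name) is not None


def instrument_kafka_if_available() -> bool:
    """Instrument kafka-python only when the client is installed.

    Returns True if instrumentation was enabled, False if it was skipped.
    Both function names are illustrative, not part of any OpenTelemetry API.
    """
    if not module_available("kafka"):
        # kafka-python is missing: skip instead of letting instrument() fail.
        return False

    # Imported lazily so this module can still be imported when the
    # instrumentation package itself is not installed.
    from opentelemetry.instrumentation.kafka import KafkaInstrumentor

    KafkaInstrumentor().instrument()
    return True
```

Calling `instrument_kafka_if_available()` once at startup, after the tracer provider is configured, keeps the rest of the application unchanged whether or not the Kafka client is present.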
Install
pip install opentelemetry-instrumentation-kafka-python kafka-python opentelemetry-sdk
Imports
- KafkaInstrumentor
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
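Besides the no-argument call, `KafkaInstrumentor().instrument()` accepts optional `produce_hook` and `consume_hook` callables that receive the span for each send or receive. The sketch below shows one way to use them to attach a custom attribute. The attribute name `app.kafka.direction` and the wrapper function `enable_kafka_instrumentation` are assumptions made for illustration.

```python
def produce_hook(span, args, kwargs):
    """Called for each produced message with the span and the send() arguments."""
    if span and span.is_recording():
        # "app.kafka.direction" is a made-up attribute name for this example.
        span.set_attribute("app.kafka.direction", "produce")


def consume_hook(span, record, args, kwargs):
    """Called for each consumed record with the span and the record itself."""
    if span and span.is_recording():
        span.set_attribute("app.kafka.direction", "consume")


def enable_kafka_instrumentation():
    """Hypothetical helper that wires the hooks into the instrumentor."""
    # Imported here so the hooks above stay importable without OpenTelemetry.
    from opentelemetry.instrumentation.kafka import KafkaInstrumentor

    KafkaInstrumentor().instrument(
        produce_hook=produce_hook,
        consume_hook=consume_hook,
    )
```

The `is_recording()` check avoids setting attributes on spans that were not sampled, so the hooks add little overhead when sampling is restrictive.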
Quickstart
import os
import time
from kafka import KafkaProducer, KafkaConsumer
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
# Set up OpenTelemetry TracerProvider
provider = TracerProvider()
processor = SimpleSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Instrument Kafka
KafkaInstrumentor().instrument()
# --- Kafka Configuration (replace with your Kafka broker address) ---
KAFKA_BROKER = os.environ.get('KAFKA_BROKER', 'localhost:9092')
KAFKA_TOPIC = 'my_instrumented_topic'
def run_kafka_producer():
    print(f"[Producer] Connecting to Kafka at {KAFKA_BROKER}...")
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    message = b'Hello, OpenTelemetry Kafka!'
    print(f"[Producer] Sending message: {message.decode()}")
    future = producer.send(KAFKA_TOPIC, message)
    # Block until the broker acknowledges the message (or 10 seconds pass)
    record_metadata = future.get(timeout=10)
    print(f"[Producer] Message sent to topic: {record_metadata.topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")
    producer.close()
def run_kafka_consumer():
    print(f"[Consumer] Connecting to Kafka at {KAFKA_BROKER}...")
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BROKER],
        group_id='my_otel_group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        consumer_timeout_ms=5000  # Stop after 5 seconds if no messages
    )
    print(f"[Consumer] Listening for messages on topic: {KAFKA_TOPIC}")
    try:
        for message in consumer:
            print(f"[Consumer] Received message: {message.value.decode()} from topic: {message.topic}, partition: {message.partition}, offset: {message.offset}")
            break  # Consume one message and exit
    except Exception as e:
        print(f"[Consumer] Error: {e}")
    finally:
        consumer.close()
        print("[Consumer] Closed Kafka consumer.")
if __name__ == "__main__":
    print("Ensure a Kafka broker is running at 'localhost:9092' or set the KAFKA_BROKER environment variable.")
    time.sleep(2)  # Give Kafka some time to start if just launched
    run_kafka_producer()
    time.sleep(1)
    run_kafka_consumer()