{"id":2634,"library":"opentelemetry-instrumentation-kafka-python","title":"OpenTelemetry Kafka-Python Instrumentation","description":"This library provides OpenTelemetry tracing instrumentation for applications using the `kafka-python` client library. It enables automatic collection of trace data for Kafka produce and consume operations, facilitating observability in distributed systems. As part of the `opentelemetry-python-contrib` project, it maintains a regular release cadence, aligning with OpenTelemetry Python SDK updates.","status":"active","version":"0.62b0","language":"en","source_language":"en","source_url":"https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-kafka-python","tags":["opentelemetry","kafka","instrumentation","tracing","monitoring","observability","python"],"install":[{"cmd":"pip install opentelemetry-instrumentation-kafka-python kafka-python opentelemetry-sdk","lang":"bash","label":"Install with Kafka-Python and SDK"}],"dependencies":[{"reason":"The core Kafka client library being instrumented. Required for the instrumentation to function.","package":"kafka-python","optional":false},{"reason":"OpenTelemetry API for Python. Required runtime dependency.","package":"opentelemetry-api","optional":false},{"reason":"OpenTelemetry SDK for Python. Required for setting up tracers and exporters.","package":"opentelemetry-sdk","optional":false},{"reason":"Base OpenTelemetry instrumentation package.","package":"opentelemetry-instrumentation","optional":false},{"reason":"OpenTelemetry semantic conventions for attribute naming.","package":"opentelemetry-semantic-conventions","optional":false}],"imports":[{"symbol":"KafkaInstrumentor","correct":"from opentelemetry.instrumentation.kafka import KafkaInstrumentor"}],"quickstart":{"code":"import os\nimport time\nfrom kafka import KafkaProducer, KafkaConsumer\n\nfrom opentelemetry import trace\nfrom opentelemetry.sdk.trace import TracerProvider\nfrom opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor\nfrom opentelemetry.instrumentation.kafka import KafkaInstrumentor\n\n# Set up OpenTelemetry TracerProvider\nprovider = TracerProvider()\nprocessor = SimpleSpanProcessor(ConsoleSpanExporter())\nprovider.add_span_processor(processor)\ntrace.set_tracer_provider(provider)\n\n# Instrument Kafka\nKafkaInstrumentor().instrument()\n\n# --- Kafka Configuration (replace with your Kafka broker address) ---\nKAFKA_BROKER = os.environ.get('KAFKA_BROKER', 'localhost:9092')\nKAFKA_TOPIC = 'my_instrumented_topic'\n\ndef run_kafka_producer():\n    print(f\"[Producer] Connecting to Kafka at {KAFKA_BROKER}...\")\n    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])\n    message = b'Hello, OpenTelemetry Kafka!'\n    print(f\"[Producer] Sending message: {message.decode()}\")\n    future = producer.send(KAFKA_TOPIC, message)\n    record_metadata = future.get(timeout=10)\n    print(f\"[Producer] Message sent to topic: {record_TOPIC}, partition: {record_metadata.partition}, offset: {record_metadata.offset}\")\n    producer.close()\n\ndef run_kafka_consumer():\n    print(f\"[Consumer] Connecting to Kafka at {KAFKA_BROKER}...\")\n    consumer = KafkaConsumer(\n        KAFKA_TOPIC,\n        bootstrap_servers=[KAFKA_BROKER],\n        group_id='my_otel_group',\n        auto_offset_reset='earliest',\n        enable_auto_commit=True,\n        consumer_timeout_ms=5000 # Stop after 5 seconds if no messages\n    )\n    print(f\"[Consumer] Listening for messages on topic: {KAFKA_TOPIC}\")\n    try:\n        for message in consumer:\n            print(f\"[Consumer] Received message: {message.value.decode()} from topic: {message.topic}, partition: {message.partition}, offset: {message.offset}\")\n            break # Consume one message and exit\n    except Exception as e:\n        print(f\"[Consumer] Error: {e}\")\n    finally:\n        consumer.close()\n        print(\"[Consumer] Closed Kafka consumer.\")\n\nif __name__ == \"__main__\":\n    print(\"Ensure a Kafka broker is running at 'localhost:9092' or set the KAFKA_BROKER environment variable.\")\n    time.sleep(2) # Give Kafka some time to start if just launched\n    run_kafka_producer()\n    time.sleep(1)\n    run_kafka_consumer()\n","lang":"python","description":"This quickstart demonstrates how to instrument `kafka-python` producer and consumer operations. It sets up a basic OpenTelemetry `TracerProvider` with a `ConsoleSpanExporter` to print traces to the console. The `KafkaInstrumentor().instrument()` call automatically instruments the `kafka-python` library. Requires a running Kafka broker (default `localhost:9092`)."},"warnings":[{"fix":"Use `opentelemetry-instrumentation-aiokafka` for asynchronous Kafka clients or ensure you are using the synchronous `kafka-python` library.","message":"This instrumentation library (`opentelemetry-instrumentation-kafka-python`) is designed for the synchronous `kafka-python` client. It does NOT support `aiokafka` (asynchronous Kafka client). For `aiokafka` instrumentation, use `opentelemetry-instrumentation-aiokafka` instead.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Always explicitly install `kafka-python` alongside `opentelemetry-instrumentation-kafka-python`. Do not rely on automatic dependency resolution to install the core Kafka library if you intend to instrument it.","message":"Starting with `opentelemetry-python-contrib` v1.32.0/0.53b0, the internal dependency resolution logic for instrumentations changed. The `KafkaInstrumentor` now explicitly assumes `kafka-python` is installed and will raise an error if it's not present, rather than passively skipping instrumentation. Dependency checks are primarily for version constraints.","severity":"breaking","affected_versions":">= 1.32.0/0.53b0 (contrib repo version)"},{"fix":"Ensure `kafka-python` is explicitly installed: `pip install opentelemetry-instrumentation-kafka-python kafka-python`.","message":"The `opentelemetry-instrumentation-kafka-python` package declares `kafka-python` as an 'optional dependency' in its `pyproject.toml` (specifically `instruments-any`). However, for the instrumentation to function, `kafka-python` *must* be installed by the user.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Consider using programmatic instrumentation, deploying with a single worker process, or setting up a `PeriodicExportingMetricReader` and one OTLP worker per process when using Prometheus to receive OTLP metrics directly. Avoid `PrometheusMetricReader` with forking.","message":"Deploying OpenTelemetry in applications using pre-forking servers (e.g., Gunicorn with multiple workers) can lead to inconsistencies or deadlocks related to background threads and locks in the OpenTelemetry SDK's `PeriodicExportingMetricReader`. This can affect trace and metric collection in Kafka consumers running in such environments.","severity":"gotcha","affected_versions":"All versions of OpenTelemetry Python SDK"}],"env_vars":null,"last_verified":"2026-04-10T00:00:00.000Z","next_check":"2026-07-09T00:00:00.000Z"}