{"id":3196,"library":"opentelemetry-instrumentation-confluent-kafka","title":"OpenTelemetry Confluent Kafka Instrumentation","description":"This library provides OpenTelemetry instrumentation for the `confluent-kafka` Python client, enabling tracing of messages produced and consumed via Kafka. It is part of the `opentelemetry-python-contrib` project, which typically follows a beta release cadence, indicating ongoing development and potential API changes.","status":"active","version":"0.62b0","language":"en","source_language":"en","source_url":"https://github.com/open-telemetry/opentelemetry-python-contrib","tags":["opentelemetry","observability","kafka","tracing","instrumentation","confluent-kafka"],"install":[{"cmd":"pip install opentelemetry-instrumentation-confluent-kafka","lang":"bash","label":"Install library"}],"dependencies":[{"reason":"This library instruments the confluent-kafka client for Kafka interaction.","package":"confluent-kafka","optional":false}],"imports":[{"symbol":"ConfluentKafkaInstrumentor","correct":"from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor"}],"quickstart":{"code":"import os\nfrom opentelemetry import trace\nfrom opentelemetry.sdk.resources import Resource\nfrom opentelemetry.sdk.trace import TracerProvider\nfrom opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor\nfrom opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor\nfrom confluent_kafka import Producer, Consumer, KafkaException\n\n# Configure OpenTelemetry SDK (for demonstration purposes, using ConsoleSpanExporter)\nresource = Resource.create({\"service.name\": \"kafka-app\"})\ntracer_provider = TracerProvider(resource=resource)\ntracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))\ntrace.set_tracer_provider(tracer_provider)\n\n# Instrument the confluent_kafka library\nConfluentKafkaInstrumentor().instrument()\n\n# Get a tracer\ntracer = 
trace.get_tracer(__name__)\n\n# Kafka configuration (replace with your Kafka broker details)\nbootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')\ntopic = 'my_instrumented_topic'\n\ndef produce_message():\n    producer_conf = {'bootstrap.servers': bootstrap_servers}\n    producer = Producer(producer_conf)\n\n    with tracer.start_as_current_span(\"produce-kafka-message\") as span:\n        try:\n            producer.produce(topic, key='key', value='Hello, OpenTelemetry Kafka!', callback=lambda err, msg: None)\n            producer.flush(timeout=5) # Ensure message is sent\n            print(\"Produced: Hello, OpenTelemetry Kafka!\")\n        except Exception as e:\n            span.record_exception(e)\n            print(f\"Producer error: {e}\")\n\ndef consume_message():\n    consumer_conf = {\n        'bootstrap.servers': bootstrap_servers,\n        'group.id': 'my_consumer_group',\n        'auto.offset.reset': 'earliest'\n    }\n    consumer = Consumer(consumer_conf)\n\n    with tracer.start_as_current_span(\"consume-kafka-message\") as span:\n        try:\n            consumer.subscribe([topic])\n            msg = consumer.poll(timeout=1.0) # Poll for a message\n            if msg is None:\n                print(\"No message received.\")\n            elif msg.error():\n                raise KafkaException(msg.error())\n            else:\n                print(f\"Consumed: {msg.value().decode('utf-8')}\")\n            consumer.close()\n        except KafkaException as e:\n            span.record_exception(e)\n            print(f\"Consumer error: {e}\")\n        except Exception as e:\n            span.record_exception(e)\n            print(f\"An unexpected error occurred: {e}\")\n\nif __name__ == '__main__':\n    # Example of setting an environment variable for Kafka brokers if needed\n    # os.environ['KAFKA_BOOTSTRAP_SERVERS'] = 'your_kafka_broker:9092'\n    produce_message()\n    
consume_message()","lang":"python","description":"This quickstart demonstrates how to enable OpenTelemetry tracing for Confluent Kafka producers and consumers. It sets up a minimal OpenTelemetry SDK with a console exporter, then calls `ConfluentKafkaInstrumentor().instrument()` to automatically instrument the `confluent-kafka` library. Finally, it shows a simple producer sending a message and a consumer polling once for it, with the resulting spans printed to the console."},"warnings":[{"fix":"Upgrade the instrumentation to `0.46b0` or later if you rely on the return values of `Producer.poll()` or `Producer.flush()`. After upgrading, review any code that assumed these instrumented calls return `None`.","message":"In versions prior to `1.25.0/0.46b0`, the instrumented `Producer.poll()` and `Producer.flush()` methods did not return values, so callers always received `None`. As of `1.25.0/0.46b0`, they pass through the values returned by the underlying `confluent-kafka` calls, which may affect code that relied on the earlier behavior.","severity":"breaking","affected_versions":"< 1.25.0/0.46b0"},{"fix":"Review the release notes for breaking changes before upgrading in production environments. Consider pinning an exact version (for example, `==0.62b0`) so that upgrades are deliberate.","message":"This instrumentation library is currently in a beta (b0) release state. Its API and behavior may change in future versions, and it is not yet considered stable for production environments where API stability is critical.","severity":"gotcha","affected_versions":"0.x.xb0 series"},{"fix":"Upgrade to `opentelemetry-instrumentation-confluent-kafka==0.62b0` or higher to benefit from improved `confluent-kafka` version compatibility. Always test with your specific `confluent-kafka` version.","message":"Versions of `opentelemetry-instrumentation-confluent-kafka` prior to `0.62b0` may declare a stricter upper bound on the `confluent-kafka` dependency, potentially causing compatibility issues with newer `confluent-kafka` versions. 
As of `0.62b0`, the upper bound has been loosened to `<3.0.0`.","severity":"gotcha","affected_versions":"< 0.62b0"},{"fix":"Configure the OpenTelemetry SDK in your application's entry point before any instrumented code runs. Refer to the OpenTelemetry Python SDK documentation for full setup details.","message":"Installing the package and calling `instrument()` is not enough to generate traces. You must also configure an SDK `TracerProvider` with a `SpanProcessor` and a span exporter (e.g., `OTLPSpanExporter` or `ConsoleSpanExporter`); without them, spans are created by a no-op tracer and never exported.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}