{"id":2627,"library":"opentelemetry-instrumentation-aiokafka","title":"OpenTelemetry AioKafka Instrumentation","description":"This library provides OpenTelemetry instrumentation for the `aiokafka` client, enabling automatic tracing of Kafka producer and consumer operations within `asyncio` applications. As part of the `opentelemetry-python-contrib` project, it currently operates under a beta release schedule (0.x.x series) with frequent updates and bug fixes, aligning with the broader OpenTelemetry Python ecosystem development.","status":"active","version":"0.62b0","language":"en","source_language":"en","source_url":"https://github.com/open-telemetry/opentelemetry-python-contrib","tags":["opentelemetry","instrumentation","kafka","aiokafka","observability","tracing","asyncio","metrics"],"install":[{"cmd":"pip install opentelemetry-instrumentation-aiokafka opentelemetry-sdk aiokafka","lang":"bash","label":"Install with core OTel SDK and aiokafka"}],"dependencies":[{"reason":"Peer dependency, required for actual Kafka communication.","package":"aiokafka","optional":false},{"reason":"Core OpenTelemetry SDK for tracer provider and exporters.","package":"opentelemetry-sdk","optional":false},{"reason":"Core OpenTelemetry API for tracing interfaces.","package":"opentelemetry-api","optional":false}],"imports":[{"symbol":"AioKafkaInstrumentor","correct":"from opentelemetry.instrumentation.aiokafka import AioKafkaInstrumentor"}],"quickstart":{"code":"import asyncio\nimport os\n\nfrom opentelemetry import trace\nfrom opentelemetry.sdk.trace import TracerProvider\nfrom opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor\nfrom opentelemetry.instrumentation.aiokafka import AioKafkaInstrumentor\nfrom aiokafka import AIOKafkaProducer, AIOKafkaConsumer\n\n# Configure OpenTelemetry TracerProvider\nprovider = TracerProvider()\nprocessor = 
SimpleSpanProcessor(ConsoleSpanExporter())\nprovider.add_span_processor(processor)\ntrace.set_tracer_provider(provider)\n\n# Instrument aiokafka\nAioKafkaInstrumentor().instrument()\n\nKAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')\nKAFKA_TOPIC = \"test_topic\"\n\nasync def produce_message():\n    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)\n    await producer.start()\n    try:\n        with trace.get_tracer(__name__).start_as_current_span(\"send_test_message\"):\n            message = b\"Hello OpenTelemetry from AioKafka!\"\n            print(f\"Producing message: {message.decode()}\")\n            await producer.send_and_wait(KAFKA_TOPIC, message)\n            print(\"Message produced.\")\n    finally:\n        await producer.stop()\n\nasync def consume_messages():\n    consumer = AIOKafkaConsumer(\n        KAFKA_TOPIC,\n        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,\n        group_id=\"my-otel-group\",\n        auto_offset_reset=\"earliest\"\n    )\n    await consumer.start()\n    try:\n        print(\"Consumer started, waiting for messages...\")\n        # Consume at least one message\n        async for msg in consumer:\n            with trace.get_tracer(__name__).start_as_current_span(\"process_kafka_message\"):\n                print(f\"Consumed: Topic={msg.topic}, Partition={msg.partition}, Offset={msg.offset}, Value={msg.value.decode()}\")\n            break # For quickstart, consume one and exit\n    finally:\n        await consumer.stop()\n\nasync def main():\n    # Ensure Kafka is running at KAFKA_BOOTSTRAP_SERVERS before running.\n    # You might need to create the topic 'test_topic' manually or configure Kafka for auto-creation.\n    producer_task = asyncio.create_task(produce_message())\n    consumer_task = asyncio.create_task(consume_messages())\n\n    await asyncio.gather(producer_task, consumer_task)\n\nif __name__ == \"__main__\":\n    print(f\"Using Kafka brokers: 
{KAFKA_BOOTSTRAP_SERVERS}\")\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates how to instrument `aiokafka` producer and consumer using `opentelemetry-instrumentation-aiokafka`. It sets up a basic `TracerProvider` with a `ConsoleSpanExporter` to print traces to the console, then uses `AioKafkaInstrumentor().instrument()` to automatically trace `AIOKafkaProducer` and `AIOKafkaConsumer` operations. Ensure a Kafka broker is running and accessible at `localhost:9092` (or via the `KAFKA_BOOTSTRAP_SERVERS` environment variable)."},"warnings":[{"fix":"Pin the exact version (e.g., `opentelemetry-instrumentation-aiokafka==0.62b0`) and carefully review `opentelemetry-python-contrib` changelogs for updates before upgrading.","message":"This instrumentation library is in beta (`0.x.x` versioning with a `b` suffix), indicating that its API and behavior may change without strict adherence to semantic versioning. Always pin your version and review release notes for potential breaking changes between minor versions.","severity":"breaking","affected_versions":"0.x.x (all beta versions)"},{"fix":"Refer to the `opentelemetry-python-contrib` GitHub issues and `aiokafka` documentation for known compatible versions. Test thoroughly after any updates to `aiokafka` or the instrumentation.","message":"Ensure the underlying `aiokafka` library version is compatible with the instrumentation. 
Past issues (e.g., with `0.52b0` and `aiokafka~=0.12.0`) have shown compatibility problems leading to runtime errors, particularly with `AIOKafkaProducer.send()` arguments.","severity":"gotcha","affected_versions":"All versions, especially older beta releases"},{"fix":"Place `AioKafkaInstrumentor().instrument()` at the very beginning of your application's entry point, after configuring the OpenTelemetry SDK.","message":"The `instrument()` method must be called early in your application's lifecycle, before `aiokafka` `AIOKafkaProducer` or `AIOKafkaConsumer` instances are created. If `instrument()` is called too late, existing `aiokafka` objects will not be instrumented.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Start a Kafka broker locally (e.g., via Docker or a local installation) or configure `KAFKA_BOOTSTRAP_SERVERS` to point to a running instance.","message":"For the instrumentation to generate visible traces, a Kafka broker must be running and accessible at the specified `bootstrap_servers`. The provided quickstart assumes `localhost:9092`.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Replace `ConsoleSpanExporter` with a suitable exporter, such as `OTLPSpanExporter` (from the `opentelemetry-exporter-otlp` package), and configure it with the appropriate endpoint and credentials.","message":"The quickstart uses a `ConsoleSpanExporter`, which prints traces to standard output. For production environments, you will need to configure a more robust exporter (e.g., the OTLP exporter) to send traces to an observability backend like Jaeger, Zipkin, or an OpenTelemetry Collector.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-10T00:00:00.000Z","next_check":"2026-07-09T00:00:00.000Z"}