{"id":4482,"library":"confluent-kafka-stubs","title":"confluent-kafka-stubs for Type Hinting","description":"confluent-kafka-stubs provides static type stub files for the `confluent-kafka` Python client, enabling improved type checking with tools like MyPy. It is currently at version 0.0.3 and is released as part of the main `confluent-kafka-python` project, with updates typically mirroring the development of the core library.","status":"active","version":"0.0.3","language":"en","source_language":"en","source_url":"https://github.com/confluentinc/confluent-kafka-python/tree/master/stubs","tags":["kafka","confluent","stubs","type_hints","mypy","static_analysis"],"install":[{"cmd":"pip install confluent-kafka-stubs","lang":"bash","label":"Install stubs"},{"cmd":"pip install confluent-kafka","lang":"bash","label":"Install core library (required)"}],"dependencies":[{"reason":"This library provides type stubs for 'confluent-kafka'. It is essential to install the actual 'confluent-kafka' library for runtime execution.","package":"confluent-kafka","optional":false}],"imports":[{"note":"confluent-kafka-stubs provides type hints for the 'confluent_kafka' library. You do not import directly from 'confluent_kafka_stubs' at runtime. Your code should import from 'confluent_kafka' as usual, and the stubs will be picked up by type checkers.","symbol":"KafkaProducer, KafkaConsumer, Message","correct":"from confluent_kafka import Producer, Consumer, Message"}],"quickstart":{"code":"import os\nfrom confluent_kafka import Producer, Consumer, KafkaException\n\ndef delivery_report(err, msg):\n    if err is not None:\n        print(f\"Message delivery failed: {err}\")\n    else:\n        print(f\"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}\")\n\ndef run_example():\n    # Configuration for Kafka (example with local Kafka or Confluent Cloud)\n    # Replace with your actual Kafka broker(s)\n    broker = os.environ.get('KAFKA_BROKER', 'localhost:9092')\n    conf = {\n        'bootstrap.servers': broker,\n        # 'sasl.username': os.environ.get('KAFKA_USERNAME'),\n        # 'sasl.password': os.environ.get('KAFKA_PASSWORD'),\n        # 'security.protocol': 'SASL_SSL',\n        # 'sasl.mechanisms': 'PLAIN',\n        'acks': 'all'\n    }\n\n    topic = 'my_example_topic'\n\n    # Producer\n    print(\"--- Producer ---\")\n    producer = Producer(conf)\n    try:\n        producer.produce(topic, key='key1', value='hello world from stubs!', callback=delivery_report)\n        producer.flush(10) # Wait for up to 10 seconds for any outstanding messages to be delivered\n    except KafkaException as e:\n        print(f\"Producer error: {e}\")\n\n    # Consumer\n    print(\"\\n--- Consumer ---\")\n    consumer_conf = {\n        'bootstrap.servers': broker,\n        'group.id': 'my_consumer_group',\n        'auto.offset.reset': 'earliest'\n    }\n    consumer = Consumer(consumer_conf)\n    try:\n        consumer.subscribe([topic])\n        # Poll for messages for a short duration\n        msg = consumer.poll(timeout=5.0)\n        if msg is None:\n            print(\"No message received within timeout.\")\n        elif msg.error():\n            print(f\"Consumer error: {msg.error()}\")\n        else:\n            print(f\"Received message: key={msg.key().decode('utf-8')}, value={msg.value().decode('utf-8')}\")\n    except KafkaException as e:\n        print(f\"Consumer error: {e}\")\n    finally:\n        consumer.close()\n\nif __name__ == '__main__':\n    # To run this with type checking, save as e.g., 'example.py'\n    # Then run 'mypy example.py'\n    # And 'python example.py' (ensure Kafka broker is running)\n    run_example()\n","lang":"python","description":"This quickstart demonstrates a simple producer and consumer using `confluent-kafka`. To leverage the `confluent-kafka-stubs` for type checking, ensure both `confluent-kafka` and `confluent-kafka-stubs` are installed in your environment. You can then run a static analysis tool like MyPy on your code to benefit from the provided type hints."},"warnings":[{"fix":"Always install `confluent-kafka` alongside `confluent-kafka-stubs`: `pip install confluent-kafka confluent-kafka-stubs`.","message":"confluent-kafka-stubs provides only type information for static analysis (e.g., MyPy, IDE auto-completion). It does not contain any executable code or change the runtime behavior of `confluent-kafka`. You must still install and use the `confluent-kafka` library for your application to function.","severity":"gotcha","affected_versions":"0.0.1+"},{"fix":"Keep `confluent-kafka-stubs` updated to a version compatible with your `confluent-kafka` installation. Generally, they are developed together within the same repository, so using the latest available stubs is recommended.","message":"Mismatching versions between `confluent-kafka` and `confluent-kafka-stubs` can lead to incorrect type hints or MyPy errors. If the underlying library's API changes significantly, the stubs might become outdated.","severity":"gotcha","affected_versions":"0.0.1+"},{"fix":"While stubs significantly improve type safety, be aware that complex scenarios might still require some manual type annotation or `type: ignore` comments in your code. Consult `confluent-kafka`'s actual documentation for definitive API behavior.","message":"Some advanced features or dynamic behaviors of `confluent-kafka` (e.g., highly dynamic configurations or callback signatures) might not be perfectly covered by static stubs, leading to occasional `Any` types or less precise hints.","severity":"gotcha","affected_versions":"0.0.1+"}],"env_vars":null,"last_verified":"2026-04-12T00:00:00.000Z","next_check":"2026-07-11T00:00:00.000Z"}