{"id":4367,"library":"kafka-python-ng","title":"Kafka Python Next Generation Client","description":"kafka-python-ng is a pure Python client for Apache Kafka, designed to function similarly to the official Java client, but with Pythonic interfaces. It provides high-level producer and consumer APIs, as well as admin functionality. The library is actively maintained, with frequent releases, and is compatible with Kafka brokers from version 0.8.0 up to 2.6+ (with optimal features for 0.9+). The current version is 2.2.3 and requires Python >=3.8.","status":"active","version":"2.2.3","language":"en","source_language":"en","source_url":"https://github.com/wbarnha/kafka-python-ng","tags":["kafka","messaging","pubsub","streaming","apache-kafka","client"],"install":[{"cmd":"pip install kafka-python-ng","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"Optional C-optimized CRC32 validation for improved performance.","package":"crc32c","optional":true},{"reason":"Optional LZ4 compression support.","package":"lz4","optional":true},{"reason":"Optional Snappy compression support.","package":"snappy","optional":true},{"reason":"Optional Zstandard compression support.","package":"zstd","optional":true}],"imports":[{"symbol":"KafkaProducer","correct":"from kafka import KafkaProducer"},{"symbol":"KafkaConsumer","correct":"from kafka import KafkaConsumer"},{"symbol":"KafkaAdminClient","correct":"from kafka import KafkaAdminClient"}],"quickstart":{"code":"import os\nimport json\nimport time\nfrom kafka import KafkaProducer, KafkaConsumer\n\n# Configure Kafka bootstrap servers, use environment variable for production readiness\nBOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')\nTOPIC_NAME = 'my_test_topic'\n\ndef produce_messages():\n    producer = KafkaProducer(\n        bootstrap_servers=BOOTSTRAP_SERVERS,\n        value_serializer=lambda v: json.dumps(v).encode('utf-8')\n    )\n    print(f\"Producing messages to topic: {TOPIC_NAME}\")\n    for i in range(5):\n        message = {'number': i, 'timestamp': time.time()}\n        producer.send(TOPIC_NAME, message)\n        print(f\"Sent: {message}\")\n        time.sleep(1)\n    producer.flush()\n    producer.close()\n    print(\"Producer finished.\")\n\ndef consume_messages():\n    consumer = KafkaConsumer(\n        TOPIC_NAME,\n        bootstrap_servers=BOOTSTRAP_SERVERS,\n        auto_offset_reset='earliest',\n        group_id='my_python_group',\n        value_deserializer=lambda m: json.loads(m.decode('utf-8'))\n    )\n    print(f\"Consuming messages from topic: {TOPIC_NAME} (group: my_python_group)\")\n    for message in consumer:\n        print(f\"Received: Topic={message.topic}, Partition={message.partition}, Offset={message.offset}, Value={message.value}\")\n    consumer.close()\n    print(\"Consumer finished.\")\n\nif __name__ == \"__main__\":\n    # In a real application, producer and consumer would likely run in separate processes/threads\n    # For this quickstart, we'll run producer, then consumer sequentially\n    # Ensure a Kafka broker is running at 'localhost:9092' or specify KAFKA_BOOTSTRAP_SERVERS env var\n    produce_messages()\n    print(\"\\nWaiting for a moment before consuming...\")\n    time.sleep(5) # Give Kafka time to process\n    consume_messages()\n","lang":"python","description":"This quickstart demonstrates a basic Kafka producer and consumer. The producer sends JSON-serialized messages to a topic, and the consumer reads and deserializes them. It's configured to connect to `localhost:9092` by default, but can be configured via the `KAFKA_BOOTSTRAP_SERVERS` environment variable for production environments. For the consumer, `auto_offset_reset='earliest'` ensures it starts reading from the beginning of the topic if no offset is committed, and `group_id` enables coordinated consumer group functionality."},"warnings":[{"fix":"Update your `requirements.txt` to `kafka-python-ng` and ensure `from kafka import ...` is used for client classes. If you were explicitly using `kafka_python` in imports, switch to `kafka`.","message":"The project was renamed from `kafka-python` to `kafka-python-ng` in version `2.0.3`. This requires updating `pip install` commands and potentially import paths (`from kafka-python-ng ...` to `from kafka ...` for previous explicit imports).","severity":"breaking","affected_versions":">=2.0.3"},{"fix":"Use a separate `KafkaConsumer` instance per thread, or preferably, use multiprocessing for concurrent consumption.","message":"Unlike `KafkaProducer` which is thread-safe, `KafkaConsumer` is *not thread-safe*. Sharing a single `KafkaConsumer` instance across multiple threads can lead to unexpected behavior or data loss.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Upgrade your Python environment to version 3.8 or newer.","message":"Support for End-of-Life (EOL) Python versions was removed starting from `v2.1.0`. The library now explicitly requires Python >=3.8.","severity":"breaking","affected_versions":">=2.1.0"},{"fix":"Ensure all SSL certificates (CA, client cert, client key) are in PEM format. For JKS, convert them using `keytool` and `openssl`. Verify `ssl_cafile` points to the correct CA certificate chain.","message":"SSL connection issues are common, often related to certificate formats (Java Keystore/JKS vs. PEM) or incorrect `ssl_cafile` paths. Python clients typically require PEM-formatted certificates.","severity":"gotcha","affected_versions":"All versions"},{"fix":"For full consumer group functionality, ensure your Kafka broker version is 0.9 or higher. If using older brokers, you might need to manage partitions and offsets manually.","message":"Fully coordinated consumer groups, dynamic partition assignment, and offset management in `KafkaConsumer` require Kafka brokers version 0.9 or newer. Older brokers (e.g., 0.8.x) may not support these features or require manual partition assignment.","severity":"gotcha","affected_versions":"All versions with Kafka brokers < 0.9"},{"fix":"Upgrade to `kafka-python-ng` version `2.0.3` or newer to resolve this import error.","message":"Versions like `2.0.2` had an import issue (`ModuleNotFoundError: No module named 'kafka.vendor.six.moves'`) on certain Linux distributions (e.g., Rocky Linux 10) with Python 3.12.","severity":"gotcha","affected_versions":"2.0.2"}],"env_vars":null,"last_verified":"2026-04-12T00:00:00.000Z","next_check":"2026-07-11T00:00:00.000Z"}