Kafka Python Next Generation Client
kafka-python-ng is a pure Python client for Apache Kafka, designed to function similarly to the official Java client, but with Pythonic interfaces. It provides high-level producer and consumer APIs, as well as admin functionality. The library is actively maintained, with frequent releases, and is compatible with Kafka brokers from version 0.8.0 up to 2.6+ (full feature support requires brokers 0.9+). The current version is 2.2.3 and requires Python >=3.8.
Warnings
- breaking The project was renamed from `kafka-python` to `kafka-python-ng` starting with version `2.0.3`. Update `pip install` commands to use the new package name; the Python import path is unchanged (still `from kafka import ...`).
- gotcha Unlike `KafkaProducer`, which is thread-safe, `KafkaConsumer` is *not thread-safe*. Sharing a single `KafkaConsumer` instance across multiple threads can lead to unexpected behavior or data loss; use one consumer instance per thread instead.
- breaking Support for End-of-Life (EOL) Python versions was removed starting from `v2.1.0`. The library now explicitly requires Python >=3.8.
- gotcha SSL connection issues are common, often related to certificate formats (Java Keystore/JKS vs. PEM) or incorrect `ssl_cafile` paths. Python clients typically require PEM-formatted certificates.
- gotcha Fully coordinated consumer groups, dynamic partition assignment, and offset management in `KafkaConsumer` require Kafka brokers version 0.9 or newer. Older brokers (e.g., 0.8.x) may not support these features or require manual partition assignment.
- gotcha Versions like `2.0.2` had an import issue (`ModuleNotFoundError: No module named 'kafka.vendor.six.moves'`) on certain Linux distributions (e.g., Rocky Linux 10) with Python 3.12.
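For the SSL gotcha above, a minimal configuration sketch is shown below. All file paths are placeholders (assumptions, not real defaults); they must point at PEM-formatted files. A JKS keystore can be converted to PEM with `keytool -importkeystore` plus `openssl` before use.

```python
# Hedged sketch: SSL settings for kafka-python-ng. Every path below is a
# placeholder; substitute your own PEM-formatted certificate files.
SSL_CONFIG = {
    'security_protocol': 'SSL',
    'ssl_cafile': '/etc/kafka/certs/ca.pem',        # CA certificate (PEM)
    'ssl_certfile': '/etc/kafka/certs/client.pem',  # client certificate (PEM), if the broker requires it
    'ssl_keyfile': '/etc/kafka/certs/client.key',   # client private key (PEM)
    'ssl_check_hostname': True,                     # verify broker hostname against its certificate
}

# Usage (requires a reachable SSL-enabled broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='broker:9093', **SSL_CONFIG)
```

A common failure mode is pointing `ssl_cafile` at a JKS file or a wrong path, which surfaces as a handshake or certificate-verification error rather than a clear "wrong format" message.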
Install
pip install kafka-python-ng
Imports
- KafkaProducer
from kafka import KafkaProducer
- KafkaConsumer
from kafka import KafkaConsumer
- KafkaAdminClient
from kafka import KafkaAdminClient
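For pre-0.9 brokers, where coordinated consumer groups are unavailable (see the warning above), partitions can be assigned manually via `consumer.assign()` and `TopicPartition`. A minimal sketch, with illustrative topic and partition numbers; the kafka import is deferred so the helper can be inspected without a broker:

```python
def assigned_partitions(topic, partitions):
    # Build the (topic, partition) pairs that will be handed to consumer.assign().
    return [(topic, p) for p in partitions]

def consume_assigned(bootstrap_servers, topic, partitions):
    # Imported lazily so this module loads even without the kafka package installed.
    from kafka import KafkaConsumer, TopicPartition
    # No group_id / subscribe(): manual assignment bypasses group coordination.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    consumer.assign([TopicPartition(t, p) for t, p in assigned_partitions(topic, partitions)])
    for message in consumer:
        print(message.partition, message.offset, message.value)
```

With manual assignment the client does not participate in rebalancing, so offset bookkeeping is the application's responsibility.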
Quickstart
import os
import json
import time

from kafka import KafkaProducer, KafkaConsumer

# Configure Kafka bootstrap servers; use an environment variable for production readiness
BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
TOPIC_NAME = 'my_test_topic'

def produce_messages():
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    print(f"Producing messages to topic: {TOPIC_NAME}")
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        producer.send(TOPIC_NAME, message)
        print(f"Sent: {message}")
        time.sleep(1)
    producer.flush()
    producer.close()
    print("Producer finished.")

def consume_messages():
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        group_id='my_python_group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=10000  # stop iterating after 10s with no new messages, so the script can exit
    )
    print(f"Consuming messages from topic: {TOPIC_NAME} (group: my_python_group)")
    for message in consumer:
        print(f"Received: Topic={message.topic}, Partition={message.partition}, "
              f"Offset={message.offset}, Value={message.value}")
    consumer.close()
    print("Consumer finished.")

if __name__ == "__main__":
    # In a real application the producer and consumer would run in separate processes/threads.
    # For this quickstart we run the producer, then the consumer, sequentially.
    # Ensure a Kafka broker is running at localhost:9092, or set KAFKA_BOOTSTRAP_SERVERS.
    produce_messages()
    print("\nWaiting for a moment before consuming...")
    time.sleep(5)  # give Kafka time to process
    consume_messages()
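The quickstart assumes `my_test_topic` already exists (or that broker-side auto-creation is enabled). The `KafkaAdminClient` listed under Imports can create it explicitly. A hedged sketch; the partition and replication counts are illustrative assumptions, and the kafka import is deferred so the pure-data helper works without a broker:

```python
def build_topic_spec(name, partitions=3, replication=1):
    # Pure-data description of a topic; keys mirror NewTopic's constructor arguments.
    return {'name': name, 'num_partitions': partitions, 'replication_factor': replication}

def create_topic(bootstrap_servers, spec):
    # Imported lazily so this module loads even without the kafka package installed.
    from kafka import KafkaAdminClient
    from kafka.admin import NewTopic
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([NewTopic(**spec)])
    finally:
        admin.close()

if __name__ == "__main__":
    # Requires a reachable broker; raises TopicAlreadyExistsError on re-runs.
    create_topic(['localhost:9092'], build_topic_spec('my_test_topic'))
```

Creating topics up front also fixes the partition count, which matters once consumers in a group need parallelism.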