kafka-python
kafka-python is a pure Python client for Apache Kafka, designed to function much like the official Java client, with a sprinkling of Pythonic interfaces such as consumer iterators. It provides high-level Producer and Consumer APIs. The library is currently at version 2.3.0, released in November 2025, and maintains an active development and release cadence with frequent bug fixes and feature enhancements, while remaining backwards-compatible with older Kafka brokers (back to 0.8.0).
Warnings
- breaking Version 2.3.x will be the last release branch to support Python 2. Future major versions will drop Python 2 compatibility entirely.
- gotcha The `KafkaConsumer` is not thread-safe. Sharing a single `KafkaConsumer` instance across multiple threads can lead to unpredictable behavior and data corruption. For concurrent consumption, multiprocessing is recommended.
- gotcha Messages are raw bytes in Kafka. Failing to provide appropriate `key_deserializer` and `value_deserializer` functions to `KafkaConsumer` (and serializers for `KafkaProducer`) can lead to deserialization errors or incorrect data interpretation. JSON, Avro, or Protobuf payloads require explicit (de)serialization.
- gotcha Not explicitly calling `producer.flush()` or `producer.close()` before application exit may result in lost messages that are still buffered. Similarly, `consumer.close()` is crucial to commit offsets and leave the consumer group cleanly.
- gotcha Misconfiguring consumer `group_id` or `auto_offset_reset`, or not understanding manual offset commits, can lead to message re-processing, data loss, or improper partition rebalances within consumer groups. `group_id=None` disables auto-assignment and offset commits.
- gotcha Setting `request_timeout_ms` (client-side config) too low can exacerbate transient broker issues, leading to premature retries or failures. A shorter timeout might seem like a way to react faster, but it can worsen behavior during temporary network glitches or high broker load.
Install
pip install kafka-python
Imports
- KafkaProducer
from kafka import KafkaProducer
- KafkaConsumer
from kafka import KafkaConsumer
- KafkaAdminClient
from kafka.admin import KafkaAdminClient
Quickstart
import json
import time
from kafka import KafkaProducer, KafkaConsumer
# Ensure Kafka is running, e.g., on localhost:9092
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC_NAME = 'my_test_topic'
# --- Producer Example ---
producer = KafkaProducer(
    bootstrap_servers=[BOOTSTRAP_SERVERS],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
try:
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        future = producer.send(TOPIC_NAME, message)
        record_metadata = future.get(timeout=10)  # Block for confirmation
        print(f"Produced message: {message} to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}")
        time.sleep(1)
except Exception as e:
    print(f"Producer error: {e}")
finally:
    producer.flush()
    producer.close()
    print("Producer closed.")
# --- Consumer Example ---
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=[BOOTSTRAP_SERVERS],
    auto_offset_reset='earliest',  # Start consuming from the earliest available message
    enable_auto_commit=True,
    group_id='my_python_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=5000  # Stop iterating if no message arrives for 5 seconds
)
print(f"\nConsuming messages from topic: {TOPIC_NAME}")
try:
    for message in consumer:
        print(f"Consumed message: key={message.key}, value={message.value}, topic={message.topic}, partition={message.partition}, offset={message.offset}")
except Exception as e:
    print(f"Consumer error: {e}")
finally:
    consumer.close()
    print("Consumer closed.")