Pure Python client for Apache Kafka
kafka-python is a pure Python client for Apache Kafka, designed to function much like the official Java client while exposing a set of Pythonic interfaces for producing, consuming, and administering Kafka topics. It is actively maintained with frequent releases and, thanks to the Kafka protocol's backward compatibility, supports brokers from version 0.8.0 up to 4.0 and beyond. The current latest stable release is 2.3.1, though PyPI may list 2.3.0 as the newest packaged version.
pip install kafka-python
Common errors
error kafka.errors.NoBrokersAvailable: NoBrokersAvailable
cause The Kafka client cannot establish a connection with the specified bootstrap servers, often due to the Kafka broker being down, incorrect host/port configuration, network issues (firewall, DNS), or client-broker version incompatibility.
fix Verify that Kafka brokers are running and accessible from the client, check network connectivity and firewall rules, ensure bootstrap_servers is configured with correct 'host:port' pairs, and confirm client/broker version compatibility (sometimes by explicitly setting api_version).
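A quick way to isolate the cause is a bare connection attempt with explicit settings. This is a minimal sketch: the broker addresses and api_version are placeholders to be replaced with your cluster's values.

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable

try:
    producer = KafkaProducer(
        bootstrap_servers=['broker1:9092', 'broker2:9092'],  # placeholder 'host:port' pairs
        api_version=(2, 5, 0),       # placeholder; skip version probing if auto-detection is failing
        request_timeout_ms=15000,
    )
    producer.close()
    print("Broker connection OK")
except NoBrokersAvailable:
    print("No broker reachable: check that Kafka is up, the port is open, and DNS resolves")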
error kafka.errors.OffsetOutOfRangeError
cause A Kafka consumer attempts to fetch messages from an offset that no longer exists in the broker's log, typically because old messages have been deleted due to retention policies or the consumer's committed offset is stale.
fix Configure the auto_offset_reset parameter in KafkaConsumer to either 'earliest' (to start consuming from the beginning of the available log) or 'latest' (to start from the newest messages) to handle out-of-range offsets gracefully.
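The reset policy is a plain constructor argument; if a stale committed offset keeps tripping the error, you can also force a seek by hand. A sketch, with placeholder topic and group names:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',                         # placeholder topic
    bootstrap_servers='localhost:9092',
    group_id='my_group',                # placeholder group
    auto_offset_reset='earliest',       # or 'latest' to jump to the newest messages
)

# Optional manual reset for a stubborn stale offset:
consumer.poll(timeout_ms=1000)  # trigger partition assignment first
consumer.seek_to_beginning()    # move every assigned partition to the log start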
error kafka.errors.AuthenticationFailedError: Authentication failed for user ...
cause The Kafka client is unable to authenticate with the broker, often due to incorrect SASL configuration parameters (e.g., `sasl_plain_username`, `sasl_plain_password`, `sasl_mechanism`) or an improperly configured Kerberos setup if GSSAPI is used.
fix Double-check the security_protocol, sasl_mechanism, sasl_plain_username, and sasl_plain_password in your Kafka client configuration, ensuring they match the broker's security settings and that credentials are correct. For Kerberos, confirm the client machine has a valid Kerberos ticket and configuration.
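A typical SASL/PLAIN-over-TLS configuration looks like the sketch below; every value (host, mechanism, credentials, CA path) is a placeholder that must match the broker's listener configuration.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='broker.example.com:9093',  # placeholder SASL listener
    security_protocol='SASL_SSL',       # must match the broker listener exactly
    sasl_mechanism='PLAIN',             # or 'SCRAM-SHA-256' / 'SCRAM-SHA-512' / 'GSSAPI'
    sasl_plain_username='alice',        # placeholder credentials
    sasl_plain_password='secret',
    ssl_cafile='/path/to/ca.pem',       # CA bundle if the broker cert is privately signed
)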
error kafka.errors.KafkaTimeoutError
cause A Kafka client (producer or consumer) operation, such as sending a message, fetching metadata, or committing offsets, failed to complete within the configured timeout period, often due to network instability, an overloaded Kafka broker, or leader election issues.
fix Increase request_timeout_ms and delivery_timeout_ms in your client configuration, verify network connectivity between the client and Kafka brokers, monitor broker health and load, and ensure that all topic partitions have an active leader.
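Both timeouts are plain constructor arguments; the values below are illustrative, not recommendations.

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    request_timeout_ms=60000,      # per-request timeout, raised from the 30s default
    delivery_timeout_ms=180000,    # total budget for send() plus retries
    retries=5,
)
future = producer.send('my_topic', b'payload')         # placeholder topic
record_metadata = future.get(timeout=60)  # raises KafkaTimeoutError if the budget is exhausted
Warnings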
breaking Python 2 support was officially dropped with `kafka-python` release 2.3.x. Users on Python 2 must use an older version of the library.
fix Upgrade to Python 3.8+ or pin `kafka-python` to a version compatible with Python 2.x (e.g., < 2.3.0).
gotcha The `KafkaConsumer` class is *not* thread-safe. While `KafkaProducer` can be shared across threads, `KafkaConsumer` instances should not be. For concurrent consumption, use multiprocessing or a separate `KafkaConsumer` instance per thread with proper synchronization.
fix Use a separate `KafkaConsumer` instance for each thread or process, or manage concurrency via multiprocessing, as sketched below.
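A minimal sketch of the one-consumer-per-process pattern; the topic, group, and worker count are illustrative.

import multiprocessing

from kafka import KafkaConsumer

def worker(worker_id):
    # Each process constructs its own consumer; instances are never shared.
    consumer = KafkaConsumer(
        'my_topic',                         # placeholder topic
        bootstrap_servers='localhost:9092',
        group_id='my_group',                # same group, so partitions are split across workers
    )
    for message in consumer:
        print(f"worker {worker_id}: partition={message.partition} offset={message.offset}")

if __name__ == '__main__':
    processes = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()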
gotcha For high-throughput applications, inefficient serialization (e.g., plain JSON) can become a bottleneck. Using more efficient formats like Avro or Protobuf with appropriate `key_serializer` and `value_serializer` functions is recommended.
fix Implement custom serializer and deserializer functions (`key_serializer`/`value_serializer` on the producer, `key_deserializer`/`value_deserializer` on the consumer) using libraries like Avro or Protobuf for better performance and schema enforcement; the hooks are shown below.
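The hooks accept any bytes-in/bytes-out callables. JSON stands in here for brevity; in a hot path you would swap in an Avro or Protobuf encoder. A sketch with placeholder broker and topic:

import json

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # swap for an Avro/Protobuf encoder
)

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)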
gotcha Relying solely on `enable_auto_commit=True` can lead to message duplication or loss in case of consumer crashes, as offsets might be committed before messages are fully processed.
fix Set `enable_auto_commit=False` and manually commit offsets using `consumer.commit()` after successful message processing to get at-least-once semantics: a crash before the commit replays the batch rather than losing it. See the sketch below.
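A minimal at-least-once loop; `handle` is a hypothetical processing function, and the topic and group are placeholders.

from kafka import KafkaConsumer

def handle(message):
    ...  # hypothetical processing; raise on failure so the offset is not committed

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id='my_group',
    enable_auto_commit=False,  # commit only after processing succeeds
)
try:
    for message in consumer:
        handle(message)
        consumer.commit()  # synchronous commit of the offsets just consumed
finally:
    consumer.close()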
gotcha For Kafka brokers version 0.11 and above, `kafka-python` uses a new message protocol that requires `crc32c` for checksum validation. The pure Python implementation is significantly slower.
fix Install the `crc32c` package (e.g., `pip install 'kafka-python[crc32c]'`) for optimized native code, especially in high-throughput scenarios.
gotcha `KafkaAdminClient` was historically marked as an unstable interface, and although it has improved, the shape of its return values (internal protocol tuples) has changed between releases. Be mindful of potential interface adjustments in minor releases.
fix Refer to the release notes for `KafkaAdminClient` changes and test thoroughly if you rely on advanced admin features; the interface is generally more stable in recent 2.x releases. Basic usage is sketched below.
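The common operations have kept a stable surface. A minimal sketch; the broker address, topic name, and partition settings are placeholders:

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='my_test_topic', num_partitions=3, replication_factor=1)])
print(admin.list_topics())  # confirm the topic now exists
admin.close()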
Install
pip install 'kafka-python[crc32c,lz4,snappy,zstd]'
Install compatibility (last tested: 2026-05-12)
python | os / libc     | variant                | status      | install | import | disk
3.9    | alpine (musl) | crc32c,lz4,snappy,zstd | build_error | -       | -      | -
3.9    | alpine (musl) | kafka-python           | wheel       | -       | 0.18s  | 20.3M
3.9    | alpine (musl) | kafka-python           | -           | -       | 0.20s  | 20.3M
3.9    | slim (glibc)  | crc32c,lz4,snappy,zstd | wheel       | 3.4s    | 0.18s  | 54M
3.9    | slim (glibc)  | kafka-python           | wheel       | 2.1s    | 0.17s  | 21M
3.9    | slim (glibc)  | kafka-python           | -           | -       | 0.16s  | 21M
3.10   | alpine (musl) | crc32c,lz4,snappy,zstd | build_error | -       | -      | -
3.10   | alpine (musl) | kafka-python           | wheel       | -       | 0.18s  | 20.8M
3.10   | alpine (musl) | kafka-python           | -           | -       | 0.19s  | 20.8M
3.10   | slim (glibc)  | crc32c,lz4,snappy,zstd | wheel       | 2.9s    | 0.16s  | 55M
3.10   | slim (glibc)  | kafka-python           | wheel       | 1.8s    | 0.14s  | 21M
3.10   | slim (glibc)  | kafka-python           | -           | -       | 0.13s  | 21M
3.11   | alpine (musl) | crc32c,lz4,snappy,zstd | build_error | -       | -      | -
3.11   | alpine (musl) | kafka-python           | wheel       | -       | 0.32s  | 23.2M
3.11   | alpine (musl) | kafka-python           | -           | -       | 0.38s  | 23.2M
3.11   | slim (glibc)  | crc32c,lz4,snappy,zstd | wheel       | 2.6s    | 0.30s  | 57M
3.11   | slim (glibc)  | kafka-python           | wheel       | 1.8s    | 0.26s  | 24M
3.11   | slim (glibc)  | kafka-python           | -           | -       | 0.29s  | 24M
3.12   | alpine (musl) | crc32c,lz4,snappy,zstd | build_error | -       | -      | -
3.12   | alpine (musl) | kafka-python           | wheel       | -       | 0.23s  | 15.0M
3.12   | alpine (musl) | kafka-python           | -           | -       | 0.25s  | 15.0M
3.12   | slim (glibc)  | crc32c,lz4,snappy,zstd | wheel       | 2.4s    | 0.27s  | 49M
3.12   | slim (glibc)  | kafka-python           | wheel       | 1.6s    | 0.24s  | 15M
3.12   | slim (glibc)  | kafka-python           | -           | -       | 0.29s  | 15M
3.13   | alpine (musl) | crc32c,lz4,snappy,zstd | build_error | -       | -      | -
3.13   | alpine (musl) | kafka-python           | wheel       | -       | 0.24s  | 14.8M
3.13   | alpine (musl) | kafka-python           | -           | -       | 0.24s  | 14.6M
3.13   | slim (glibc)  | crc32c,lz4,snappy,zstd | wheel       | 2.3s    | 0.27s  | 49M
3.13   | slim (glibc)  | kafka-python           | wheel       | 1.6s    | 0.23s  | 15M
3.13   | slim (glibc)  | kafka-python           | -           | -       | 0.36s  | 15M
Imports
- KafkaProducer
  from kafka import KafkaProducer
- KafkaConsumer
  from kafka import KafkaConsumer
- KafkaAdminClient
  wrong:   from kafka.admin import KafkaAdminClient
  correct: from kafka import KafkaAdminClient
- TopicPartition
  from kafka import TopicPartition
Quickstart (last tested: 2026-04-24)
import os
import json
import time

from kafka import KafkaProducer, KafkaConsumer

# Configuration
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')

# --- Producer Example ---
def produce_messages():
    producer = KafkaProducer(
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    print(f"Producing messages to topic: {KAFKA_TOPIC}")
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        future = producer.send(KAFKA_TOPIC, message)
        try:
            record_metadata = future.get(timeout=10)
            print(f"Sent: {message} to partition {record_metadata.partition} offset {record_metadata.offset}")
        except Exception as e:
            print(f"Error sending message: {e}")
    producer.flush()  # Ensure all buffered messages are sent
    producer.close()
    print("Producer finished.")

# --- Consumer Example ---
def consume_messages():
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        auto_offset_reset='earliest',  # Start from the beginning of the topic if no committed offset
        enable_auto_commit=True,       # Auto-commit offsets periodically
        group_id='my_python_group',    # Required for auto-assignment and offset commits
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    print(f"Consuming messages from topic: {KAFKA_TOPIC}")
    try:
        for message in consumer:
            # message value and key arrive already deserialized
            print(f"Received: topic={message.topic}, partition={message.partition}, "
                  f"offset={message.offset}, key={message.key}, value={message.value}")
    except KeyboardInterrupt:
        print("Consumer interrupted.")
    finally:
        consumer.close()
        print("Consumer closed.")

if __name__ == '__main__':
    # Make sure a Kafka broker is running at KAFKA_BOOTSTRAP_SERVERS (e.g., localhost:9092)
    # and that the topic 'my_test_topic' exists (or let Kafka create it if configured).
    import threading

    # Run the producer in a separate thread
    producer_thread = threading.Thread(target=produce_messages)
    producer_thread.start()

    # Give the producer a moment to start and send some messages
    time.sleep(2)

    # Run the consumer in the main thread (KafkaConsumer is not thread-safe, so don't share it)
    consume_messages()
    producer_thread.join()