Pure Python client for Apache Kafka
kafka-python is a pure Python client for Apache Kafka, designed to function much like the official Java client, with a set of Pythonic interfaces for producing, consuming, and administering Kafka topics. It is actively maintained with frequent releases, and thanks to the Kafka protocol's backward compatibility it supports brokers from version 0.8.0 through 4.0 and beyond. At the time of writing the latest stable release is 2.3.1, though PyPI may still list 2.3.0 as the newest packaged version.
Common errors
- `kafka.errors.NoBrokersAvailable: NoBrokersAvailable`
  - Cause: The Kafka client cannot establish a connection with the specified bootstrap servers, often because the broker is down, the host/port configuration is incorrect, there are network issues (firewall, DNS), or the client and broker versions are incompatible.
  - Fix: Verify that Kafka brokers are running and reachable from the client, check network connectivity and firewall rules, ensure `bootstrap_servers` is configured with correct `host:port` pairs, and confirm client/broker version compatibility (sometimes by explicitly setting `api_version`).
- `kafka.errors.OffsetOutOfRangeError`
  - Cause: A Kafka consumer attempts to fetch messages from an offset that no longer exists in the broker's log, typically because old messages have been deleted under the retention policy or the consumer's committed offset is stale.
  - Fix: Set the `auto_offset_reset` parameter of `KafkaConsumer` to either `'earliest'` (start consuming from the beginning of the available log) or `'latest'` (start from the newest messages) to handle out-of-range offsets gracefully.
- `kafka.errors.AuthenticationFailedError: Authentication failed for user ...`
  - Cause: The Kafka client cannot authenticate with the broker, often due to incorrect SASL configuration (`sasl_plain_username`, `sasl_plain_password`, `sasl_mechanism`) or an improperly configured Kerberos setup when GSSAPI is used.
  - Fix: Double-check `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, and `sasl_plain_password` in your client configuration, ensuring they match the broker's security settings and that the credentials are correct. For Kerberos, confirm the client machine has a valid Kerberos ticket and configuration.
- `kafka.errors.KafkaTimeoutError`
  - Cause: A client operation (producer or consumer), such as sending a message, fetching metadata, or committing offsets, failed to complete within the configured timeout, often due to network instability, an overloaded broker, or leader election issues.
  - Fix: Increase `request_timeout_ms` (and, for producers, `delivery_timeout_ms`) in the client configuration, verify network connectivity between the client and the brokers, monitor broker health and load, and ensure that every topic partition has an active leader.
Warnings
- breaking Python 2 support has been dropped in the `kafka-python` 2.x release line. Users on Python 2 must pin an older version of the library.
- gotcha The `KafkaConsumer` class is *not* thread-safe. While a `KafkaProducer` can be shared across threads, `KafkaConsumer` instances should not be. For concurrent consumption, use multiprocessing or a separate `KafkaConsumer` instance per thread with proper synchronization.
- gotcha For high-throughput applications, inefficient serialization (e.g., plain JSON) can become a bottleneck. Using more efficient formats like Avro or Protobuf with appropriate `key_serializer` and `value_serializer` functions is recommended.
- gotcha Relying solely on `enable_auto_commit=True` can lead to message duplication or loss in case of consumer crashes, as offsets might be committed before messages are fully processed.
- gotcha For Kafka brokers version 0.11 and above, `kafka-python` uses the v2 message format, whose record batches are checksummed with CRC-32C. The pure Python fallback is significantly slower, so installing the optional C extension (e.g., `pip install 'kafka-python[crc32c]'`) is recommended for throughput.
- gotcha `KafkaAdminClient` was historically marked as an unstable interface and, while improved, changes to its internal protocol tuple returns have occurred. Users should be mindful of potential interface adjustments in minor releases.
Install
- `pip install kafka-python`
- `pip install 'kafka-python[crc32c,lz4,snappy,zstd]'`
Imports
- KafkaProducer
from kafka import KafkaProducer
- KafkaConsumer
from kafka import KafkaConsumer
- KafkaAdminClient
from kafka.admin import KafkaAdminClient
from kafka import KafkaAdminClient
- TopicPartition
from kafka import TopicPartition
Quickstart
```python
import os
import json
import time
import threading

from kafka import KafkaProducer, KafkaConsumer

# Configuration
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')

# --- Producer Example ---
def produce_messages():
    producer = KafkaProducer(
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    print(f"Producing messages to topic: {KAFKA_TOPIC}")
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        future = producer.send(KAFKA_TOPIC, message)
        try:
            record_metadata = future.get(timeout=10)
            print(f"Sent: {message} to partition {record_metadata.partition} offset {record_metadata.offset}")
        except Exception as e:
            print(f"Error sending message: {e}")
    producer.flush()  # Ensure all buffered messages are sent
    producer.close()
    print("Producer finished.")

# --- Consumer Example ---
def consume_messages():
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        auto_offset_reset='earliest',  # Start from the beginning if no committed offset
        enable_auto_commit=True,       # Auto-commit offsets periodically
        group_id='my_python_group',    # Required for auto-assignment and offset commits
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    print(f"Consuming messages from topic: {KAFKA_TOPIC}")
    try:
        for message in consumer:
            # message.key and message.value are already deserialized
            print(f"Received: topic={message.topic}, partition={message.partition}, "
                  f"offset={message.offset}, key={message.key}, value={message.value}")
    except KeyboardInterrupt:
        print("Consumer interrupted.")
    finally:
        consumer.close()
        print("Consumer closed.")

if __name__ == '__main__':
    # Make sure a Kafka broker is running at KAFKA_BOOTSTRAP_SERVERS (e.g., localhost:9092)
    # and that the topic 'my_test_topic' exists (or let Kafka auto-create it if configured).
    # Run the producer in a separate thread.
    producer_thread = threading.Thread(target=produce_messages)
    producer_thread.start()
    # Give the producer a moment to start and send some messages.
    time.sleep(2)
    # Run the consumer in the main thread (or another thread; mind thread-safety).
    consume_messages()
    producer_thread.join()
```