PyKafka
PyKafka is a full-featured, pure-Python client for Apache Kafka that can optionally use a C extension (built on librdkafka) for improved performance. It supports Kafka 0.8.2 and newer, providing Pythonic implementations of Kafka producers and consumers, and aims to offer an abstraction level similar to the JVM Kafka client. The most recent stable release is 2.8.0; note that the project is no longer under active development, so check its maintenance status before adopting it for new work.
Common errors
- `TypeError: argument of type 'bytes' is not iterable`
  cause: A byte string was passed to a `consumer_group` parameter or to the `client.topics[...]` accessor, both of which now expect `str`.
  fix: Decode the byte string to a UTF-8 string, e.g. `consumer_group=b'my_group'.decode('utf-8')` or `client.topics[b'my_topic'.decode('utf-8')]`.
- `pykafka.exceptions.NoBrokersAvailableError: No brokers available for topic 'my_topic'`
  cause: The `KafkaClient` could not connect to any of the specified Kafka brokers, or the topic does not exist or is not discoverable.
  fix: Verify that the Kafka brokers are running and reachable from the client machine (check network and firewall rules). Ensure the `hosts` argument to `KafkaClient` is correct (e.g. `'localhost:9092'`). Check the Kafka logs for broker issues or topic existence.
- `pykafka.exceptions.SocketDisconnectedError: Tried to use a connection that was disconnected`
  cause: The client's connection to a Kafka broker was unexpectedly lost mid-operation.
  fix: This is often transient. Catch the exception and re-instantiate the affected component (`KafkaClient`, producer, or consumer), or call `stop()` followed by `start()` on it, to re-establish the connection. Also check network stability and broker health.
- `pykafka.exceptions.GroupAuthorizationFailed: [Error 29] Group Authorization Failed: This consumer group is not authorized to access this topic.`
  cause: The Kafka user associated with the client lacks the permissions needed for the specified consumer group or topic.
  fix: Check your Kafka ACLs (Access Control Lists) to ensure the client has read/write permissions on the topic and `READ` permission on the consumer group.
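For `SocketDisconnectedError`, the catch-and-re-instantiate advice above can be wrapped in a small helper. This is a generic sketch, not part of PyKafka's API: the helper name and structure are illustrative. With PyKafka you would pass something like `lambda: topic.get_simple_consumer(...)` as the factory and `pykafka.exceptions.SocketDisconnectedError` as the exception type.

```python
import time

def call_with_reconnect(make_component, use_component, exc_types,
                        retries=3, backoff=1.0):
    """Call use_component(component); on a disconnect-style exception,
    rebuild the component from its factory and retry with linear backoff.

    make_component: zero-arg factory, e.g. a fresh consumer or producer
    use_component:  the operation to run against the component
    exc_types:      exception class(es) treated as transient disconnects
    """
    component = make_component()
    for attempt in range(retries):
        try:
            return use_component(component)
        except exc_types:
            if attempt == retries - 1:
                raise  # retries exhausted; surface the error
            time.sleep(backoff * (attempt + 1))
            component = make_component()  # re-instantiate, per the fix above
```

Keep `retries` low for interactive workloads; if the broker is genuinely down, failing fast and alerting beats looping.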
Warnings
- breaking In PyKafka 2.8.0, the `consumer_group` keyword argument for consumer components and the parameter for `TopicDict.__getitem__` (when accessing topics) changed their expected type from `bytes` to `str`. If you were passing byte strings, this will now raise a `TypeError`.
- gotcha When using `Producer` with `delivery_reports=True`, it's critical to regularly drain the delivery report queue. Failing to do so can lead to unbounded memory growth, as reports are stored in memory until consumed.
- gotcha The behavior of `auto_offset_reset` and `reset_offset_on_start` in consumers can be counter-intuitive. `reset_offset_on_start=True` will *always* reset the offset based on `auto_offset_reset` for the first fetch, even if committed offsets exist. If `False`, it will use committed offsets if available.
- gotcha Although PyKafka works as a pure-Python client, building it against the optional librdkafka C extension is strongly recommended for production use: without it, throughput is significantly lower. Building the extension requires librdkafka to be installed as a system-level dependency (see Install below).
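The delivery-report gotcha above comes down to draining the queue regularly. PyKafka's `Producer.get_delivery_report(block=False)` returns `(message, exc)` tuples and raises the standard-library `queue.Empty` once the queue is drained; the helper below sketches a drain loop around any callable with that shape (the helper name is illustrative, not part of PyKafka):

```python
import queue

def drain_delivery_reports(get_report):
    """Empty a delivery-report queue, splitting reports into
    delivered and failed messages.

    get_report: a callable like pykafka's Producer.get_delivery_report,
    returning (message, exc) or raising queue.Empty once drained.
    """
    delivered, failed = [], []
    while True:
        try:
            msg, exc = get_report(block=False)
        except queue.Empty:
            break  # queue drained; stop so reports cannot pile up
        (failed if exc is not None else delivered).append(msg)
    return delivered, failed
```

With a real producer created via `topic.get_producer(delivery_reports=True)`, call `drain_delivery_reports(producer.get_delivery_report)` periodically, e.g. every few thousand produced messages, so the in-memory report queue stays bounded.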
Install
- pip install pykafka
- RDKAFKA_INSTALL=system pip install pykafka  (builds against a system-installed librdkafka)
Imports
- KafkaClient
from pykafka import KafkaClient
- OffsetType
from pykafka.common import OffsetType
- Producer
from pykafka import Producer
from pykafka.producer import Producer
- SimpleConsumer
from pykafka import SimpleConsumer
from pykafka.simpleconsumer import SimpleConsumer
- BalancedConsumer
from pykafka import BalancedConsumer
from pykafka.balancedconsumer import BalancedConsumer
Quickstart
import os
import time
from pykafka import KafkaClient
from pykafka.common import OffsetType

KAFKA_HOSTS = os.environ.get('KAFKA_HOSTS', 'localhost:9092')
TOPIC_NAME = os.environ.get('KAFKA_TOPIC', 'test-topic')
CONSUMER_GROUP = os.environ.get('KAFKA_CONSUMER_GROUP', 'my-consumer-group')

# 1. Connect to Kafka (as of 2.8.0, topic and group names are str, not bytes)
client = KafkaClient(hosts=KAFKA_HOSTS)
topic = client.topics[TOPIC_NAME]

# 2. Produce a few messages (message values are still bytes)
with topic.get_producer() as producer:
    print(f"Producing messages to {TOPIC_NAME}...")
    for i in range(5):
        message_value = f"test message {i}".encode('utf-8')
        producer.produce(message_value)
        print(f"Sent: {message_value.decode()}")
print("Finished producing messages.")

# Give Kafka a moment to process
time.sleep(2)

# 3. Consume messages
print(f"\nConsuming messages from {TOPIC_NAME} (group: {CONSUMER_GROUP})...")
consumer = topic.get_simple_consumer(
    consumer_group=CONSUMER_GROUP,
    auto_commit_enable=True,
    auto_offset_reset=OffsetType.EARLIEST,  # start from the beginning if no offset is committed
    reset_offset_on_start=True,             # force the reset on start; useful for a quickstart
)
for i, message in enumerate(consumer):
    if message is not None:
        print(f"Received: Offset={message.offset}, Value={message.value.decode()}")
    if i >= 4:  # consume the 5 messages we sent
        break
consumer.stop()
print("Finished consuming messages.")
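The quickstart uses a `SimpleConsumer`, which reads every partition in one process. A `BalancedConsumer` (created with `topic.get_balanced_consumer(...)`, coordinating through ZooKeeper) instead gives each member of a consumer group a disjoint slice of the partitions. Conceptually the split is a range assignment; the function below is a simplified illustration of that idea, not PyKafka's internal implementation:

```python
def assign_partitions(partitions, members, me):
    """Range-style split: sort partitions and members, give each member a
    contiguous slice; the first (n_partitions % n_members) members get one
    extra partition. Every member computes the same answer independently."""
    partitions, members = sorted(partitions), sorted(members)
    idx = members.index(me)
    per, extra = divmod(len(partitions), len(members))
    start = idx * per + min(idx, extra)
    count = per + (1 if idx < extra else 0)
    return partitions[start:start + count]
```

Because each member derives its slice from the same sorted views, the slices are disjoint and together cover all partitions, which is the property a balanced consumer group needs.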