PyKafka

2.8.0 · active · verified Thu Apr 16

PyKafka is a full-featured, pure-Python client for Apache Kafka. Its producers and consumers can optionally be backed by a C extension built on librdkafka for better performance. It supports Kafka 0.8.2 and newer and provides Pythonic implementations of Kafka producers and consumers, aiming for a level of abstraction similar to that of the JVM Kafka client. PyKafka is actively maintained. The current stable version is 2.8.0, and updates are released as needed.
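The librdkafka backend is opt-in. `Topic.get_producer` and `Topic.get_simple_consumer` accept a `use_rdkafka` keyword. The helper below is a hypothetical sketch that checks whether the extension can be imported before requesting it. It assumes the extension is exposed as the `pykafka.rdkafka` package and that importing that package raises `ImportError` when the C module was not built. The function name `rdkafka_extension_available` is illustrative and is not part of PyKafka.

```python
import importlib


def rdkafka_extension_available() -> bool:
    """Return True if the optional librdkafka-backed extension imports cleanly.

    Assumption: the extension lives in the ``pykafka.rdkafka`` package and its
    import fails with ImportError when the C module is missing. If pykafka
    itself is not installed, the import also fails and this returns False.
    """
    try:
        importlib.import_module("pykafka.rdkafka")
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    use_rdkafka = rdkafka_extension_available()
    print(f"librdkafka extension available: {use_rdkafka}")
    # With a connected topic, the flag could then be passed through, e.g.:
    #   producer = topic.get_producer(use_rdkafka=use_rdkafka)
    #   consumer = topic.get_simple_consumer(use_rdkafka=use_rdkafka)
```

With this check, a program can fall back to the pure-Python implementation where the extension was not compiled, without failing at startup.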

Quickstart

This quickstart shows how to create a PyKafka client, produce a few messages to a topic, and then consume them with a `SimpleConsumer`. It connects to a local Kafka broker and, for demonstration purposes, resets the consumer offset to the earliest available message. The broker address, topic, and consumer group are read from the `KAFKA_HOSTS`, `KAFKA_TOPIC`, and `KAFKA_CONSUMER_GROUP` environment variables. If these are unset, they default to `localhost:9092`, `test-topic`, and `my-consumer-group`.

import os
import time
from pykafka import KafkaClient
from pykafka.common import OffsetType

KAFKA_HOSTS = os.environ.get('KAFKA_HOSTS', 'localhost:9092')
TOPIC_NAME = os.environ.get('KAFKA_TOPIC', 'test-topic').encode('utf-8')
CONSUMER_GROUP = os.environ.get('KAFKA_CONSUMER_GROUP', 'my-consumer-group').encode('utf-8')

# 1. Connect to Kafka
client = KafkaClient(hosts=KAFKA_HOSTS)
topic = client.topics[TOPIC_NAME]

# 2. Produce a few messages (leaving the `with` block stops the producer,
#    which waits for queued messages to be sent)
with topic.get_producer() as producer:
    print(f"Producing messages to {TOPIC_NAME.decode()}...")
    for i in range(5):
        message_value = f"test message {i}".encode('utf-8')
        producer.produce(message_value)
        print(f"Sent: {message_value.decode()}")
    print("Finished producing messages.")

# Give Kafka a moment to process
time.sleep(2)

# 3. Consume messages
print(f"\nConsuming messages from {TOPIC_NAME.decode()} (group: {CONSUMER_GROUP.decode()})...")
consumer = topic.get_simple_consumer(
    consumer_group=CONSUMER_GROUP,
    auto_commit_enable=True,
    auto_offset_reset=OffsetType.EARLIEST, # Start from the beginning if no offset is committed
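    reset_offset_on_start=True,  # Ignore committed offsets and apply auto_offset_reset on every start
    consumer_timeout_ms=5000,  # End iteration after 5 s without messages instead of blocking forever
)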

for i, message in enumerate(consumer):
    if message is not None:
        print(f"Received: Offset={message.offset}, Value={message.value.decode()}")
    if i >= 4:  # Stop after 5 messages; any older messages in the topic are read first
        break

consumer.stop()
print("Finished consuming messages.")
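The `auto_offset_reset` and `reset_offset_on_start` settings decide which message the consumer reads first. This matters when `test-topic` already holds data. With both set as in the quickstart, the loop reads the five oldest retained messages, which are not necessarily the five just produced. The sketch below is a simplified, hypothetical model of that decision for a single partition. The function names and the out-of-range check are illustrative and are not PyKafka API. The model assumes the behavior described by the quickstart's comments. Only the sentinel values match PyKafka's `OffsetType.EARLIEST` (-2) and `OffsetType.LATEST` (-1).

```python
from typing import List, Optional

# Kafka's offset-request sentinels; pykafka's OffsetType.EARLIEST / LATEST
# use these same values.
EARLIEST = -2
LATEST = -1


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: int,
                    reset_offset_on_start: bool = False) -> int:
    """Simplified, hypothetical model of where a consumer starts on one partition.

    log_start is the oldest retained offset; log_end is the offset the next
    produced message will receive. committed is None when the group has not
    committed an offset for this partition yet.
    """
    reset_target = log_start if auto_offset_reset == EARLIEST else log_end
    if reset_offset_on_start:
        # Committed offsets are ignored entirely.
        return reset_target
    if committed is None or not log_start <= committed <= log_end:
        # Nothing usable was committed, so fall back to the reset policy.
        return reset_target
    return committed


def offsets_read(start: int, log_end: int, limit: int) -> List[int]:
    """Offsets seen by a loop that stops after `limit` messages."""
    return list(range(start, min(start + limit, log_end)))


# Quickstart scenario on one partition that already held 7 messages
# (offsets 0-6) before the 5 new ones (offsets 7-11) were produced.
start = starting_offset(committed=3, log_start=0, log_end=12,
                        auto_offset_reset=EARLIEST, reset_offset_on_start=True)
print(start, offsets_read(start, log_end=12, limit=5))  # 0 [0, 1, 2, 3, 4]
```

In this model, the quickstart's own messages are only the ones printed when the topic starts out empty. Dropping `reset_offset_on_start=True` lets a previously committed offset take effect.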
