Pure Python client for Apache Kafka

Version 2.3.1 · actively maintained · last verified Wed Apr 15

kafka-python is a pure Python client for Apache Kafka, designed to function much like the official Java client with a set of Pythonic interfaces for producing, consuming, and administering Kafka topics. It is actively maintained with frequent releases and supports Kafka brokers from version 0.8.0 up to 4.0 and beyond due to backward compatibility of the Kafka protocol. The latest stable release is 2.3.1; note that the version published on PyPI (e.g., 2.3.0) may occasionally lag behind the most recent tagged release.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to set up a basic Kafka producer to send JSON messages to a topic and a consumer to read them. It highlights common configurations like `bootstrap_servers`, `value_serializer`/`value_deserializer`, `group_id`, and `auto_offset_reset`. Ensure a Kafka broker is running and accessible (e.g., at `localhost:9092`) and the target topic exists.

import os
from kafka import KafkaProducer, KafkaConsumer
import json
import time

# Configuration
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')

# --- Producer Example ---
def produce_messages():
    """Send five JSON-encoded demo messages to KAFKA_TOPIC.

    Each send is awaited synchronously (``future.get``) so the delivery
    metadata (partition and offset) can be printed per message. Requires a
    reachable broker at KAFKA_BOOTSTRAP_SERVERS.
    """
    producer = KafkaProducer(
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        # Serialize each payload dict to UTF-8 JSON bytes on the way out.
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    print(f"Producing messages to topic: {KAFKA_TOPIC}")
    for seq in range(5):
        message = {'number': seq, 'timestamp': time.time()}
        try:
            # Block (up to 10s) until the broker acknowledges this record.
            record_metadata = producer.send(KAFKA_TOPIC, message).get(timeout=10)
        except Exception as e:
            print(f"Error sending message: {e}")
        else:
            print(f"Sent: {message} to partition {record_metadata.partition} offset {record_metadata.offset}")
    # Drain any buffered records, then release client resources.
    producer.flush()
    producer.close()
    print("Producer finished.")

# --- Consumer Example ---
def consume_messages():
    """Consume JSON messages from KAFKA_TOPIC indefinitely, printing each one.

    Loops until interrupted (Ctrl-C); the consumer connection is always
    closed on the way out.
    """
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],
        # No committed offset for this group -> start from the oldest record.
        auto_offset_reset='earliest',
        # Commit consumed offsets periodically in the background.
        enable_auto_commit=True,
        # Group id is required for partition assignment and offset commits.
        group_id='my_python_group',
        # Decode UTF-8 JSON bytes back into Python objects on the way in.
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )
    print(f"Consuming messages from topic: {KAFKA_TOPIC}")
    try:
        # Iterating the consumer blocks, yielding records as they arrive.
        for message in consumer:
            print(f"Received: topic={message.topic}, partition={message.partition}, offset={message.offset}, key={message.key}, value={message.value}")
    except KeyboardInterrupt:
        print("Consumer interrupted.")
    finally:
        consumer.close()
        print("Consumer closed.")

if __name__ == '__main__':
    # Demo driver. Assumes a broker is reachable at KAFKA_BOOTSTRAP_SERVERS
    # (default localhost:9092) and that the topic exists (or the broker is
    # configured to auto-create topics).
    import threading

    # Run the producer in the background so the consumer can start shortly after.
    producer_worker = threading.Thread(target=produce_messages)
    producer_worker.start()

    # Brief pause so some messages are already published before consuming.
    time.sleep(2)

    # The consumer occupies the main thread until interrupted (Ctrl-C).
    # Note: kafka-python clients are not thread-safe across threads.
    consume_messages()

    producer_worker.join()

view raw JSON →