Kafka Python Next Generation Client

2.2.3 · active · verified Sun Apr 12

kafka-python-ng is a pure Python client for Apache Kafka, designed to function much like the official Java client while offering Pythonic interfaces. It provides high-level producer and consumer APIs as well as admin functionality. The library is actively maintained, with frequent releases, and works with Kafka brokers from version 0.8.0 through 2.6+. It is best used with 0.9+ brokers, because some features, such as fully coordinated consumer groups, require them. The current version is 2.2.3, which requires Python >=3.8.
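
The 0.9+ recommendation matters most for coordinated consumer groups, in which consumers that share a `group_id` divide a topic's partitions among themselves. The following pure-Python sketch is a simplified model of range-style assignment for a single topic, meant only to illustrate the idea. `range_assign` is a hypothetical helper, not part of kafka-python-ng, and the client's real assignors also handle multiple topics and rebalancing.

```python
# Simplified, illustrative model of range-style partition assignment for one topic.
# Assumption: this mirrors the general idea of Kafka's range assignor; it is NOT
# kafka-python-ng's internal implementation. range_assign is a hypothetical name.
from typing import Dict, List


def range_assign(members: List[str], num_partitions: int) -> Dict[str, List[int]]:
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per member."""
    if not members:
        return {}
    # Sort members so the result is deterministic regardless of input order.
    ordered = sorted(members)
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for index, member in enumerate(ordered):
        # The first `extra` members each take one additional partition.
        count = per_member + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    print(range_assign(["consumer-b", "consumer-a"], 5))
```

With two members and five partitions, the alphabetically first member receives the extra partition: `consumer-a` gets partitions 0, 1 and 2, and `consumer-b` gets 3 and 4. If there are more members than partitions, the surplus members receive nothing and sit idle.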


Install

pip install kafka-python-ng

Imports

from kafka import KafkaProducer, KafkaConsumer

Quickstart

This quickstart demonstrates a basic Kafka producer and consumer. The producer sends JSON-serialized messages to a topic, and the consumer reads and deserializes them. The bootstrap servers default to `localhost:9092` and can be overridden with the comma-separated `KAFKA_BOOTSTRAP_SERVERS` environment variable, for example in production. On the consumer, `auto_offset_reset='earliest'` makes it start from the beginning of each partition when its group has no committed offset, and `group_id` places it in a consumer group, so partition assignment is coordinated and offsets are committed per group.
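
The interaction between `group_id` and `auto_offset_reset` can be summarized with a small model: a committed offset for the group takes priority, and the reset policy applies only when none exists or it is out of range. `starting_offset` is a hypothetical illustration, not a kafka-python-ng API; the real decision happens inside the client when it looks up the group's committed offsets. The model assumes the client's default policy is `'latest'`.

```python
# Simplified model of how a group consumer chooses its starting offset for one partition.
# Assumption: illustrative only; starting_offset is a hypothetical helper, and the real
# logic lives inside the Kafka client and broker, not in user code.
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Return the offset a consumer in a group would begin reading from."""
    # A committed offset that is still within the log always takes priority.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start  # oldest retained record
    if auto_offset_reset == "latest":
        return log_end  # only records produced from now on
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


if __name__ == "__main__":
    print(starting_offset(None, 0, 5, "earliest"))  # first run: no committed offset yet
    print(starting_offset(5, 0, 10, "earliest"))    # later run: resume at the committed offset
```

Because the consumer commits offsets automatically by default (`enable_auto_commit=True`), running the quickstart a second time with the same `group_id` should resume after the last committed offset rather than rereading the topic from the start.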

import os
import json
import time
from kafka import KafkaProducer, KafkaConsumer

# Configure Kafka bootstrap servers, use environment variable for production readiness
BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092').split(',')
TOPIC_NAME = 'my_test_topic'

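def produce_messages():
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        # Convert each Python dict to UTF-8 encoded JSON bytes before sending
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    print(f"Producing messages to topic: {TOPIC_NAME}")
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        # send() is asynchronous: it buffers the record and returns a future
        producer.send(TOPIC_NAME, message)
        print(f"Sent: {message}")
        time.sleep(1)
    # Block until all buffered records have been sent, then release resources
    producer.flush()
    producer.close()
    print("Producer finished.")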

def consume_messages():
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        auto_offset_reset='earliest',
        group_id='my_python_group',
        # Decode UTF-8 JSON bytes back into Python objects
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        # End iteration after 10 s without new messages; by default the loop blocks forever
        consumer_timeout_ms=10000
    )
    print(f"Consuming messages from topic: {TOPIC_NAME} (group: my_python_group)")
    for message in consumer:
        print(f"Received: Topic={message.topic}, Partition={message.partition}, Offset={message.offset}, Value={message.value}")
    # Reached only because consumer_timeout_ms ends the iteration above
    consumer.close()
    print("Consumer finished.")

if __name__ == "__main__":
    # In a real application, the producer and consumer would likely run in separate processes or threads.
    # For this quickstart, the producer runs first and the consumer runs afterwards.
    # Ensure a Kafka broker is reachable at 'localhost:9092' (or set KAFKA_BOOTSTRAP_SERVERS)
    # and that the topic exists or the broker allows automatic topic creation.
    produce_messages()
    print("\nWaiting for a moment before consuming...")
    time.sleep(5)  # Optional pause; flush() in produce_messages() already waited for the sends
    consume_messages()
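
The inline lambdas and the `.split(',')` call in the quickstart work for well-formed input. As a sketch under stated assumptions, they could be replaced by named helpers that tolerate spaces in `KAFKA_BOOTSTRAP_SERVERS` and pass null values through unchanged. `parse_bootstrap_servers`, `serialize_json`, and `deserialize_json` are hypothetical names, not kafka-python-ng APIs.

```python
# Hypothetical helpers for the quickstart; none of these names come from kafka-python-ng.
import json
import os
from typing import Any, List, Optional


def parse_bootstrap_servers(raw: str) -> List[str]:
    """Turn 'host1:9092, host2:9092,' into ['host1:9092', 'host2:9092']."""
    # Strip surrounding whitespace and drop empty entries left by stray commas.
    return [part.strip() for part in raw.split(",") if part.strip()]


def serialize_json(value: Any) -> Optional[bytes]:
    """Usable as value_serializer: Python object -> UTF-8 encoded JSON bytes."""
    # Keep None as None so a null value is not turned into the JSON text b"null".
    if value is None:
        return None
    return json.dumps(value).encode("utf-8")


def deserialize_json(data: Optional[bytes]) -> Any:
    """Usable as value_deserializer: UTF-8 encoded JSON bytes -> Python object."""
    # A record with a null value has no bytes to decode.
    if data is None:
        return None
    return json.loads(data.decode("utf-8"))


BOOTSTRAP_SERVERS = parse_bootstrap_servers(
    os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
)
```

These plug into the same parameters as the lambdas: `value_serializer=serialize_json` on `KafkaProducer` and `value_deserializer=deserialize_json` on `KafkaConsumer`. The explicit `None` handling only matters if the topic can contain records with null values, for example tombstones on a compacted topic.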
