kafka-python

2.3.0 · active · verified Sun Mar 29

kafka-python is a pure-Python client for Apache Kafka, designed to function much like the official Java client. It provides high-level Producer and Consumer APIs with Pythonic interfaces, such as consumer iterators. The current version is 2.3.0, released in November 2025. The project is actively developed, with frequent bug-fix and feature releases, and remains backwards-compatible with older Kafka brokers (back to 0.8.0).

Warnings

Install

Imports

Quickstart

This quickstart demonstrates basic message production and consumption. A `KafkaProducer` sends JSON-serialized messages to a topic, and a `KafkaConsumer` in a consumer group reads and deserializes them. Setting `auto_offset_reset='earliest'` makes the consumer start from the earliest available offset when the group has no committed offset; on later runs it resumes from the group's last committed position. A Kafka broker must be running (e.g., on `localhost:9092`).

import json
import time
from kafka import KafkaProducer, KafkaConsumer

# Ensure Kafka is running, e.g., on localhost:9092
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC_NAME = 'my_test_topic'

# --- Producer Example ---
producer = KafkaProducer(
    bootstrap_servers=[BOOTSTRAP_SERVERS],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    for i in range(5):
        message = {'number': i, 'timestamp': time.time()}
        future = producer.send(TOPIC_NAME, message)
        record_metadata = future.get(timeout=10)  # Block up to 10 s until the send is acknowledged
        print(f"Produced message: {message} to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}")
        time.sleep(1)
except Exception as e:
    print(f"Producer error: {e}")
finally:
    producer.flush()
    producer.close()
    print("Producer closed.")

# --- Consumer Example ---
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=[BOOTSTRAP_SERVERS],
    auto_offset_reset='earliest', # Start consuming from the earliest available message
    enable_auto_commit=True,
    group_id='my_python_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=5000 # Stop waiting for messages after 5 seconds
)

print(f"\nConsuming messages from topic: {TOPIC_NAME}")
try:
    for message in consumer:
        print(f"Consumed message: key={message.key}, value={message.value}, topic={message.topic}, partition={message.partition}, offset={message.offset}")
except Exception as e:
    print(f"Consumer error: {e}")
finally:
    consumer.close()
    print("Consumer closed.")
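
The serializer and deserializer lambdas used in the quickstart can be checked without a running broker. The sketch below pulls the same logic into two named functions and round-trips a sample message. The names `serialize_value` and `deserialize_value` are hypothetical helpers introduced for illustration, not part of kafka-python, and the sample payload is made up.

```python
import json


def serialize_value(value):
    """Hypothetical helper: same logic as the producer's value_serializer lambda."""
    return json.dumps(value).encode('utf-8')


def deserialize_value(raw):
    """Hypothetical helper: same logic as the consumer's value_deserializer lambda."""
    return json.loads(raw.decode('utf-8'))


# Illustrative payload shaped like the quickstart messages
message = {'number': 0, 'timestamp': 1700000000.5}

raw = serialize_value(message)      # bytes, as sent on the wire
restored = deserialize_value(raw)   # back to a Python dict

print(type(raw).__name__, restored == message)
```

Named functions like these could be passed as `value_serializer=serialize_value` and `value_deserializer=deserialize_value` in place of the lambdas, which may make the encoding easier to unit-test separately from the Kafka connection.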
