Confluent Kafka Python Client

2.13.2 · active · verified Sun Mar 29

Confluent's Python client for Apache Kafka, currently at version 2.13.2. It provides a high-level producer, consumer, and AdminClient, built as a robust binding on top of the high-performance C client, librdkafka. The library is actively maintained with frequent releases, typically on a monthly or bi-monthly cadence, offering compatibility with Apache Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform.

Warnings

Install

Imports

Quickstart

A basic example demonstrating how to set up a Kafka Producer to send messages and a Kafka Consumer to receive messages using the `confluent-kafka` library. It includes error handling, an optional delivery report for the producer, and manual offset committing for the consumer. Replace `localhost:9092` with your Kafka broker address and configure authentication if connecting to a secured cluster like Confluent Cloud using environment variables.

import os

from confluent_kafka import Consumer, KafkaError, KafkaException, Producer

def delivery_report(err, msg):
    """Per-message delivery callback fired from Producer.poll()/flush().

    err is None when the broker acknowledged the message; otherwise it
    describes the failure. msg carries the delivered record's metadata.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
    else:
        print(f'Message delivery failed: {err}')

def run_producer():
    """Produce five demo messages to 'my_test_topic' and wait for delivery.

    The broker address comes from KAFKA_BOOTSTRAP_SERVERS (defaults to
    localhost:9092). Delivery outcomes are reported asynchronously through
    delivery_report(), which runs inside poll()/flush().
    """
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }

    producer = Producer(conf)
    topic = 'my_test_topic'

    try:
        for i in range(5):
            message = f'Hello Kafka from Python {i}'
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
            # Serve delivery callbacks and drain the internal queue as we go;
            # without this, delivery_report only fires during flush().
            producer.poll(0)
            print(f'Produced message: {message}')
    except BufferError:
        # produce() raises BufferError when librdkafka's bounded local
        # queue is full.
        print('Local producer queue is full. Try again later.')
    except Exception as e:
        print(f'Error producing message: {e}')
    finally:
        # Single bounded flush: wait up to 10 seconds for all outstanding
        # messages to be delivered (or fail), firing each callback. An
        # unbounded flush() here could block forever after an error.
        producer.flush(10)

def run_consumer():
    """Consume messages from 'my_test_topic' until interrupted.

    Uses manual offset commits (enable.auto.commit=False): an offset is
    committed only after the message is processed, giving at-least-once
    delivery. Press Ctrl+C to stop; the consumer is closed cleanly so the
    group rebalances promptly.
    """
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'group.id': 'my_python_consumer_group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }

    consumer = Consumer(conf)
    topic = 'my_test_topic'

    try:
        consumer.subscribe([topic])

        while True:
            msg = consumer.poll(1.0) # Poll for messages with a 1-second timeout
            if msg is None:
                continue
            if msg.error():
                # The partition-EOF code lives on KafkaError, not
                # KafkaException (KafkaException has no PARTITION_EOF
                # attribute; referencing it would raise AttributeError).
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event, not an error
                    print(f'{msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}')
                else:
                    print(f'Consumer error: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')} from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
                # Commit this message's offset synchronously now that it
                # has been processed.
                consumer.commit(msg)

    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f'Error consuming message: {e}')
    finally:
        # Always close: commits/releases group membership so partitions
        # are reassigned without waiting for the session timeout.
        consumer.close()

def _main():
    """Demo driver: produce a batch of messages, then consume them."""
    print("Running producer...")
    run_producer()
    print("\nRunning consumer (Press Ctrl+C to stop)...")
    run_consumer()


if __name__ == '__main__':
    _main()

view raw JSON →