Confluent Kafka Python Client
Confluent's Python client for Apache Kafka, currently at version 2.13.2. It provides a high-level producer, consumer, and AdminClient, built as a robust binding on top of the high-performance C client, librdkafka. The library is actively maintained with frequent releases, typically on a monthly or bi-monthly cadence, offering compatibility with Apache Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform.
Warnings
- breaking: Python 3.7 support was dropped in `confluent-kafka` v2.12.1; applications still on Python 3.7 must upgrade to Python 3.8+.
- breaking: The KIP-848 consumer rebalance protocol, production-ready from v2.12.0, is an opt-in protocol that changes the rebalance contract when enabled. Older client protocol API versions (prior to 1.8.2) are deprecated and will be unsupported by Kafka 4.0 and Confluent Platform 8.0 (starting February 2026).
- gotcha: Earlier versions could leak `rd_headers` memory when `Producer.produce()` was called with headers and an exception (such as `BufferError` or `RuntimeError`) was raised; this leak has since been fixed.
- gotcha: Version v2.8.1 was explicitly flagged as containing a breaking change, with the advisory "Do not run software with v2.8.1 installed."
- gotcha: Starting with v2.13.0, full type hinting is enforced across all interfaces. This is an enhancement, but it may surface type mismatches in existing codebases, during static analysis or at runtime, that were previously overlooked.
- gotcha: The `Producer` and `Consumer` classes support context managers (`with` statement) and an explicit `close()` method (from v2.13.0) for deterministic resource cleanup. Failing to close clients can leak resources or leave offsets uncommitted.
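The KIP-848 protocol noted above is opt-in per client. A minimal configuration sketch, assuming a broker that supports the new protocol (the group name is a placeholder; `group.protocol` is the client property that selects it):

```python
import os

# Sketch: consumer configuration opting in to the KIP-848 rebalance protocol.
# 'group.protocol' defaults to 'classic'; setting it to 'consumer' enables KIP-848.
# Under the new protocol, assignment is broker-driven, so client-side properties
# like 'partition.assignment.strategy' and 'session.timeout.ms' must not be set.
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    'group.id': 'kip848_demo_group',  # placeholder group name
    'group.protocol': 'consumer',     # opt in to KIP-848 (default: 'classic')
    'auto.offset.reset': 'earliest',
}
# Pass conf to confluent_kafka.Consumer(conf) as usual.
```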
Install
- pip install confluent-kafka
- pip install "confluent-kafka[avro]"
- pip install "confluent-kafka[schemaregistry]"
Imports
- Producer
from confluent_kafka import Producer
- Consumer
from confluent_kafka import Consumer
- AdminClient
from confluent_kafka.admin import AdminClient
- SchemaRegistryClient
from confluent_kafka.schema_registry import SchemaRegistryClient
- AvroSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
- AIOProducer
from confluent_kafka.aio import AIOProducer
- AIOConsumer
from confluent_kafka.aio import AIOConsumer
Quickstart
import os

from confluent_kafka import Producer
from confluent_kafka import Consumer, KafkaError


def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def run_producer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }
    producer = Producer(conf)
    topic = 'my_test_topic'
    try:
        for i in range(5):
            message = f'Hello Kafka from Python {i}'
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
            print(f'Produced message: {message}')
        producer.flush(10)  # Wait up to 10 seconds for messages to be delivered
    except BufferError:
        print('Local producer queue is full. Try again later.')
    except Exception as e:
        print(f'Error producing message: {e}')
    finally:
        producer.flush()


def run_consumer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'group.id': 'my_python_consumer_group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # 'enable.partition.eof': True,  # required to receive the partition-EOF events handled below
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }
    consumer = Consumer(conf)
    topic = 'my_test_topic'
    try:
        consumer.subscribe([topic])
        while True:
            msg = consumer.poll(1.0)  # Poll for messages with a 1-second timeout
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event, not an error
                    print(f'{msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}')
                else:
                    print(f'Consumer error: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')} from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
                consumer.commit(message=msg)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f'Error consuming message: {e}')
    finally:
        consumer.close()


if __name__ == '__main__':
    print("Running producer...")
    run_producer()
    print("\nRunning consumer (Press Ctrl+C to stop)...")
    run_consumer()