Confluent Kafka Python Client
Confluent's Python client for Apache Kafka, currently at version 2.13.2. It provides a high-level producer, consumer, and AdminClient, built as a robust binding on top of the high-performance C client, librdkafka. The library is actively maintained with frequent releases, typically on a monthly or bi-monthly cadence, offering compatibility with Apache Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform.
Common errors
- ModuleNotFoundError: No module named 'confluent_kafka.cimpl'
  Cause: The `confluent-kafka` package includes C extensions (`.cimpl`) that were not correctly installed or built for the current Python environment and architecture, often due to missing `librdkafka` development headers or an incompatible environment.
  Fix: Ensure the `librdkafka` development packages are installed (e.g., `sudo apt-get install librdkafka-dev` on Debian/Ubuntu, `brew install librdkafka` on macOS), then reinstall with `pip install confluent-kafka`. If using a virtual environment, activate it before installing.
- Connect to ... failed: Connection refused
  Cause: The client failed to establish a connection to the specified Kafka broker, usually because the broker is not running, is configured on a different host/port, or a firewall is blocking the connection. This can also occur if `bootstrap.servers` is incorrectly configured.
  Fix: Verify that the broker is running and reachable from the client machine (check the host and port in `bootstrap.servers`). Ensure no firewall is blocking the port, and that `listeners` and `advertised.listeners` in the broker configuration point to an address the client can reach.
- SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured.
  Cause: The client could not verify the broker's SSL certificate, typically because of an incorrect or missing CA certificate bundle (`ssl.ca.location`), an expired certificate, or a hostname mismatch.
  Fix: Point `ssl.ca.location` at the CA certificate file that signed the broker's certificate. Verify that the broker's certificate has not expired and that its hostname matches the `bootstrap.servers` entry. Setting `ssl.endpoint.identification.algorithm=''` can temporarily bypass hostname verification for testing, but is not recommended for production.
- SASL authentication failed using login context 'Client'.
  Cause: The client failed to authenticate with the broker using SASL, often due to an incorrect SASL mechanism, invalid username/password, or a broken JAAS configuration (for mechanisms such as GSSAPI, PLAIN, or SCRAM).
  Fix: Double-check all SASL-related settings: `security.protocol`, `sasl.mechanism`, `sasl.username`, and `sasl.password`. They must precisely match the broker's configuration, and the credentials must be valid. For SASL_SSL, verify the SSL settings as well. For Confluent Cloud, use the generated API key and secret.
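The SSL and SASL fixes above mostly come down to a handful of client configuration keys. A minimal sketch of a configuration for a SASL_SSL-secured cluster; the broker address, credentials, and CA path below are placeholders, not real values:

```python
import os

# Hypothetical values; substitute your own broker address, credentials, and CA path.
secure_conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'broker.example.com:9093'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',  # or SCRAM-SHA-256 / SCRAM-SHA-512 / GSSAPI
    'sasl.username': os.environ.get('KAFKA_API_KEY', 'user'),
    'sasl.password': os.environ.get('KAFKA_API_SECRET', 'secret'),
    # CA bundle that signed the broker certificate; the usual fix for
    # "certificate verify failed" errors.
    'ssl.ca.location': '/etc/ssl/certs/ca-certificates.crt',
}
```

The same dict can be passed to `Producer`, `Consumer` (with a `group.id` added), or `AdminClient`.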
Warnings
- breaking Python 3.7 support was dropped in `confluent-kafka` v2.12.1. Applications running on Python 3.7 will require an upgrade to Python 3.8+.
- breaking KIP-848, the next-generation consumer rebalance protocol, is production-ready from v2.12.0; it is optional and entails 'contract changes' if enabled. Older client protocol API versions (prior to 1.8.2) will be deprecated and eventually unsupported by Kafka 4.0 and Confluent Platform 8.0 (starting February 2026).
- gotcha Prior versions of `Producer.produce()` could leak `rd_headers` memory when called with headers and an exception (such as `BufferError` or `RuntimeError`) was raised; this leak has since been fixed.
- gotcha Version `v2.8.1` was explicitly noted as having a 'breaking change' and users were advised 'Do not run software with v2.8.1 installed.'
- gotcha Starting with v2.13.0, full type hinting has been enforced across all interfaces. This is an enhancement but may reveal type-related issues in existing codebases during static analysis or runtime if type mismatches were previously overlooked.
- gotcha The `Producer` and `Consumer` classes now support context managers (`with` statement) and have an explicit `close()` method (from v2.13.0) for proper resource cleanup. Failing to close clients explicitly can lead to resource leaks or uncommitted offsets.
- breaking `confluent-kafka` requires C/C++ build tools (such as `gcc`, `g++`, and Python development headers) to compile its C extensions. Installation from source will fail if these build dependencies are not present in the environment (e.g., in minimal Docker images like `alpine`).
Install
- pip install confluent-kafka
- pip install confluent-kafka[avro]
- pip install confluent-kafka[schemaregistry]
Imports
- Producer
from confluent_kafka import Producer
- Consumer
from confluent_kafka import Consumer
- AdminClient
from confluent_kafka.admin import AdminClient
- SchemaRegistryClient
from confluent_kafka.schema_registry import SchemaRegistryClient
- AvroSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
- AIOProducer
from confluent_kafka.aio import AIOProducer
- AIOConsumer
from confluent_kafka.aio import AIOConsumer
Quickstart
import os

from confluent_kafka import Consumer, KafkaError, Producer


def delivery_report(err, msg):
    # Delivery callback: invoked once per produced message, from poll()/flush().
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def run_producer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET'),
    }
    producer = Producer(conf)
    topic = 'my_test_topic'
    try:
        for i in range(5):
            message = f'Hello Kafka from Python {i}'
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
            print(f'Produced message: {message}')
        producer.flush(10)  # Wait up to 10 seconds for outstanding deliveries
    except BufferError:
        print('Local producer queue is full. Try again later.')
    except Exception as e:
        print(f'Error producing message: {e}')
    finally:
        producer.flush()


def run_consumer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'group.id': 'my_python_consumer_group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET'),
    }
    consumer = Consumer(conf)
    topic = 'my_test_topic'
    try:
        consumer.subscribe([topic])
        while True:
            msg = consumer.poll(1.0)  # Poll for messages with a 1-second timeout
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End-of-partition event, not an error (only delivered when
                    # 'enable.partition.eof' is set to True in the config)
                    print(f'{msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}')
                else:
                    print(f'Consumer error: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')} from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
                consumer.commit(message=msg)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f'Error consuming message: {e}')
    finally:
        consumer.close()


if __name__ == '__main__':
    print('Running producer...')
    run_producer()
    print('\nRunning consumer (Press Ctrl+C to stop)...')
    run_consumer()