Confluent Kafka Python Client
2.13.2 verified Tue May 12 auth: no python install: draft
confluent-kafka is Confluent's Python client for Apache Kafka, currently at version 2.13.2. It provides a high-level Producer, Consumer, and AdminClient as a binding on top of the C client, librdkafka. The library is actively maintained, with releases typically every one to two months, and is compatible with Apache Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform.
pip install confluent-kafka
Common errors
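Several of the errors in this section come down to whether a broker is reachable at all. As a first check, a plain TCP probe using only the standard library can separate network problems from client configuration problems. The sketch below is illustrative: `parse_bootstrap_servers` and `probe` are hypothetical helper names, and it assumes every entry in the server list has the form `host:port` with no protocol prefix.

```python
import socket


def parse_bootstrap_servers(servers):
    """Split a 'host1:port1,host2:port2' string into (host, port) tuples.

    Assumption: every entry carries an explicit port and no 'PROTOCOL://' prefix.
    """
    pairs = []
    for entry in servers.split(','):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(':')
        pairs.append((host, int(port)))
    return pairs


def probe(servers, timeout=3.0):
    """Return {'host:port': bool} for a plain TCP connect to each listed broker."""
    results = {}
    for host, port in parse_bootstrap_servers(servers):
        try:
            # A successful connect only proves the port is open, not that
            # the listener speaks Kafka or accepts your credentials.
            with socket.create_connection((host, port), timeout=timeout):
                results[f'{host}:{port}'] = True
        except OSError:
            # Covers refused connections, timeouts, and DNS failures.
            results[f'{host}:{port}'] = False
    return results
```

Calling `probe('localhost:9092')` returns a dictionary with one entry per broker. A `False` value points at the network, the broker process, or the listener address rather than at the Python client.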
error ModuleNotFoundError: No module named 'confluent_kafka.cimpl'
cause This error typically occurs when the C extension module (`confluent_kafka.cimpl`) that ships with the `confluent-kafka` package was not installed or built correctly for the current Python environment and architecture, often because a source build lacked the `librdkafka` development headers or the environment is incompatible.
fix If pip has to build the package from source, ensure the librdkafka development packages are installed (e.g., `sudo apt-get install librdkafka-dev` on Debian/Ubuntu, `brew install librdkafka` on macOS), then reinstall confluent-kafka with pip (e.g., `pip install --force-reinstall confluent-kafka`). If you use a virtual environment, activate it before installing.
error Connect to ... failed: Connection refused
cause The Kafka client failed to establish a connection to the specified Kafka broker, usually because the broker is not running, is configured on a different host/port, or a firewall is blocking the connection. This can also occur if `bootstrap.servers` is incorrectly configured.
fix Verify that the Kafka broker is running and reachable from the client machine (check the host and port in `bootstrap.servers`). Ensure no firewall is blocking the specified port, and that `listeners` and `advertised.listeners` in the Kafka broker configuration are set to an address the client can reach.
error SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured.
cause The client could not verify the broker's SSL certificate, indicating an issue with the SSL/TLS configuration, often an incorrect or missing CA certificate bundle (`ssl.ca.location`), an expired certificate, or a hostname mismatch.
fix Ensure the `ssl.ca.location` configuration points to a valid CA certificate file for the authority that signed the broker's certificate. Verify that the broker's certificate has not expired and that the hostname in the certificate matches the `bootstrap.servers` entry. For testing only, setting `ssl.endpoint.identification.algorithm` to `none` disables hostname verification in librdkafka; this is not recommended for production.
error SASL authentication failed using login context 'Client'.
cause The client failed to authenticate with the Kafka broker using SASL, often due to incorrect SASL mechanism configuration, invalid username/password, or an issue with the JAAS configuration (for mechanisms like GSSAPI, PLAIN, SCRAM).
fix Double-check all SASL-related settings, including `security.protocol`, `sasl.mechanism` (also accepted as `sasl.mechanisms`), `sasl.username`, and `sasl.password`. Ensure they match the broker's configuration exactly and that the credentials are correct. For SASL_SSL, verify that the SSL settings are also correct. For Confluent Cloud, use the generated API key and secret.
Warnings
breaking Python 3.7 support was dropped in `confluent-kafka` v2.12.1. Applications running on Python 3.7 must move to Python 3.8+ before upgrading the client.
fix Upgrade your Python environment to version 3.8 or newer.
breaking The KIP-848 consumer rebalance protocol, production-ready from v2.12.0, is a new, optional rebalance protocol that comes with 'contract changes' when enabled. Separately, older client protocol API versions (prior to 1.8.2) will be deprecated and eventually unsupported by Kafka 4.0 and Confluent Platform 8.0 (starting February 2026).
fix Review the KIP-848 migration guide for new consumer configurations (`group.protocol='consumer'`). Ensure your client version is 1.8.2 or newer to maintain compatibility with future Kafka broker versions.
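As a rough illustration of what the migration can involve, the sketch below derives a KIP-848 consumer configuration from an existing one. It sets `group.protocol` to `consumer` and drops a handful of properties. The list of dropped keys is an assumption based on the librdkafka migration notes and should be checked against the guide for your version; `to_kip848_config` and `CLASSIC_ONLY_KEYS` are hypothetical names.

```python
# Assumption: these properties belong to the classic protocol and are not
# accepted together with group.protocol='consumer'. Verify against the
# KIP-848 migration guide for the client version you run.
CLASSIC_ONLY_KEYS = (
    'partition.assignment.strategy',
    'session.timeout.ms',
    'heartbeat.interval.ms',
    'group.protocol.type',
)


def to_kip848_config(classic_conf):
    """Return a copy of a consumer config adapted for group.protocol='consumer'."""
    conf = {k: v for k, v in classic_conf.items() if k not in CLASSIC_ONLY_KEYS}
    conf['group.protocol'] = 'consumer'
    return conf


classic = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_python_consumer_group',
    'session.timeout.ms': 45000,
    'partition.assignment.strategy': 'cooperative-sticky',
}
new_conf = to_kip848_config(classic)
```

The original dictionary is left untouched, so the classic configuration stays available for rollback while the new protocol is being evaluated.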
gotcha Prior versions could leak `rd_headers` memory when `Producer.produce()` was called with headers and raised an exception (such as `BufferError` or `RuntimeError`). This leak has been fixed.
fix Upgrade to `confluent-kafka` v2.13.2 or newer to prevent this specific memory leak.
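The code path in question is the common pattern of producing with headers and handling `BufferError` when the local queue is full. A minimal sketch of that pattern follows; `produce_with_retry` is a hypothetical helper, and it accepts any object exposing `produce()` and `poll()` so it can be exercised without a broker.

```python
def produce_with_retry(producer, topic, value, headers=None, retries=3, poll_timeout=1.0):
    """Call producer.produce(); on BufferError, poll to drain the queue and retry.

    Returns the number of retries that were needed. Re-raises BufferError
    once the retry budget is exhausted.
    """
    for attempt in range(retries + 1):
        try:
            producer.produce(topic, value=value, headers=headers)
            return attempt
        except BufferError:
            if attempt == retries:
                raise
            # poll() serves delivery callbacks, which frees queue space.
            producer.poll(poll_timeout)
```

With a real client this would be called as `produce_with_retry(producer, 'my_test_topic', b'payload', headers=[('trace-id', b'abc')])`. On an affected version, each failed attempt in this loop is where header memory could be leaked.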
gotcha Version `v2.8.1` was explicitly noted as having a 'breaking change' and users were advised 'Do not run software with v2.8.1 installed.'
fix Avoid using `confluent-kafka` version 2.8.1. Upgrade to a newer stable version (e.g., 2.8.2+) or revert to an older stable version.
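One way to enforce this is a startup check against the installed distribution's version. The sketch below uses only `importlib.metadata` from the standard library; `BLOCKED_VERSIONS`, `is_blocked`, and `check_installed` are hypothetical names chosen for illustration.

```python
from importlib import metadata

# Versions this application refuses to run with (from the warning above).
BLOCKED_VERSIONS = {'2.8.1'}


def is_blocked(version_string):
    """Return True if the given version string is on the block list."""
    return version_string.strip() in BLOCKED_VERSIONS


def check_installed(dist_name='confluent-kafka'):
    """Return the installed version, None if absent, or raise if blocked."""
    try:
        installed = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None
    if is_blocked(installed):
        raise RuntimeError(f'{dist_name} {installed} must not be used; install another version')
    return installed
```

The same constraint can also be expressed at install time with a version specifier such as `confluent-kafka!=2.8.1` in a requirements file.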
gotcha Starting with v2.13.0, full type hinting has been enforced across all interfaces. This is an enhancement but may reveal type-related issues in existing codebases during static analysis or runtime if type mismatches were previously overlooked.
fix Ensure your code adheres to Python's type system and handle any newly exposed type errors. Consider adding type checks or updating code to match expected types.
gotcha The `Producer` and `Consumer` classes now support context managers (`with` statement) and have an explicit `close()` method (from v2.13.0) for proper resource cleanup. Failing to close clients explicitly can lead to resource leaks or uncommitted offsets.
fix When you are done with a client, call `producer.flush()` followed by `producer.close()`, or `consumer.close()`, or use `with` statements so that resources are released automatically.
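For code that must also run on client versions older than 2.13.0, a small wrapper can give a similar guarantee for producers. The sketch below is not the library's own context manager; `flushing` is a hypothetical helper that relies only on `flush()`, which returns the number of messages still queued when the timeout expires.

```python
from contextlib import contextmanager


@contextmanager
def flushing(producer, timeout=10.0):
    """Yield the producer and always flush it on exit, even after an exception."""
    try:
        yield producer
    finally:
        remaining = producer.flush(timeout)
        if remaining:
            print(f'{remaining} message(s) were still undelivered after {timeout}s')
```

Usage would look like `with flushing(Producer(conf)) as p: p.produce(topic, b'value')`. For consumers, `contextlib.closing(Consumer(conf))` from the standard library achieves the same effect by calling `close()` on exit.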
breaking `confluent-kafka` requires C/C++ build tools (such as `gcc`, `g++`, and Python development headers) to compile its C extensions when no prebuilt wheel is available. Installation from source will fail if these build dependencies are not present in the environment (e.g., in minimal Docker images like `alpine`).
fix Install the necessary build tools and Python development headers in your environment. For Alpine Linux, use `apk add gcc musl-dev python3-dev`. For Debian/Ubuntu-based systems, use `apt-get install gcc python3-dev`. A source build also needs the librdkafka headers (`librdkafka-dev` on both), and the packaged librdkafka may be older than the version the client requires.
Install
pip install confluent-kafka[avro]
pip install confluent-kafka[schemaregistry]
Install compatibility
draft, last tested: 2026-05-12
python os / libc variant status wheel install import disk
3.10 alpine (musl) confluent-kafka build_error - - - -
3.10 alpine (musl) confluent-kafka - - - -
3.10 alpine (musl) avro build_error - - - -
3.10 alpine (musl) avro - - - -
3.10 alpine (musl) schemaregistry build_error - - - -
3.10 alpine (musl) schemaregistry - - - -
3.10 slim (glibc) confluent-kafka wheel 2.2s 0.01s 30M
3.10 slim (glibc) confluent-kafka - - 0.01s 30M
3.10 slim (glibc) avro wheel 5.3s 0.01s 69M
3.10 slim (glibc) avro - - 0.01s 67M
3.10 slim (glibc) schemaregistry wheel 4.3s 0.01s 55M
3.10 slim (glibc) schemaregistry - - 0.01s 53M
3.11 alpine (musl) confluent-kafka build_error - - - -
3.11 alpine (musl) confluent-kafka - - - -
3.11 alpine (musl) avro build_error - - - -
3.11 alpine (musl) avro - - - -
3.11 alpine (musl) schemaregistry build_error - - - -
3.11 alpine (musl) schemaregistry - - - -
3.11 slim (glibc) confluent-kafka wheel 2.1s 0.02s 32M
3.11 slim (glibc) confluent-kafka - - 0.02s 32M
3.11 slim (glibc) avro wheel 5.0s 0.02s 74M
3.11 slim (glibc) avro - - 0.02s 72M
3.11 slim (glibc) schemaregistry wheel 4.5s 0.03s 58M
3.11 slim (glibc) schemaregistry - - 0.02s 56M
3.12 alpine (musl) confluent-kafka build_error - - - -
3.12 alpine (musl) confluent-kafka - - - -
3.12 alpine (musl) avro build_error - - - -
3.12 alpine (musl) avro - - - -
3.12 alpine (musl) schemaregistry build_error - - - -
3.12 alpine (musl) schemaregistry - - - -
3.12 slim (glibc) confluent-kafka wheel 1.8s 0.02s 24M
3.12 slim (glibc) confluent-kafka - - 0.02s 24M
3.12 slim (glibc) avro wheel 4.2s 0.02s 66M
3.12 slim (glibc) avro - - 0.02s 64M
3.12 slim (glibc) schemaregistry wheel 3.5s 0.02s 50M
3.12 slim (glibc) schemaregistry - - 0.02s 48M
3.13 alpine (musl) confluent-kafka build_error - - - -
3.13 alpine (musl) confluent-kafka - - - -
3.13 alpine (musl) avro build_error - - - -
3.13 alpine (musl) avro - - - -
3.13 alpine (musl) schemaregistry build_error - - - -
3.13 alpine (musl) schemaregistry - - - -
3.13 slim (glibc) confluent-kafka wheel 1.8s 0.02s 24M
3.13 slim (glibc) confluent-kafka - - 0.02s 24M
3.13 slim (glibc) avro wheel 4.2s 0.02s 65M
3.13 slim (glibc) avro - - 0.02s 63M
3.13 slim (glibc) schemaregistry wheel 3.5s 0.02s 49M
3.13 slim (glibc) schemaregistry - - 0.02s 47M
3.9 alpine (musl) confluent-kafka build_error - - - -
3.9 alpine (musl) confluent-kafka - - - -
3.9 alpine (musl) avro build_error - - - -
3.9 alpine (musl) avro - - - -
3.9 alpine (musl) schemaregistry build_error - - - -
3.9 alpine (musl) schemaregistry - - - -
3.9 slim (glibc) confluent-kafka wheel 2.7s 0.01s 30M
3.9 slim (glibc) confluent-kafka - - 0.01s 30M
3.9 slim (glibc) avro wheel 6.0s 0.01s 68M
3.9 slim (glibc) avro - - 0.01s 67M
3.9 slim (glibc) schemaregistry wheel 5.1s 0.01s 54M
3.9 slim (glibc) schemaregistry - - 0.01s 53M
Imports
- Producer
  from confluent_kafka import Producer
- Consumer
  from confluent_kafka import Consumer
- AdminClient
  from confluent_kafka.admin import AdminClient
- SchemaRegistryClient
  from confluent_kafka.schema_registry import SchemaRegistryClient
- AvroSerializer
  from confluent_kafka.schema_registry.avro import AvroSerializer
- AIOProducer
  from confluent_kafka.aio import AIOProducer
- AIOConsumer
  from confluent_kafka.aio import AIOConsumer
Quickstart
last tested: 2026-04-24
import os

from confluent_kafka import Consumer, KafkaError, Producer


def delivery_report(err, msg):
    # Delivery callback: invoked once per message from poll() or flush()
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def run_producer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }
    producer = Producer(conf)
    topic = 'my_test_topic'
    try:
        for i in range(5):
            message = f'Hello Kafka from Python {i}'
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
            print(f'Produced message: {message}')
            producer.poll(0)  # Serve delivery callbacks for earlier messages
    except BufferError:
        print('Local producer queue is full. Try again later.')
    except Exception as e:
        print(f'Error producing message: {e}')
    finally:
        # Wait up to 10 seconds for outstanding messages to be delivered
        producer.flush(10)


def run_consumer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'group.id': 'my_python_consumer_group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }
    consumer = Consumer(conf)
    topic = 'my_test_topic'
    try:
        consumer.subscribe([topic])
        while True:
            msg = consumer.poll(1.0)  # Poll for messages with a 1-second timeout
            if msg is None:
                continue
            if msg.error():
                # Error codes live on KafkaError, not KafkaException
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event, not an error
                    # (only emitted when 'enable.partition.eof' is enabled)
                    print(f'{msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}')
                else:
                    print(f'Consumer error: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')} from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
                # Auto-commit is disabled, so commit after processing each message
                consumer.commit(message=msg)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f'Error consuming message: {e}')
    finally:
        consumer.close()


if __name__ == '__main__':
    print("Running producer...")
    run_producer()
    print("\nRunning consumer (Press Ctrl+C to stop)...")
    run_consumer()
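The consumer loop in the quickstart runs until interrupted. For scripts and tests, a bounded variant that stops after a fixed number of messages or a run of empty polls is often more convenient. The sketch below is one possible shape, not part of the library: `consume_batch` is a hypothetical helper that works with any object whose `poll()` returns message-like objects, and it treats every reported error as fatal for simplicity.

```python
def consume_batch(consumer, max_messages, timeout=1.0, max_empty_polls=5):
    """Collect up to max_messages values, giving up after max_empty_polls empty polls in a row."""
    values = []
    empty_polls = 0
    while len(values) < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            empty_polls += 1
            continue
        empty_polls = 0
        if msg.error():
            # Simplification: any error event aborts the batch.
            raise RuntimeError(f'Consumer error: {msg.error()}')
        values.append(msg.value())
    return values
```

With the quickstart's configuration this could be called as `consume_batch(consumer, 5)` after `consumer.subscribe([topic])`, followed by `consumer.close()`. Committing offsets is left to the caller.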