Confluent Kafka Python Client

v2.13.2 · verified Tue May 12 · auth: no · install: draft

`confluent-kafka` is Confluent's Python client for Apache Kafka, currently at version 2.13.2. It provides a high-level Producer, Consumer, and AdminClient, built as a binding on top of librdkafka, the high-performance C client. The library is actively maintained, with releases roughly every one to two months, and is compatible with Apache Kafka brokers (version 0.8 or later), Confluent Cloud, and Confluent Platform.

pip install confluent-kafka
error ModuleNotFoundError: No module named 'confluent_kafka.cimpl'
cause This error typically occurs when the `confluent-kafka` Python package, which includes C extensions (`.cimpl`), is not correctly installed or built for the specific Python environment and architecture, often due to missing `librdkafka` development headers or an incompatible environment.
fix
Ensure the `librdkafka` development packages are installed (e.g., `sudo apt-get install librdkafka-dev` on Debian/Ubuntu, `brew install librdkafka` on macOS), then reinstall with `pip install --force-reinstall confluent-kafka`. Prebuilt wheels bundle librdkafka on most glibc platforms, so building from source is only needed where no wheel exists (e.g., Alpine/musl). If using a virtual environment, activate it before installing.
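The fix above as a command sequence, assuming a Debian/Ubuntu host (on most glibc platforms a prebuilt wheel makes the `librdkafka-dev` step unnecessary):

```shell
# Install librdkafka headers (only needed when pip must build from source,
# e.g. on Alpine/musl; glibc platforms usually get a prebuilt wheel)
sudo apt-get install -y librdkafka-dev

# Reinstall inside the (activated) virtual environment
pip install --force-reinstall confluent-kafka

# Verify the C extension loads
python -c "import confluent_kafka; print(confluent_kafka.version())"
```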
error Connect to ... failed: Connection refused
cause The Kafka client failed to establish a connection to the specified Kafka broker, usually because the broker is not running, is configured on a different host/port, or a firewall is blocking the connection. This can also occur if `bootstrap.servers` is incorrectly configured.
fix
Verify that the Kafka broker is running and reachable from the client machine (check the host and port in `bootstrap.servers`). Ensure no firewall blocks the port, and that `listeners` and `advertised.listeners` in the broker configuration point to an address the client can actually reach.
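Because the client retries broker connections in the background, connection failures can go unnoticed until a produce or consume call times out. A sketch of registering a global error callback to surface them immediately; the broker address here is a placeholder:

```python
# Sketch: surface connection-level errors (e.g. "Connection refused",
# "all brokers down") instead of relying on silent background retries.
def on_error(err):
    # Invoked by the client for global errors; err is a KafkaError
    print(f'Kafka error: {err}')

conf = {
    'bootstrap.servers': 'localhost:9092',  # must match an accessible listener
    'error_cb': on_error,  # passed alongside normal configuration keys
}
```

Pass `conf` to `Producer` or `Consumer` as usual; the callback fires from within `poll()`/`flush()` calls.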
error SSL handshake failed: error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured.
cause The client could not verify the broker's SSL certificate, indicating an issue with the SSL/TLS configuration, often an incorrect or missing CA certificate bundle (`ssl.ca.location`), an expired certificate, or a hostname mismatch.
fix
Ensure `ssl.ca.location` points to the valid CA certificate file (or bundle) that signed the broker's certificate. Verify that the broker's certificate has not expired and that its hostname matches the `bootstrap.servers` entry. Setting `ssl.endpoint.identification.algorithm='none'` can temporarily bypass hostname verification for testing, but should never be used in production.
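A minimal TLS configuration sketch tying these settings together; the broker address and certificate path are placeholders:

```python
conf = {
    'bootstrap.servers': 'broker.example.com:9093',  # placeholder TLS listener
    'security.protocol': 'SSL',
    # CA bundle that signed the broker's certificate
    'ssl.ca.location': '/etc/ssl/certs/kafka-ca.pem',
    # 'https' (the default) verifies the broker hostname against the cert;
    # 'none' disables that check -- for testing only, never production
    'ssl.endpoint.identification.algorithm': 'https',
}
```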
error SASL authentication failed using login context 'Client'.
cause The client failed to authenticate with the Kafka broker using SASL, often due to incorrect SASL mechanism configuration, invalid username/password, or an issue with the JAAS configuration (for mechanisms like GSSAPI, PLAIN, SCRAM).
fix
Double-check all SASL-related configuration: `security.protocol`, `sasl.mechanism`, `sasl.username`, and `sasl.password`. They must precisely match the broker's configuration, and the credentials must be correct. For SASL_SSL, verify the SSL settings as well. For Confluent Cloud, use the generated API key and secret.
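A configuration sketch for SASL_SSL with the PLAIN mechanism, as used by Confluent Cloud; the environment variable names and default address are placeholders:

```python
import os

conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS',
                                        'broker.example.com:9092'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',  # Confluent Cloud uses PLAIN with API key/secret
    'sasl.username': os.environ.get('KAFKA_API_KEY', ''),
    'sasl.password': os.environ.get('KAFKA_API_SECRET', ''),
}
```

Keeping credentials in environment variables (rather than source) avoids leaking secrets into version control.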
breaking Python 3.7 support was dropped in `confluent-kafka` v2.12.1. Applications running on Python 3.7 will require an upgrade to Python 3.8+.
fix Upgrade your Python environment to version 3.8 or newer.
breaking The KIP-848 consumer rebalance protocol, production-ready from v2.12.0, is a new, opt-in rebalance protocol that changes parts of the consumer contract when enabled. Client protocol API versions prior to 1.8.2 will be deprecated and eventually unsupported by Kafka 4.0 and Confluent Platform 8.0 (starting February 2026).
fix Review the KIP-848 migration guide for new consumer configurations (`group.protocol='consumer'`). Ensure your client version is 1.8.2 or newer to maintain compatibility with future Kafka broker versions.
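A consumer-configuration sketch opting into KIP-848, per the migration note above; the broker address and group id are placeholders, and the broker must support the new protocol:

```python
conf = {
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my_group',
    # Opt in to the KIP-848 ('consumer') rebalance protocol
    'group.protocol': 'consumer',
    # Note: under KIP-848 the assignment strategy and session timeouts are
    # broker-controlled, so the classic client-side settings (e.g.
    # partition.assignment.strategy, session.timeout.ms) must not be set.
}
```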
gotcha A memory leak in `Producer.produce()` has been fixed: when `produce()` was called with headers and an exception (such as `BufferError` or `RuntimeError`) was raised, prior versions could leak the internal `rd_headers` allocation.
fix Upgrade to `confluent-kafka` v2.13.2 or newer to prevent this specific memory leak.
gotcha Version `v2.8.1` was explicitly noted as having a 'breaking change' and users were advised 'Do not run software with v2.8.1 installed.'
fix Avoid using `confluent-kafka` version 2.8.1. Upgrade to a newer stable version (e.g., 2.8.2+) or revert to an older stable version.
gotcha Starting with v2.13.0, full type hinting has been enforced across all interfaces. This is an enhancement but may reveal type-related issues in existing codebases during static analysis or runtime if type mismatches were previously overlooked.
fix Ensure your code adheres to Python's type system and handle any newly exposed type errors. Consider adding type checks or updating code to match expected types.
gotcha The `Producer` and `Consumer` classes now support context managers (`with` statement) and have an explicit `close()` method (from v2.13.0) for proper resource cleanup. Failing to close clients explicitly can lead to resource leaks or uncommitted offsets.
fix Always call `producer.flush()` followed by `producer.close()`, and `consumer.close()`, or use `with` statements so resources are released deterministically.
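A cleanup-pattern sketch based on the note above; it assumes confluent-kafka >= 2.13.0 (for context-manager support) and a reachable broker, and the broker address and topic are placeholders:

```python
from confluent_kafka import Producer, Consumer

conf = {'bootstrap.servers': 'localhost:9092'}  # placeholder

# The context manager flushes and closes the producer on exit,
# even if an exception is raised in the body
with Producer(conf) as producer:
    producer.produce('my_test_topic', b'payload')
    producer.flush()

# Likewise, the consumer leaves its group and commits cleanup on exit
with Consumer({**conf, 'group.id': 'g1'}) as consumer:
    consumer.subscribe(['my_test_topic'])
    msg = consumer.poll(1.0)
```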
breaking `confluent-kafka` requires C/C++ build tools (such as `gcc`, `g++`, and Python development headers) to compile its C extensions. Installation from source will fail if these build dependencies are not present in the environment (e.g., in minimal Docker images like `alpine`).
fix Install the necessary build tools and Python development headers in your environment. For Alpine Linux, use `apk add gcc musl-dev python3-dev`. For Debian/Ubuntu-based systems, use `apt-get install gcc python3-dev`.
pip install confluent-kafka[avro]
pip install confluent-kafka[schemaregistry]
| Python | OS / libc | Variant | Status | Wheel | Install | Import | Disk |
|--------|-----------|---------|--------|-------|---------|--------|------|
| 3.9 | alpine (musl) | confluent-kafka | build_error | - | - | - | - |
| 3.9 | alpine (musl) | confluent-kafka | | - | - | - | - |
| 3.9 | alpine (musl) | avro | build_error | - | - | - | - |
| 3.9 | alpine (musl) | avro | | - | - | - | - |
| 3.9 | alpine (musl) | schemaregistry | build_error | - | - | - | - |
| 3.9 | alpine (musl) | schemaregistry | | - | - | - | - |
| 3.9 | slim (glibc) | confluent-kafka | | wheel | 2.7s | 0.01s | 30M |
| 3.9 | slim (glibc) | confluent-kafka | | - | - | 0.01s | 30M |
| 3.9 | slim (glibc) | avro | | wheel | 6.0s | 0.01s | 68M |
| 3.9 | slim (glibc) | avro | | - | - | 0.01s | 67M |
| 3.9 | slim (glibc) | schemaregistry | | wheel | 5.1s | 0.01s | 54M |
| 3.9 | slim (glibc) | schemaregistry | | - | - | 0.01s | 53M |
| 3.10 | alpine (musl) | confluent-kafka | build_error | - | - | - | - |
| 3.10 | alpine (musl) | confluent-kafka | | - | - | - | - |
| 3.10 | alpine (musl) | avro | build_error | - | - | - | - |
| 3.10 | alpine (musl) | avro | | - | - | - | - |
| 3.10 | alpine (musl) | schemaregistry | build_error | - | - | - | - |
| 3.10 | alpine (musl) | schemaregistry | | - | - | - | - |
| 3.10 | slim (glibc) | confluent-kafka | | wheel | 2.2s | 0.01s | 30M |
| 3.10 | slim (glibc) | confluent-kafka | | - | - | 0.01s | 30M |
| 3.10 | slim (glibc) | avro | | wheel | 5.3s | 0.01s | 69M |
| 3.10 | slim (glibc) | avro | | - | - | 0.01s | 67M |
| 3.10 | slim (glibc) | schemaregistry | | wheel | 4.3s | 0.01s | 55M |
| 3.10 | slim (glibc) | schemaregistry | | - | - | 0.01s | 53M |
| 3.11 | alpine (musl) | confluent-kafka | build_error | - | - | - | - |
| 3.11 | alpine (musl) | confluent-kafka | | - | - | - | - |
| 3.11 | alpine (musl) | avro | build_error | - | - | - | - |
| 3.11 | alpine (musl) | avro | | - | - | - | - |
| 3.11 | alpine (musl) | schemaregistry | build_error | - | - | - | - |
| 3.11 | alpine (musl) | schemaregistry | | - | - | - | - |
| 3.11 | slim (glibc) | confluent-kafka | | wheel | 2.1s | 0.02s | 32M |
| 3.11 | slim (glibc) | confluent-kafka | | - | - | 0.02s | 32M |
| 3.11 | slim (glibc) | avro | | wheel | 5.0s | 0.02s | 74M |
| 3.11 | slim (glibc) | avro | | - | - | 0.02s | 72M |
| 3.11 | slim (glibc) | schemaregistry | | wheel | 4.5s | 0.03s | 58M |
| 3.11 | slim (glibc) | schemaregistry | | - | - | 0.02s | 56M |
| 3.12 | alpine (musl) | confluent-kafka | build_error | - | - | - | - |
| 3.12 | alpine (musl) | confluent-kafka | | - | - | - | - |
| 3.12 | alpine (musl) | avro | build_error | - | - | - | - |
| 3.12 | alpine (musl) | avro | | - | - | - | - |
| 3.12 | alpine (musl) | schemaregistry | build_error | - | - | - | - |
| 3.12 | alpine (musl) | schemaregistry | | - | - | - | - |
| 3.12 | slim (glibc) | confluent-kafka | | wheel | 1.8s | 0.02s | 24M |
| 3.12 | slim (glibc) | confluent-kafka | | - | - | 0.02s | 24M |
| 3.12 | slim (glibc) | avro | | wheel | 4.2s | 0.02s | 66M |
| 3.12 | slim (glibc) | avro | | - | - | 0.02s | 64M |
| 3.12 | slim (glibc) | schemaregistry | | wheel | 3.5s | 0.02s | 50M |
| 3.12 | slim (glibc) | schemaregistry | | - | - | 0.02s | 48M |
| 3.13 | alpine (musl) | confluent-kafka | build_error | - | - | - | - |
| 3.13 | alpine (musl) | confluent-kafka | | - | - | - | - |
| 3.13 | alpine (musl) | avro | build_error | - | - | - | - |
| 3.13 | alpine (musl) | avro | | - | - | - | - |
| 3.13 | alpine (musl) | schemaregistry | build_error | - | - | - | - |
| 3.13 | alpine (musl) | schemaregistry | | - | - | - | - |
| 3.13 | slim (glibc) | confluent-kafka | | wheel | 1.8s | 0.02s | 24M |
| 3.13 | slim (glibc) | confluent-kafka | | - | - | 0.02s | 24M |
| 3.13 | slim (glibc) | avro | | wheel | 4.2s | 0.02s | 65M |
| 3.13 | slim (glibc) | avro | | - | - | 0.02s | 63M |
| 3.13 | slim (glibc) | schemaregistry | | wheel | 3.5s | 0.02s | 49M |
| 3.13 | slim (glibc) | schemaregistry | | - | - | 0.02s | 47M |

A basic example demonstrating how to set up a Kafka Producer to send messages and a Kafka Consumer to receive messages using the `confluent-kafka` library. It includes error handling, an optional delivery report for the producer, and manual offset committing for the consumer. Replace `localhost:9092` with your Kafka broker address and configure authentication if connecting to a secured cluster like Confluent Cloud using environment variables.

import os
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def run_producer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }

    producer = Producer(conf)
    topic = 'my_test_topic'

    try:
        for i in range(5):
            message = f'Hello Kafka from Python {i}'
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
            producer.poll(0)  # Serve delivery callbacks from previous produce() calls
            print(f'Produced message: {message}')
    except BufferError:
        print('Local producer queue is full. Try again later.')
    except KafkaException as e:
        print(f'Error producing message: {e}')
    finally:
        producer.flush(10)  # Wait up to 10 seconds for outstanding deliveries

def run_consumer():
    conf = {
        'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
        'group.id': 'my_python_consumer_group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        # Add security configs if connecting to Confluent Cloud or secured clusters
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        # 'sasl.username': os.environ.get('KAFKA_API_KEY'),
        # 'sasl.password': os.environ.get('KAFKA_API_SECRET')
    }

    consumer = Consumer(conf)
    topic = 'my_test_topic'

    try:
        consumer.subscribe([topic])

        while True:
            msg = consumer.poll(1.0)  # Poll for messages with a 1-second timeout
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event, not an error
                    print(f'{msg.topic()} [{msg.partition()}] reached end offset {msg.offset()}')
                else:
                    print(f'Consumer error: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')} from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
                consumer.commit(message=msg, asynchronous=False)  # Commit after processing

    except KeyboardInterrupt:
        pass
    except KafkaException as e:
        print(f'Error consuming message: {e}')
    finally:
        consumer.close()

if __name__ == '__main__':
    print('Running producer...')
    run_producer()
    print('\nRunning consumer (Press Ctrl+C to stop)...')
    run_consumer()