{"id":887,"library":"kafka-python","title":"Pure Python client for Apache Kafka","description":"kafka-python is a pure Python client for Apache Kafka, designed to function much like the official Java client while offering Pythonic interfaces for producing, consuming, and administering Kafka topics. It is actively maintained with frequent releases and supports Kafka brokers from version 0.8.0 through 4.0; newer brokers generally work as well because the Kafka protocol is backward compatible. The current latest stable release is 2.3.1.","status":"active","version":"2.3.1","language":"python","source_language":"en","source_url":"https://github.com/dpkp/kafka-python","tags":["kafka","messaging","streaming","data-pipeline"],"install":[{"cmd":"pip install kafka-python","lang":"bash","label":"Latest stable version"},{"cmd":"pip install 'kafka-python[crc32c,lz4,snappy,zstd]'","lang":"bash","label":"With optional compression/CRC32c dependencies"}],"dependencies":[{"reason":"Highly recommended for optimized CRC32c calculation with Kafka 0.11+ brokers, as the pure Python implementation is slow.","package":"crc32c","optional":true},{"reason":"For LZ4 compression/decompression support.","package":"lz4"},{"reason":"For Snappy compression/decompression support. 
Requires the native snappy library.","package":"python-snappy"},{"reason":"For ZSTD compression/decompression support.","package":"zstandard"}],"imports":[{"symbol":"KafkaProducer","correct":"from kafka import KafkaProducer"},{"symbol":"KafkaConsumer","correct":"from kafka import KafkaConsumer"},{"note":"Both imports work: `KafkaAdminClient` is defined in `kafka.admin` and re-exported from the top-level `kafka` package. The top-level import is shorter and is the conventional form.","wrong":"from kafka.admin import KafkaAdminClient","symbol":"KafkaAdminClient","correct":"from kafka import KafkaAdminClient"},{"symbol":"TopicPartition","correct":"from kafka import TopicPartition"}],"quickstart":{"code":"import json\nimport os\nimport time\n\nfrom kafka import KafkaConsumer, KafkaProducer\n\n# Configuration\nKAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')\nKAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')\n\n# --- Producer Example ---\ndef produce_messages():\n    producer = KafkaProducer(\n        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],\n        value_serializer=lambda v: json.dumps(v).encode('utf-8')\n    )\n    print(f\"Producing messages to topic: {KAFKA_TOPIC}\")\n    for i in range(5):\n        message = {'number': i, 'timestamp': time.time()}\n        future = producer.send(KAFKA_TOPIC, message)\n        try:\n            # Block until the broker acknowledges this record (or 10 s pass)\n            record_metadata = future.get(timeout=10)\n            print(f\"Sent: {message} to partition {record_metadata.partition} offset {record_metadata.offset}\")\n        except Exception as e:\n            print(f\"Error sending message: {e}\")\n    producer.flush()  # Ensure any buffered messages are sent\n    producer.close()\n    print(\"Producer finished.\")\n\n# --- Consumer Example ---\ndef consume_messages():\n    consumer = KafkaConsumer(\n        KAFKA_TOPIC,\n        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVERS],\n        auto_offset_reset='earliest',  # Start from the beginning of the topic if the group has no committed offset\n        
enable_auto_commit=True,     # Commit offsets automatically in the background\n        group_id='my_python_group',  # Required for group coordination and offset commits\n        consumer_timeout_ms=10000,   # Stop iterating after 10 s without new messages\n        value_deserializer=lambda x: json.loads(x.decode('utf-8'))\n    )\n    print(f\"Consuming messages from topic: {KAFKA_TOPIC}\")\n    try:\n        for message in consumer:\n            # value is decoded by value_deserializer; key is None because the producer sends no key\n            print(f\"Received: topic={message.topic}, partition={message.partition}, offset={message.offset}, key={message.key}, value={message.value}\")\n    except KeyboardInterrupt:\n        print(\"Consumer interrupted.\")\n    finally:\n        consumer.close()\n        print(\"Consumer closed.\")\n\nif __name__ == '__main__':\n    # Make sure a Kafka broker is running at KAFKA_BOOTSTRAP_SERVERS (e.g., localhost:9092)\n    # and that the topic 'my_test_topic' exists (or that the broker auto-creates topics)\n    import threading\n\n    # Run the producer in a separate thread\n    producer_thread = threading.Thread(target=produce_messages)\n    producer_thread.start()\n\n    # Give the producer a moment to start and send some messages\n    time.sleep(2)\n\n    # Run the consumer in the main thread; it returns once consumer_timeout_ms passes with no new messages\n    consume_messages()\n\n    producer_thread.join()\n","lang":"python","description":"This quickstart demonstrates how to set up a basic Kafka producer that sends JSON messages to a topic and a consumer that reads them back. It highlights common configuration options such as `bootstrap_servers`, `value_serializer`/`value_deserializer`, `group_id`, `auto_offset_reset`, and `consumer_timeout_ms` (which lets the consumer loop exit once no new messages arrive). Ensure a Kafka broker is running and reachable (e.g., at `localhost:9092`) and that the target topic exists or broker-side topic auto-creation is enabled."},"warnings":[{"fix":"Upgrade to Python 3.8+ or pin `kafka-python` to a version compatible with Python 2.x (e.g., < 2.3.0).","message":"Python 2 support was officially dropped with `kafka-python` release 2.3.x. 
Users on Python 2 must use an older version of the library.","severity":"breaking","affected_versions":">= 2.3.0"},{"fix":"Create a separate `KafkaConsumer` instance in each thread or process (e.g., via `multiprocessing`), typically with the same `group_id` so that partitions are divided among them.","message":"`KafkaProducer` is thread-safe and can be shared across threads, but `KafkaConsumer` is *not*: a single consumer instance must not be used from multiple threads. For concurrent consumption, give each thread or process its own `KafkaConsumer`.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Implement custom `key_serializer`/`value_serializer` functions on the producer and matching `key_deserializer`/`value_deserializer` functions on the consumer, using a format such as Avro or Protobuf for more compact encoding and schema enforcement.","message":"For high-throughput applications, inefficient serialization (e.g., plain JSON) can become a bottleneck. More compact formats such as Avro or Protobuf, plugged in through the serializer and deserializer hooks, are recommended.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Set `enable_auto_commit=False` and call `consumer.commit()` only after messages have been processed successfully. This gives at-least-once processing semantics: messages may be reprocessed after a crash, but are not lost.","message":"With `enable_auto_commit=True`, offsets are committed on a timer (`auto_commit_interval_ms`), independently of processing. After a consumer crash, messages whose offsets were committed before their processing finished are skipped (lost), and messages processed since the last commit are delivered again (duplicated).","severity":"gotcha","affected_versions":"All versions"},{"fix":"Install the `crc32c` package (e.g., `pip install 'kafka-python[crc32c]'`) to use optimized native code, especially in high-throughput scenarios.","message":"For Kafka brokers version 0.11 and above, `kafka-python` uses the v2 message format (record batches), whose checksums are CRC-32C; they are computed by the optional `crc32c` package when it is installed and in pure Python otherwise. 
The pure Python implementation is significantly slower.","severity":"gotcha","affected_versions":"All versions with Kafka brokers >= 0.11"},{"fix":"Check the release notes for `KafkaAdminClient` changes and thoroughly test code that uses advanced admin features when upgrading. The interface is generally more stable in recent 2.x releases.","message":"`KafkaAdminClient` was historically marked as an unstable interface. It has since improved, but the structure of some return values (raw protocol response tuples) has changed between releases, so expect possible interface adjustments in minor releases.","severity":"gotcha","affected_versions":"< 2.3.0"},{"fix":"Verify that Kafka brokers are running and accessible from the client, check network connectivity and firewall rules, ensure `bootstrap_servers` is configured with correct 'host:port' pairs, and confirm client/broker version compatibility (sometimes by explicitly setting `api_version`).","cause":"The Kafka client cannot establish a connection with the specified bootstrap servers, often due to the Kafka broker being down, incorrect host/port configuration, network issues (firewall, DNS), or client-broker version incompatibility.","error":"kafka.errors.NoBrokersAvailable: NoBrokersAvailable"},{"fix":"Configure the `auto_offset_reset` parameter in `KafkaConsumer` to either 'earliest' (to start consuming from the beginning of the available log) or 'latest' (to start from the newest messages) to handle out-of-range offsets gracefully.","cause":"A Kafka consumer attempts to fetch messages from an offset that no longer exists in the broker's log, typically because old messages have been deleted due to retention policies or the consumer's committed offset is stale.","error":"kafka.errors.OffsetOutOfRangeError"},{"fix":"Double-check the `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, and `sasl_plain_password` in your Kafka client configuration, 
ensuring they match the broker's security settings and that credentials are correct. For Kerberos, confirm the client machine has a valid Kerberos ticket and configuration.","cause":"The Kafka client is unable to authenticate with the broker, often due to incorrect SASL configuration parameters (e.g., `sasl_plain_username`, `sasl_plain_password`, `sasl_mechanism`) or an improperly configured Kerberos setup if GSSAPI is used.","error":"kafka.errors.AuthenticationFailedError: Authentication failed for user ..."},{"fix":"Increase `request_timeout_ms` and `delivery_timeout_ms` in your client configuration (and, for the producer, `max_block_ms`, which bounds how long `send()` waits for metadata or buffer space), verify network connectivity between the client and Kafka brokers, monitor broker health and load, and ensure that all topic partitions have an active leader.","cause":"A Kafka client (producer or consumer) operation, such as sending a message, fetching metadata, or committing offsets, failed to complete within the configured timeout period, often due to network instability, an overloaded Kafka broker, or leader election issues.","error":"kafka.errors.KafkaTimeoutError"}],"ecosystem":"pypi","meta_description":null,"install_score":100,"install_tag":"verified","quickstart_score":null,"quickstart_tag":null,"pypi_latest":"2.3.1","cli_name":null,"install_checks":{"last_tested":"2026-05-12","tag":"verified","tag_description":"installs cleanly on critical runtimes, fast import, recently tested","results":[{"runtime":"python:3.10-alpine","python_version":"3.10","os_libc":"alpine (musl)","variant":"crc32c,lz4,snappy,zstd","exit_code":1,"wheel_type":null,"failure_reason":"build_error","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":null},{"runtime":"python:3.10-alpine","python_version":"3.10","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":null,"import_time_s":0.18,"mem_mb":7.3,"disk_size":"20.8M"},{"runtime":"python:3.10-alpine","python_version":"3.10","os_libc":"alpine 
(musl)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.19,"mem_mb":7.3,"disk_size":"20.8M"},{"runtime":"python:3.10-slim","python_version":"3.10","os_libc":"slim (glibc)","variant":"crc32c,lz4,snappy,zstd","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":2.9,"import_time_s":0.16,"mem_mb":8.3,"disk_size":"55M"},{"runtime":"python:3.10-slim","python_version":"3.10","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":1.8,"import_time_s":0.14,"mem_mb":7.3,"disk_size":"21M"},{"runtime":"python:3.10-slim","python_version":"3.10","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.13,"mem_mb":7.3,"disk_size":"21M"},{"runtime":"python:3.11-alpine","python_version":"3.11","os_libc":"alpine (musl)","variant":"crc32c,lz4,snappy,zstd","exit_code":1,"wheel_type":null,"failure_reason":"build_error","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":null},{"runtime":"python:3.11-alpine","python_version":"3.11","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":null,"import_time_s":0.32,"mem_mb":8.1,"disk_size":"23.2M"},{"runtime":"python:3.11-alpine","python_version":"3.11","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.38,"mem_mb":8.1,"disk_size":"23.2M"},{"runtime":"python:3.11-slim","python_version":"3.11","os_libc":"slim (glibc)","variant":"crc32c,lz4,snappy,zstd","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":2.6,"import_time_s":0.3,"mem_mb":9.1,"disk_size":"57M"},{"runtime":"python:3.11-slim","python_version":"3.11","os_libc":"slim 
(glibc)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":1.8,"import_time_s":0.26,"mem_mb":8.1,"disk_size":"24M"},{"runtime":"python:3.11-slim","python_version":"3.11","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.29,"mem_mb":8.1,"disk_size":"24M"},{"runtime":"python:3.12-alpine","python_version":"3.12","os_libc":"alpine (musl)","variant":"crc32c,lz4,snappy,zstd","exit_code":1,"wheel_type":null,"failure_reason":"build_error","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":null},{"runtime":"python:3.12-alpine","python_version":"3.12","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":null,"import_time_s":0.23,"mem_mb":7.7,"disk_size":"15.0M"},{"runtime":"python:3.12-alpine","python_version":"3.12","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.25,"mem_mb":7.7,"disk_size":"15.0M"},{"runtime":"python:3.12-slim","python_version":"3.12","os_libc":"slim (glibc)","variant":"crc32c,lz4,snappy,zstd","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":2.4,"import_time_s":0.27,"mem_mb":8.9,"disk_size":"49M"},{"runtime":"python:3.12-slim","python_version":"3.12","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":1.6,"import_time_s":0.24,"mem_mb":7.7,"disk_size":"15M"},{"runtime":"python:3.12-slim","python_version":"3.12","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.29,"mem_mb":7.7,"disk_size":"15M"},{"runtime":"python:3.13-alpine","python_version":"3.13","os_libc":"alpine 
(musl)","variant":"crc32c,lz4,snappy,zstd","exit_code":1,"wheel_type":null,"failure_reason":"build_error","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":null},{"runtime":"python:3.13-alpine","python_version":"3.13","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":null,"import_time_s":0.24,"mem_mb":8.6,"disk_size":"14.8M"},{"runtime":"python:3.13-alpine","python_version":"3.13","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.24,"mem_mb":8.2,"disk_size":"14.6M"},{"runtime":"python:3.13-slim","python_version":"3.13","os_libc":"slim (glibc)","variant":"crc32c,lz4,snappy,zstd","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":2.3,"import_time_s":0.27,"mem_mb":9.2,"disk_size":"49M"},{"runtime":"python:3.13-slim","python_version":"3.13","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":1.6,"import_time_s":0.23,"mem_mb":8.6,"disk_size":"15M"},{"runtime":"python:3.13-slim","python_version":"3.13","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.36,"mem_mb":8.2,"disk_size":"15M"},{"runtime":"python:3.9-alpine","python_version":"3.9","os_libc":"alpine (musl)","variant":"crc32c,lz4,snappy,zstd","exit_code":1,"wheel_type":null,"failure_reason":"build_error","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":null},{"runtime":"python:3.9-alpine","python_version":"3.9","os_libc":"alpine (musl)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":null,"import_time_s":0.18,"mem_mb":7.6,"disk_size":"20.3M"},{"runtime":"python:3.9-alpine","python_version":"3.9","os_libc":"alpine 
(musl)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.2,"mem_mb":7.6,"disk_size":"20.3M"},{"runtime":"python:3.9-slim","python_version":"3.9","os_libc":"slim (glibc)","variant":"crc32c,lz4,snappy,zstd","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":3.4,"import_time_s":0.18,"mem_mb":8.2,"disk_size":"54M"},{"runtime":"python:3.9-slim","python_version":"3.9","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":"wheel","failure_reason":null,"install_time_s":2.1,"import_time_s":0.17,"mem_mb":7.6,"disk_size":"21M"},{"runtime":"python:3.9-slim","python_version":"3.9","os_libc":"slim (glibc)","variant":"default","exit_code":0,"wheel_type":null,"failure_reason":null,"install_time_s":null,"import_time_s":0.16,"mem_mb":7.6,"disk_size":"21M"}]},"quickstart_checks":{"last_tested":"2026-04-24","tag":null,"tag_description":null,"results":[{"runtime":"python:3.10-alpine","exit_code":1},{"runtime":"python:3.10-slim","exit_code":1},{"runtime":"python:3.11-alpine","exit_code":1},{"runtime":"python:3.11-slim","exit_code":1},{"runtime":"python:3.12-alpine","exit_code":1},{"runtime":"python:3.12-slim","exit_code":1},{"runtime":"python:3.13-alpine","exit_code":1},{"runtime":"python:3.13-slim","exit_code":1},{"runtime":"python:3.9-alpine","exit_code":1},{"runtime":"python:3.9-slim","exit_code":1}]}}