{"id":8489,"library":"pykafka","title":"PyKafka","description":"PyKafka is a full-featured, pure-Python client for Apache Kafka that can optionally use a C extension built on librdkafka for improved performance. It supports Kafka 0.8.2 and newer and provides Pythonic implementations of Kafka producers and consumers. The library aims to offer a level of abstraction similar to that of the JVM Kafka client. PyKafka is actively maintained; the current stable version is 2.8.0, and updates are released as needed.","status":"active","version":"2.8.0","language":"en","source_language":"en","source_url":"https://github.com/Parsely/pykafka","tags":["kafka","message queue","producer","consumer","client","streaming","real-time"],"install":[{"cmd":"pip install pykafka","lang":"bash","label":"Install PyKafka"},{"cmd":"RDKAFKA_INSTALL=system pip install pykafka","lang":"bash","label":"Install with librdkafka C extension (recommended)"}],"dependencies":[{"reason":"Optional C extension for high-performance producer and consumer operations. 
Requires system-level installation of librdkafka headers and shared libraries.","package":"librdkafka","optional":true},{"reason":"Required for LZ4 compression support.","package":"lz4","optional":true},{"reason":"Required for XXHash algorithm support, used in some Kafka protocol features.","package":"xxhash","optional":true}],"imports":[{"symbol":"KafkaClient","correct":"from pykafka import KafkaClient"},{"symbol":"OffsetType","correct":"from pykafka.common import OffsetType"},{"note":"Producer is typically accessed via topic.get_producer() after initializing KafkaClient and Topic.","wrong":"from pykafka import Producer","symbol":"Producer","correct":"from pykafka.producer import Producer"},{"note":"SimpleConsumer is typically accessed via topic.get_simple_consumer() after initializing KafkaClient and Topic.","wrong":"from pykafka import SimpleConsumer","symbol":"SimpleConsumer","correct":"from pykafka.simpleconsumer import SimpleConsumer"},{"note":"BalancedConsumer is typically accessed via topic.get_balanced_consumer() after initializing KafkaClient and Topic, often requiring Zookeeper connection.","wrong":"from pykafka import BalancedConsumer","symbol":"BalancedConsumer","correct":"from pykafka.balancedconsumer import BalancedConsumer"}],"quickstart":{"code":"import os\nimport time\nfrom pykafka import KafkaClient\nfrom pykafka.common import OffsetType\n\nKAFKA_HOSTS = os.environ.get('KAFKA_HOSTS', 'localhost:9092')\n# PyKafka 2.8.0 expects topic and consumer group names as str, not bytes\nTOPIC_NAME = os.environ.get('KAFKA_TOPIC', 'test-topic')\nCONSUMER_GROUP = os.environ.get('KAFKA_CONSUMER_GROUP', 'my-consumer-group')\n\n# 1. Connect to Kafka\nclient = KafkaClient(hosts=KAFKA_HOSTS)\ntopic = client.topics[TOPIC_NAME]\n\n# 2. 
Produce messages\nwith topic.get_producer() as producer:\n    print(f\"Producing messages to {TOPIC_NAME}...\")\n    for i in range(5):\n        message_value = f\"test message {i}\".encode('utf-8')\n        producer.produce(message_value)\n        print(f\"Sent: {message_value.decode()}\")\n    print(\"Finished producing messages.\")\n\n# Give Kafka a moment to process\ntime.sleep(2)\n\n# 3. Consume messages\nprint(f\"\\nConsuming messages from {TOPIC_NAME} (group: {CONSUMER_GROUP})...\")\nconsumer = topic.get_simple_consumer(\n    consumer_group=CONSUMER_GROUP,\n    auto_commit_enable=True,\n    auto_offset_reset=OffsetType.EARLIEST, # Start from the beginning if no offset is committed\n    reset_offset_on_start=True, # Force reset on start, useful for quickstart\n    consumer_timeout_ms=5000 # Stop iterating if no message arrives within 5 seconds\n)\n\nfor i, message in enumerate(consumer):\n    if message is not None:\n        print(f\"Received: Offset={message.offset}, Value={message.value.decode()}\")\n    if i >= 4: # Consume the 5 messages we sent\n        break\n\nconsumer.stop()\nprint(\"Finished consuming messages.\")\n","lang":"python","description":"This quickstart demonstrates how to initialize a PyKafka client, produce a few messages to a topic, and then consume them using a `SimpleConsumer`. It connects to a local Kafka instance by default and resets the consumer offset to the earliest available message for demonstration purposes. Set the `KAFKA_HOSTS`, `KAFKA_TOPIC`, and `KAFKA_CONSUMER_GROUP` environment variables to override the defaults of `localhost:9092`, `test-topic`, and `my-consumer-group`."},"warnings":[{"fix":"Ensure `consumer_group` and topic names are passed as `str` instead of `bytes`. 
Convert existing byte strings using `.decode('utf-8')` if necessary, or ensure they are defined as `str`.","message":"In PyKafka 2.8.0, the `consumer_group` keyword argument for consumer components and the parameter for `TopicDict.__getitem__` (when accessing topics) changed their expected type from `bytes` to `str`. Passing byte strings now raises a `TypeError`.","severity":"breaking","affected_versions":">=2.8.0"},{"fix":"Regularly call `producer.get_delivery_report(block=False)` in a loop until it raises `queue.Empty`, and process each returned `(message, exception)` pair. The report queue is thread-local, so drain it from the same thread that calls `produce()`.","message":"When using `Producer` with `delivery_reports=True`, it's critical to regularly drain the delivery report queue. Failing to do so can lead to unbounded memory growth, as reports are stored in memory until consumed.","severity":"gotcha","affected_versions":"All"},{"fix":"Carefully review the `Consumer Patterns` documentation regarding `auto_offset_reset` (`OffsetType.EARLIEST` or `OffsetType.LATEST`) and `reset_offset_on_start` to ensure the consumer starts at the intended offset, especially in production environments to avoid message loss or reprocessing.","message":"The behavior of `auto_offset_reset` and `reset_offset_on_start` in consumers can be counter-intuitive. `reset_offset_on_start=True` will *always* reset the offset based on `auto_offset_reset` for the first fetch, even if committed offsets exist. If `False`, it will use committed offsets if available.","severity":"gotcha","affected_versions":"All"},{"fix":"Install `librdkafka` development packages on your system (e.g., `librdkafka-dev` on Debian/Ubuntu, `librdkafka-devel` on RHEL/CentOS) and then install pykafka using `RDKAFKA_INSTALL=system pip install pykafka`.","message":"While PyKafka is a pure-Python client, its optional C extension, built on librdkafka, is highly recommended for production use. 
Without it, throughput can be significantly lower. Building the extension requires the `librdkafka` headers and shared library to be installed at the system level.","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Decode the byte string to a `str` before passing it: `consumer_group=b'my_group'.decode('utf-8')` or `client.topics[b'my_topic'.decode('utf-8')]`.","cause":"Attempting to pass a byte string to a `consumer_group` parameter or `client.topics[...]` accessor that now expects a `str`.","error":"TypeError: argument of type 'bytes' is not iterable"},{"fix":"Verify that Kafka brokers are running and accessible from the client machine (check network, firewall). Ensure the `hosts` argument to `KafkaClient` is correct (e.g., `'localhost:9092'`). Check Kafka logs for broker issues or topic existence.","cause":"The KafkaClient could not connect to any of the specified Kafka brokers, or the topic does not exist/is not discoverable.","error":"pykafka.exceptions.NoBrokersAvailableError: No brokers available for topic 'my_topic'"},{"fix":"This can be transient. Catching the exception and re-instantiating the PyKafka component (e.g., `KafkaClient`, `Producer`, or a consumer), or calling `stop()` then `start()` on the component, often re-establishes the connection. 
Check network stability and Kafka broker health.","cause":"The client's connection to a Kafka broker was unexpectedly lost during an operation.","error":"pykafka.exceptions.SocketDisconnectedError: Tried to use a connection that was disconnected"},{"fix":"Check your Kafka ACLs (Access Control Lists) to ensure that the client's principal has `Read` permission on the consumer group and `Read` permission on the topic it consumes from (plus `Write` on the topic if it also produces).","cause":"The Kafka user associated with the client does not have the necessary permissions to access the specified consumer group or topic.","error":"pykafka.exceptions.GroupAuthorizationFailed: [Error 29] Group Authorization Failed: This consumer group is not authorized to access this topic."}]}