{"id":1310,"library":"aiokafka","title":"aiokafka: Async Kafka Client for Python","description":"aiokafka is an asyncio-compatible client for Apache Kafka, enabling asynchronous production and consumption of messages. It uses Python's `async/await` syntax for non-blocking I/O, which makes it suitable for high-throughput applications. The current version is 0.13.0; new releases typically appear every few months and often add support for new Kafka KIPs and Python versions.","status":"active","version":"0.13.0","language":"en","source_language":"en","source_url":"https://github.com/aio-libs/aiokafka","tags":["kafka","asyncio","message-queue","stream-processing","real-time"],"install":[{"cmd":"pip install aiokafka","lang":"bash","label":"Core library"},{"cmd":"pip install aiokafka[snappy,lz4,zstd]","lang":"bash","label":"With optional compression dependencies"}],"dependencies":[{"reason":"Required for LZ4 compression codec support in Kafka.","package":"lz4","optional":true},{"reason":"Required for Zstandard compression codec support in Kafka.","package":"zstandard","optional":true},{"reason":"Required for Snappy compression codec support in Kafka.","package":"python-snappy","optional":true}],"imports":[{"symbol":"AIOKafkaConsumer","correct":"from aiokafka import AIOKafkaConsumer"},{"symbol":"AIOKafkaProducer","correct":"from aiokafka import AIOKafkaProducer"},{"note":"The admin client is located in the `aiokafka.admin` submodule rather than the top-level package.","wrong":"from aiokafka import AIOKafkaAdminClient","symbol":"AIOKafkaAdminClient","correct":"from aiokafka.admin import AIOKafkaAdminClient"},{"note":"KafkaError and other specific error classes are located in the `aiokafka.errors` submodule.","wrong":"from aiokafka import KafkaError","symbol":"KafkaError","correct":"from aiokafka.errors import KafkaError"}],"quickstart":{"code":"import asyncio\nimport os\nfrom aiokafka import AIOKafkaConsumer, AIOKafkaProducer\n\nBOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')\nTOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')\n\nasync def main():\n    producer = 
AIOKafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)\n    consumer = AIOKafkaConsumer(\n        TOPIC,\n        bootstrap_servers=BOOTSTRAP_SERVERS,\n        group_id=\"my_consumer_group\",\n        auto_offset_reset=\"earliest\"\n    )\n\n    print(f\"Connecting to Kafka at {BOOTSTRAP_SERVERS}\")\n    await producer.start()\n    await consumer.start()\n\n    try:\n        # Produce message\n        print(f\"Producing message to topic {TOPIC}\")\n        await producer.send_and_wait(TOPIC, b\"Hello aiokafka!\")\n        print(\"Message produced.\")\n\n        # Consume message\n        print(f\"Consuming message from topic {TOPIC}\")\n        async for msg in consumer:\n            print(f\"Consumed: offset={msg.offset}, key={msg.key}, value={msg.value.decode()}\")\n            break # Only consume one message for the example\n\n    finally:\n        print(\"Stopping producer and consumer...\")\n        await producer.stop()\n        await consumer.stop()\n        print(\"Stopped.\")\n\nif __name__ == '__main__':\n    asyncio.run(main())","lang":"python","description":"This quickstart demonstrates setting up an `AIOKafkaProducer` to send a message and an `AIOKafkaConsumer` to receive it. Ensure you have a Kafka broker running (e.g., via Docker) and optionally set `KAFKA_BOOTSTRAP_SERVERS` and `KAFKA_TOPIC` environment variables. It highlights the `start()` and `stop()` methods, crucial for managing the client lifecycle in an asyncio application."},"warnings":[{"fix":"Remove the `api_version` parameter from your client constructor calls. The library will negotiate the correct API version with the Kafka brokers.","message":"The `api_version` parameter has been removed from `AIOKafkaConsumer`, `AIOKafkaProducer`, and `AIOKafkaAdminClient` constructors. 
API versions are now resolved automatically at connection time.","severity":"breaking","affected_versions":">=0.13.0"},{"fix":"Upgrade your Python environment to 3.10 or newer to run the current release; consult the changelog for the minimum Python version required by older releases.","message":"aiokafka has progressively dropped support for older Python versions. Ensure your environment meets the minimum requirement.","severity":"breaking","affected_versions":"0.12.0 drops Python 3.8; 0.8.1 drops Python 3.7; 0.8.0 drops Python 3.6. Current minimum is Python 3.10."},{"fix":"Always `await` `start()`, `stop()`, and `send_and_wait()`, and iterate over the consumer with `async for msg in consumer:`. Ensure your code runs within an `asyncio` event loop.","message":"aiokafka is an asyncio-native library. All operations that interact with Kafka are `await`-able. Calling them outside an `async` context or without `await` leads to runtime errors or to coroutines that never run.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Install aiokafka with the necessary compression extras: `pip install aiokafka[snappy,lz4,zstd]` for full support.","message":"For Kafka messages compressed with Snappy, LZ4, or Zstandard, the corresponding Python compression libraries must be installed as extra dependencies. Without them, messages using these codecs cannot be compressed or decompressed, and the client raises an error such as `UnsupportedCodecError`.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-09T00:00:00.000Z","next_check":"2026-07-08T00:00:00.000Z"}