aiokafka: Async Kafka Client for Python
aiokafka is an asyncio-compatible client for Apache Kafka that produces and consumes messages asynchronously. It uses Python's `async/await` syntax for non-blocking I/O, which makes it suitable for high-throughput applications. The current version is 0.13.0. New versions are generally released every few months, often adding support for new Kafka KIPs and updating the set of supported Python versions.
Warnings
- breaking The `api_version` parameter has been removed from `AIOKafkaConsumer`, `AIOKafkaProducer`, and `AIOKafkaAdminClient` constructors. API versions are now resolved automatically at connection time.
- breaking aiokafka has progressively dropped support for older Python versions. Ensure your environment meets the minimum requirement.
- gotcha aiokafka is an asyncio-native library. All operations that interact with Kafka are `await`-able. Using it outside an `async` context or without `await` will lead to runtime errors or incorrect behavior.
- gotcha For Kafka messages compressed with Snappy, LZ4, or ZStandard, the corresponding Python compression libraries must be installed as extra dependencies. Otherwise, messages compressed with these codecs cannot be decompressed, leading to `ImportError` or `CompressionError`.
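The asyncio gotcha above can be illustrated without a broker. The sketch below uses a hypothetical stand-in coroutine, `fake_send`, in place of a real awaitable call such as `producer.send_and_wait`; it is not part of aiokafka. It shows that calling a coroutine function without `await` only creates a coroutine object and performs no work.

```python
import asyncio
import inspect

sent = []  # records what the stand-in "producer" has actually sent


async def fake_send(topic: str, value: bytes) -> None:
    """Hypothetical stand-in for an awaitable call such as send_and_wait."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    sent.append((topic, value))


async def demo() -> bool:
    # Wrong: without `await`, this only creates a coroutine object; nothing runs.
    pending = fake_send("my_test_topic", b"lost")
    was_coroutine = inspect.iscoroutine(pending)
    pending.close()  # discard it explicitly to avoid a "never awaited" warning

    # Right: awaiting runs the coroutine to completion.
    await fake_send("my_test_topic", b"delivered")
    return was_coroutine


forgot_await_returned_coroutine = asyncio.run(demo())
```

Only the awaited call appends to `sent`; the un-awaited call is silently dropped, which is the kind of incorrect behavior the warning describes.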
Install
- pip install aiokafka
- pip install aiokafka[snappy,lz4,zstd]
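After installing the extras, it can be useful to confirm that the compression modules are importable before consuming compressed topics. The following is a minimal sketch using only the standard library. The module name in `CODEC_MODULES` is an assumption about what the extras install (recent releases appear to rely on `cramjam`); check your installed version and adjust the list. `missing_modules` is a hypothetical helper, not an aiokafka function.

```python
from importlib.util import find_spec


def missing_modules(module_names):
    """Return the top-level module names that cannot be imported here."""
    return [name for name in module_names if find_spec(name) is None]


# Assumption: the module(s) that the snappy/lz4/zstd extras install.
# Adjust this list to whatever your aiokafka version actually depends on.
CODEC_MODULES = ["cramjam"]

missing = missing_modules(CODEC_MODULES)
if missing:
    print(f"Missing compression modules: {', '.join(missing)}")
else:
    print("All listed compression modules are importable.")
```

Running this check at startup surfaces a missing dependency immediately instead of at the first compressed message.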
Imports
- AIOKafkaConsumer
from aiokafka import AIOKafkaConsumer
- AIOKafkaProducer
from aiokafka import AIOKafkaProducer
- AIOKafkaAdminClient
from aiokafka import AIOKafkaAdminClient
- KafkaError
from aiokafka.errors import KafkaError
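One common use of `KafkaError` is deciding whether a failed call is worth retrying. The sketch below is an assumption-laden illustration, not aiokafka's own retry logic: `call_with_retry` is a hypothetical helper, and it duck-types on a boolean `retriable` attribute, which `KafkaError` subclasses are understood to expose. To stay runnable without a broker it catches `Exception` and uses a stand-in error class, `FakeRetriableError`; in real code you would likely catch `KafkaError` instead.

```python
import asyncio


async def call_with_retry(operation, attempts=3, delay=0.0):
    """Await operation(); retry only on errors whose `retriable` flag is true."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            # Duck-typed check: works with any exception class that exposes
            # a boolean `retriable` attribute. Non-retriable errors and the
            # final failed attempt are re-raised unchanged.
            if not getattr(exc, "retriable", False) or attempt == attempts:
                raise
            await asyncio.sleep(delay)


class FakeRetriableError(Exception):
    """Hypothetical stand-in for a retriable Kafka error."""

    retriable = True


calls = []


async def flaky_send():
    """Stand-in operation that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise FakeRetriableError("temporary failure")
    return "ok"


result = asyncio.run(call_with_retry(flaky_send, attempts=3))
```

With a real producer, `operation` could be something like `lambda: producer.send_and_wait(TOPIC, b"payload")`.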
Quickstart
import asyncio
import os

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')


async def main():
    producer = AIOKafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    consumer = AIOKafkaConsumer(
        TOPIC,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id="my_consumer_group",
        auto_offset_reset="earliest"
    )

    print(f"Connecting to Kafka at {BOOTSTRAP_SERVERS}")
    await producer.start()
    await consumer.start()

    try:
        # Produce message
        print(f"Producing message to topic {TOPIC}")
        await producer.send_and_wait(TOPIC, b"Hello aiokafka!")
        print("Message produced.")

        # Consume message
        print(f"Consuming message from topic {TOPIC}")
        async for msg in consumer:
            print(f"Consumed: offset={msg.offset}, key={msg.key}, value={msg.value.decode()}")
            break  # Only consume one message for the example
    finally:
        print("Stopping producer and consumer...")
        await producer.stop()
        await consumer.stop()
        print("Stopped.")


if __name__ == '__main__':
    asyncio.run(main())
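The quickstart sends raw bytes. A variant that exchanges JSON can be sketched with the `value_serializer` and `value_deserializer` constructor parameters. The helper names `serialize`, `deserialize`, and `send_and_read_json` are hypothetical, and the sketch assumes the topic holds only JSON values (an older non-JSON message would make `deserialize` fail).

```python
import json


def serialize(value) -> bytes:
    """Encode a JSON-compatible object as UTF-8 bytes for the producer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Decode UTF-8 JSON bytes received by the consumer."""
    return json.loads(raw.decode("utf-8"))


async def send_and_read_json(bootstrap_servers: str, topic: str):
    # Imported here so the helpers above can be used without aiokafka installed.
    from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers, value_serializer=serialize
    )
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        value_deserializer=deserialize,
    )
    await producer.start()
    try:
        await consumer.start()
        try:
            await producer.send_and_wait(topic, {"greeting": "Hello aiokafka!"})
            async for msg in consumer:
                return msg.value  # already decoded by value_deserializer
        finally:
            await consumer.stop()
    finally:
        await producer.stop()
```

To try it against a broker, run `asyncio.run(send_and_read_json("localhost:9092", "my_json_topic"))` after `import asyncio`, where the topic name is a placeholder. The nested `try`/`finally` blocks ensure the producer is stopped even if the consumer fails to start.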