aiokafka: Async Kafka Client for Python

0.13.0 · active · verified Thu Apr 09

aiokafka is an asyncio-compatible client for Apache Kafka, enabling asynchronous production and consumption of messages. It leverages Python's `async/await` syntax for non-blocking I/O operations, making it suitable for high-performance applications. The current version is 0.13.0, and it generally releases new versions every few months, often incorporating new Kafka KIPs and Python version support updates.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates setting up an `AIOKafkaProducer` to send a message and an `AIOKafkaConsumer` to receive it. Ensure you have a Kafka broker running (e.g., via Docker) and optionally set `KAFKA_BOOTSTRAP_SERVERS` and `KAFKA_TOPIC` environment variables. It highlights the `start()` and `stop()` methods, crucial for managing the client lifecycle in an asyncio application.

import asyncio
import os
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
TOPIC = os.environ.get('KAFKA_TOPIC', 'my_test_topic')

async def main():
    producer = AIOKafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    consumer = AIOKafkaConsumer(
        TOPIC,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id="my_consumer_group",
        auto_offset_reset="earliest"
    )

    print(f"Connecting to Kafka at {BOOTSTRAP_SERVERS}")
    await producer.start()
    await consumer.start()

    try:
        # Produce message
        print(f"Producing message to topic {TOPIC}")
        await producer.send_and_wait(TOPIC, b"Hello aiokafka!")
        print("Message produced.")

        # Consume message
        print(f"Consuming message from topic {TOPIC}")
        async for msg in consumer:
            print(f"Consumed: offset={msg.offset}, key={msg.key}, value={msg.value.decode()}")
            break # Only consume one message for the example

    finally:
        print("Stopping producer and consumer...")
        await producer.stop()
        await consumer.stop()
        print("Stopped.")

if __name__ == '__main__':
    asyncio.run(main())

view raw JSON →