confluent-kafka-stubs for Type Hinting

0.0.3 · active · verified Sun Apr 12

confluent-kafka-stubs provides static type stub files for the `confluent-kafka` Python client, enabling improved type checking with tools like MyPy. It is currently at version 0.0.3 and is released as part of the main `confluent-kafka-python` project, with updates typically mirroring the development of the core library.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates a simple producer and consumer using `confluent-kafka`. To leverage the `confluent-kafka-stubs` for type checking, ensure both `confluent-kafka` and `confluent-kafka-stubs` are installed in your environment. You can then run a static analysis tool like MyPy on your code to benefit from the provided type hints.

import os
from confluent_kafka import Producer, Consumer, KafkaException

def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

def run_example():
    # Configuration for Kafka (example with local Kafka or Confluent Cloud)
    # Replace with your actual Kafka broker(s)
    broker = os.environ.get('KAFKA_BROKER', 'localhost:9092')
    conf = {
        'bootstrap.servers': broker,
        # 'sasl.username': os.environ.get('KAFKA_USERNAME'),
        # 'sasl.password': os.environ.get('KAFKA_PASSWORD'),
        # 'security.protocol': 'SASL_SSL',
        # 'sasl.mechanisms': 'PLAIN',
        'acks': 'all'
    }

    topic = 'my_example_topic'

    # Producer
    print("--- Producer ---")
    producer = Producer(conf)
    try:
        producer.produce(topic, key='key1', value='hello world from stubs!', callback=delivery_report)
        producer.flush(10) # Wait for up to 10 seconds for any outstanding messages to be delivered
    except KafkaException as e:
        print(f"Producer error: {e}")

    # Consumer
    print("\n--- Consumer ---")
    consumer_conf = {
        'bootstrap.servers': broker,
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(consumer_conf)
    try:
        consumer.subscribe([topic])
        # Poll for messages for a short duration
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            print("No message received within timeout.")
        elif msg.error():
            print(f"Consumer error: {msg.error()}")
        else:
            print(f"Received message: key={msg.key().decode('utf-8')}, value={msg.value().decode('utf-8')}")
    except KafkaException as e:
        print(f"Consumer error: {e}")
    finally:
        consumer.close()

if __name__ == '__main__':
    # To run this with type checking, save as e.g., 'example.py'
    # Then run 'mypy example.py'
    # And 'python example.py' (ensure Kafka broker is running)
    run_example()

view raw JSON →