confluent-kafka-stubs for Type Hinting
confluent-kafka-stubs provides static type stub files for the `confluent-kafka` Python client, enabling improved type checking with tools like MyPy. It is currently at version 0.0.3 and is released as part of the main `confluent-kafka-python` project, with updates typically mirroring the development of the core library.
Warnings
- gotcha confluent-kafka-stubs provides only type information for static analysis (e.g., MyPy, IDE auto-completion). It does not contain any executable code or change the runtime behavior of `confluent-kafka`. You must still install and use the `confluent-kafka` library for your application to function.
- gotcha Mismatching versions between `confluent-kafka` and `confluent-kafka-stubs` can lead to incorrect type hints or MyPy errors. If the underlying library's API changes significantly, the stubs might become outdated.
- gotcha Some advanced features or dynamic behaviors of `confluent-kafka` (e.g., highly dynamic configurations or callback signatures) might not be perfectly covered by static stubs, leading to occasional `Any` types or less precise hints.
Install
-
pip install confluent-kafka-stubs -
pip install confluent-kafka
Imports
- KafkaProducer, KafkaConsumer, Message
from confluent_kafka import Producer, Consumer, Message
Quickstart
import os
from confluent_kafka import Producer, Consumer, KafkaException
def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")
def run_example():
# Configuration for Kafka (example with local Kafka or Confluent Cloud)
# Replace with your actual Kafka broker(s)
broker = os.environ.get('KAFKA_BROKER', 'localhost:9092')
conf = {
'bootstrap.servers': broker,
# 'sasl.username': os.environ.get('KAFKA_USERNAME'),
# 'sasl.password': os.environ.get('KAFKA_PASSWORD'),
# 'security.protocol': 'SASL_SSL',
# 'sasl.mechanisms': 'PLAIN',
'acks': 'all'
}
topic = 'my_example_topic'
# Producer
print("--- Producer ---")
producer = Producer(conf)
try:
producer.produce(topic, key='key1', value='hello world from stubs!', callback=delivery_report)
producer.flush(10) # Wait for up to 10 seconds for any outstanding messages to be delivered
except KafkaException as e:
print(f"Producer error: {e}")
# Consumer
print("\n--- Consumer ---")
consumer_conf = {
'bootstrap.servers': broker,
'group.id': 'my_consumer_group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_conf)
try:
consumer.subscribe([topic])
# Poll for messages for a short duration
msg = consumer.poll(timeout=5.0)
if msg is None:
print("No message received within timeout.")
elif msg.error():
print(f"Consumer error: {msg.error()}")
else:
print(f"Received message: key={msg.key().decode('utf-8')}, value={msg.value().decode('utf-8')}")
except KafkaException as e:
print(f"Consumer error: {e}")
finally:
consumer.close()
if __name__ == '__main__':
# To run this with type checking, save as e.g., 'example.py'
# Then run 'mypy example.py'
# And 'python example.py' (ensure Kafka broker is running)
run_example()