FastStream
FastStream is a modern, asynchronous framework designed to simplify interactions with various messaging queues (Kafka, RabbitMQ, Redis, NATS, MQTT). It provides an intuitive, FastAPI-like API for building message-driven applications, handling message serialization, dependency injection, and health checks. The current stable version is 0.6.7, with frequent minor releases and a significant 0.7.0 release candidate bringing breaking changes.
Warnings
- breaking Version 0.7.0 (release candidate available) removes several deprecated features: publisher/subscriber-level middlewares are removed, `ack_policy` replaces several deprecated options, `RedisJSONMessageParser` is removed (Redis services must now use the binary message format), and `broker.close` is replaced by `broker.stop`.
- breaking As of 0.7.0rc0 (and likely stable 0.7.0), import paths for specific brokers have changed. They are now directly under `faststream.<broker_name>` (e.g., `from faststream.kafka import KafkaBroker`) instead of `faststream.broker.<broker_name>`.
- gotcha FastStream's core installation (`pip install faststream`) does not include broker-specific client libraries. You must install FastStream with the appropriate extras, e.g., `pip install "faststream[kafka]"`, `"faststream[redis]"`, `"faststream[mqtt]"`, etc. Failing to do so will result in `ModuleNotFoundError` when trying to import or use a broker.
- breaking Version 0.6.0 introduced significant changes to the Middleware API and Router API. Applications upgrading from versions prior to 0.6.0 may experience breaking changes related to how middlewares are defined and applied, and how routers are configured.
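For code that has to run on both sides of the `broker.close` to `broker.stop` rename noted above, one option is a small compatibility helper. The sketch below is only an illustration: `stop_broker`, `LegacyBroker`, and `CurrentBroker` are hypothetical names and not part of FastStream, and the stand-in classes exist only so the example runs without a real broker.

```python
import asyncio
import inspect


async def stop_broker(broker) -> str:
    """Shut a broker down via `stop()` if present, else the older `close()`.

    Hypothetical helper, not part of FastStream.
    Returns the name of the method that was called.
    """
    method_name = "stop" if hasattr(broker, "stop") else "close"
    result = getattr(broker, method_name)()
    # Both methods are expected to be coroutines, but tolerate sync versions.
    if inspect.isawaitable(result):
        await result
    return method_name


# Stand-in objects (assumptions) so the sketch runs without a real broker.
class LegacyBroker:
    async def close(self) -> None:
        pass


class CurrentBroker:
    async def stop(self) -> None:
        pass


if __name__ == "__main__":
    print(asyncio.run(stop_broker(LegacyBroker())))   # close
    print(asyncio.run(stop_broker(CurrentBroker())))  # stop
```

Preferring `stop` whenever it exists keeps the helper from calling a deprecated method on versions that expose both names.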
Install
- pip install "faststream[kafka]"
- pip install faststream
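Because the core package does not pull in broker client libraries, it can help to check for them before importing a broker. The following is a minimal sketch using only the standard library. The `EXTRA_CLIENT_MODULES` mapping from extra name to client module is an assumption to verify against the FastStream version in use, and `missing_extras` and `install_hint` are hypothetical helper names.

```python
from importlib.util import find_spec

# Assumed mapping from a FastStream extra to the top-level module of the
# client library it installs; verify against your FastStream version.
EXTRA_CLIENT_MODULES = {
    "kafka": "aiokafka",
    "rabbit": "aio_pika",
    "redis": "redis",
    "nats": "nats",
}


def missing_extras(required, modules=EXTRA_CLIENT_MODULES):
    """Return the extras in `required` whose client module cannot be found."""
    return [extra for extra in required if find_spec(modules[extra]) is None]


def install_hint(missing):
    """Build the pip command for the missing extras, or '' if none are missing."""
    if not missing:
        return ""
    extras = ",".join(missing)
    return f'pip install "faststream[{extras}]"'


if __name__ == "__main__":
    hint = install_hint(missing_extras(["kafka"]))
    print(hint or "All required broker clients are installed.")
```

Running such a check at startup turns a late `ModuleNotFoundError` into an explicit install instruction.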
Imports
- FastStream
from faststream import FastStream
- Logger
from faststream import Logger
- KafkaBroker
from faststream.kafka import KafkaBroker
- MQTTBroker
from faststream.mqtt import MQTTBroker
- ContextRef
from faststream.types import ContextRef
Quickstart
import logging
import os

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

# Configure the Kafka broker connection.
# Defaults to 'localhost:9092' for local Kafka; set KAFKA_BROKER_URL in production.
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL', 'localhost:9092')

broker = KafkaBroker(KAFKA_BROKER_URL)
app = FastStream(broker)

# `Logger` is an injectable type annotation, not a class to instantiate,
# so the lifecycle hooks use a standard-library logger instead.
# Configure logging (e.g. logging.basicConfig) to see its INFO output.
lifecycle_logger = logging.getLogger(__name__)


@broker.subscriber('test-topic')
async def handle_message(msg: str, logger: Logger):
    # FastStream injects the logger based on the `Logger` annotation.
    logger.info(f"Received message: {msg}")


# `after_startup` runs once the broker is connected, so publishing is safe here;
# `on_startup` hooks run before the broker starts.
@app.after_startup
async def app_startup():
    lifecycle_logger.info("FastStream application started")
    # Example: publish a message on startup
    await broker.publish("Hello, FastStream!", topic='test-topic')


@app.on_shutdown
async def app_shutdown():
    lifecycle_logger.info("FastStream application stopped")


# To run this application:
# 1. Ensure Kafka is running.
# 2. Save as `app.py`.
# 3. `pip install "faststream[kafka]"`
# 4. `faststream run app:app`