FastStream

0.6.7 · active · verified Sun Apr 12

FastStream is an asynchronous Python framework that simplifies working with message brokers (Kafka, RabbitMQ, Redis, NATS, MQTT). It provides a FastAPI-like, decorator-based API for building message-driven applications and handles message serialization, dependency injection, and health checks. The current stable version is 0.6.7. Minor releases are frequent, and a 0.7.0 release candidate introduces breaking changes.

Warnings

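Install

pip install "faststream[kafka]"

Imports

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker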

Quickstart

This quickstart builds a minimal FastStream application for Kafka: a broker, an application instance, and a subscriber listening on 'test-topic'. A message is published once the application has started, so the handler can be exercised immediately. To run it, Kafka must be reachable and FastStream must be installed with the Kafka extra.

import logging
import os

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

# Kafka bootstrap server: read from the environment, fall back to a local broker
KAFKA_BROKER_URL = os.environ.get("KAFKA_BROKER_URL", "localhost:9092")

broker = KafkaBroker(KAFKA_BROKER_URL)
app = FastStream(broker)

# `Logger` from faststream is a type annotation used for dependency injection
# and cannot be instantiated, so lifespan hooks use a standard-library logger.
logging.basicConfig(level=logging.INFO)
lifecycle_logger = logging.getLogger("app.lifecycle")


@broker.subscriber("test-topic")
async def handle_message(msg: str, logger: Logger):
    # `logger` is injected by FastStream for each incoming message
    logger.info(f"Received message: {msg}")


@app.after_startup
async def publish_greeting():
    # after_startup runs once the broker is connected, so publishing is safe here;
    # on_startup hooks run before the broker starts.
    lifecycle_logger.info("FastStream application started")
    await broker.publish("Hello, FastStream!", topic="test-topic")


@app.on_shutdown
async def app_shutdown():
    lifecycle_logger.info("FastStream application stopped")


# To run this application:
# 1. Ensure Kafka is running.
# 2. Save as `app.py`.
# 3. `pip install "faststream[kafka]"`
# 4. `faststream run app:app`
