Faust Streaming

0.11.3 · active · verified Tue Apr 14

Faust-streaming is a stream processing library that ports concepts from Kafka Streams to Python. It is used to build high-performance distributed systems and real-time data pipelines. The project is an actively maintained fork of the original Faust library and aims for continuous releases, improved code quality, and support for the latest Kafka drivers. The current version is 0.11.3; releases are published periodically, driven by community contributions and dependency updates.

Warnings

Install

Imports

Quickstart

This quickstart shows a basic Faust streaming application. It creates a `faust.App`, defines a `faust.Record` model (`Order`) for incoming data, declares a Kafka topic named `orders`, and registers an agent with `@app.agent` that consumes and processes messages from that topic. The Kafka broker address is read from the `FAUST_BROKER` environment variable and falls back to `kafka://localhost:9092`.

import os

import faust

# Configure Kafka broker from environment variable or default to localhost
KAFKA_BROKER = os.environ.get('FAUST_BROKER', 'kafka://localhost:9092')

app = faust.App(
    'my-streaming-app',
    broker=KAFKA_BROKER,
    value_serializer='json',
)

# Define a data model for messages
class Order(faust.Record):
    account_id: str
    amount: float
    item_id: str

# Define an input topic using the Order model
orders_topic = app.topic('orders', value_type=Order)

# Define an agent to process messages from the 'orders' topic
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing order {order.item_id} for account {order.account_id}: ${order.amount}")
        # Example: perform an asynchronous operation here, or forward a result to
        # another topic (another_topic is not defined above; declare it with app.topic first):
        # await another_topic.send(value={'processed': True, 'order_id': order.item_id})

# To run this worker: save as app.py, then execute 'faust -A app worker -l info'
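
The agent above only prints something once a message arrives on the `orders` topic. A minimal sketch of one way to produce a test message follows: it builds a JSON payload with the same three fields as `Order` and prints a `faust ... send` command for it. `OrderPayload` and `build_send_command` are hypothetical helpers introduced here for illustration, not part of Faust, and the sketch assumes the quickstart was saved as `app.py` and that a broker is reachable when the printed command is run.

```python
import json
import shlex
from dataclasses import asdict, dataclass


@dataclass
class OrderPayload:
    """Plain-Python stand-in mirroring the fields of the Order record (hypothetical helper)."""
    account_id: str
    amount: float
    item_id: str


def build_send_command(payload: OrderPayload, app_module: str = "app", topic: str = "orders") -> str:
    """Return a shell command that publishes one JSON message via the faust CLI.

    Assumes the worker module is importable as `app_module` and that the
    topic name matches the one declared with app.topic().
    """
    body = json.dumps(asdict(payload))
    # shlex.quote keeps the JSON intact when the command is pasted into a shell
    return f"faust -A {app_module} send {topic} {shlex.quote(body)}"


if __name__ == "__main__":
    print(build_send_command(OrderPayload("acct-1", 19.99, "sku-42")))
```

Running the printed command in a second terminal while the worker is up should make the agent log the order. Keeping the payload builder free of Faust imports means it can be checked without a running Kafka broker.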
