Faust Streaming
Faust-streaming is a Python stream processing library that ports the ideas of Kafka Streams to Python. It is used to build high-performance distributed systems and real-time data pipelines. The project is an actively maintained fork of the original Faust library and aims for continuous releases, improved code quality, and support for the latest Kafka drivers. The current version is 0.11.3; releases are cut periodically as community contributions and dependency updates accumulate.
Warnings
- breaking Version `v0.11.2` changed the internal `aiokafka` topic error handling (`topic_errors` was renamed) and raised the minimum `aiokafka` version to `0.10.0`. Older `aiokafka` versions may cause compatibility issues.
- gotcha Users have reported that, after upgrading from `v0.11.1` to `v0.11.2`, consumers can 'slowly die over time' or stop receiving messages. This may be related to stream processing timeouts or hanging agents.
- breaking Version `0.11.0` included fixes for imports from `mode-streaming~=0.4.0`. The `~=0.4.0` specifier is a compatible-release pin (`>=0.4.0`, `<0.5.0`), so this indicates a strong compatibility requirement with the `0.4.x` series of `mode-streaming`.
- breaking Changing the `key_type` or `value_type` of an existing topic is a backward-incompatible change. All Faust instances using the old types must be restarted. Renaming model classes can also cause deserialization errors if not handled with an upgrade strategy.
- gotcha Agents running with `concurrency` greater than 1 are not allowed to modify tables; they may only read from them. Attempting to write to a table from such an agent raises an exception.
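Because several of the warnings above depend on minimum dependency versions, it can help to check the installed versions before starting a worker. The following is a minimal sketch using only the standard library; `parse_version`, `meets_minimum`, and `check_dependency` are hypothetical helper names, and the version parsing is deliberately simplified (it ignores pre-release suffixes), so treat it as an illustration rather than a full version-specifier implementation.

```python
from importlib import metadata


def parse_version(text: str) -> tuple:
    """Turn '0.10.1' into (0, 10, 1); stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) != len(piece):
            break  # e.g. '0rc1': keep the 0, ignore the suffix and anything after it
    return tuple(parts)


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare numerically, so that '0.9.0' is correctly older than '0.10.0'."""
    a, b = parse_version(installed), parse_version(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))  # pad so '0.12' compares like '0.12.0'
    b += (0,) * (width - len(b))
    return a >= b


def check_dependency(dist_name: str, minimum: str) -> str:
    """Report whether an installed distribution satisfies a minimum version."""
    try:
        installed = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return f"{dist_name}: not installed"
    if meets_minimum(installed, minimum):
        return f"{dist_name} {installed}: ok"
    return f"{dist_name} {installed}: too old, need >= {minimum}"


if __name__ == "__main__":
    # Minimum versions taken from the warnings above.
    print(check_dependency("aiokafka", "0.10.0"))
    print(check_dependency("mode-streaming", "0.4.0"))
```

The numeric comparison matters here: a plain string comparison would rank `0.9.0` above `0.10.0`, which is the wrong answer for the `aiokafka` minimum.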
Install
pip install faust-streaming
Imports
- App
from faust import App
- Record
from faust import Record
Quickstart
import faust
import os

# Configure Kafka broker from environment variable or default to localhost
KAFKA_BROKER = os.environ.get('FAUST_BROKER', 'kafka://localhost:9092')

app = faust.App(
    'my-streaming-app',
    broker=KAFKA_BROKER,
    value_serializer='json',
)

# Define a data model for messages
class Order(faust.Record):
    account_id: str
    amount: float
    item_id: str

# Define an input topic using the Order model
orders_topic = app.topic('orders', value_type=Order)

# Define an agent to process messages from the 'orders' topic
@app.agent(orders_topic)
async def process_orders(orders):
    async for order in orders:
        print(f"Processing order {order.item_id} for account {order.account_id}: ${order.amount}")
        # Example: Perform some asynchronous operation or send to another topic
        # await another_topic.send(value={'processed': True, 'order_id': order.item_id})

# To run this worker: save as app.py, then execute 'faust -A app worker -l info'
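
To exercise the agent, something has to publish messages to the `orders` topic. The sketch below uses only the standard library to build a JSON payload with the same three fields as `Order`. `build_order_payload` is a hypothetical helper, and it assumes that a plain JSON object with these field names is accepted by the `json` serializer when the topic declares `value_type=Order`.

```python
import json

# Field names and types copied from the Order record in the quickstart.
ORDER_FIELDS = {"account_id": str, "amount": float, "item_id": str}


def build_order_payload(account_id: str, amount: float, item_id: str) -> bytes:
    """Hypothetical helper: encode one order as UTF-8 JSON bytes."""
    payload = {"account_id": account_id, "amount": amount, "item_id": item_id}
    for name, expected in ORDER_FIELDS.items():
        if not isinstance(payload[name], expected):
            raise TypeError(
                f"{name} must be {expected.__name__}, "
                f"got {type(payload[name]).__name__}"
            )
    return json.dumps(payload).encode("utf-8")


if __name__ == "__main__":
    # Print a sample payload that could be published to the 'orders' topic.
    print(build_order_payload("acc-1", 19.99, "sku-42").decode("utf-8"))
```

The type check is strict on purpose: an integer `amount` is rejected so that the payload matches the `float` annotation on the model. The upstream Faust quickstart sends test messages with a command of the form `faust -A hello_world send @greet "Hello Faust"`; assuming the same command applies here, the printed JSON could be passed to `faust -A app send @process_orders '<json>'` while the worker is running.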