{"id":5928,"library":"faust-streaming","title":"Faust Streaming","description":"Faust-streaming is a Python stream processing library that ports concepts from Kafka Streams to Python. It enables building high-performance distributed systems and real-time data pipelines. This project is an actively maintained fork of the original Faust library, aiming for continuous releases, improved code quality, and support for the latest Kafka drivers. The current version is 0.11.3, with releases happening periodically based on community contributions and dependency updates.","status":"active","version":"0.11.3","language":"en","source_language":"en","source_url":"https://github.com/faust-streaming/faust","tags":["kafka","stream-processing","asyncio","real-time","event-driven"],"install":[{"cmd":"pip install faust-streaming","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"Underlying asynchronous Kafka client used for consuming and producing messages.","package":"aiokafka","optional":false},{"reason":"Core dependency for async primitives and service management.","package":"mode-streaming","optional":false},{"reason":"Used for local state storage in tables (C++ embedded database). Not a direct Python installable, but an underlying component.","package":"RocksDB","optional":true}],"imports":[{"symbol":"App","correct":"from faust import App"},{"symbol":"Record","correct":"from faust import Record"}],"quickstart":{"code":"import faust\nimport os\n\n# Configure Kafka broker from environment variable or default to localhost\nKAFKA_BROKER = os.environ.get('FAUST_BROKER', 'kafka://localhost:9092')\n\napp = faust.App(\n    'my-streaming-app',\n    broker=KAFKA_BROKER,\n    value_serializer='json',\n)\n\n# Define a data model for messages\nclass Order(faust.Record):\n    account_id: str\n    amount: float\n    item_id: str\n\n# Define an input topic using the Order model\norders_topic = app.topic('orders', value_type=Order)\n\n# Define an agent to process messages from the 'orders' topic\n@app.agent(orders_topic)\nasync def process_orders(orders):\n    async for order in orders:\n        print(f\"Processing order {order.item_id} for account {order.account_id}: ${order.amount}\")\n        # Example: Perform some asynchronous operation or send to another topic\n        # await another_topic.send(value={'processed': True, 'order_id': order.item_id})\n\n# To run this worker: save as app.py, then execute 'faust -A app worker -l info'","lang":"python","description":"This quickstart demonstrates a basic Faust streaming application. It initializes a Faust application, defines a `Record` model for incoming data, sets up a Kafka topic, and creates an `@app.agent` to consume and process messages from that topic. The Kafka broker address is configurable via an environment variable."},"warnings":[{"fix":"Ensure `aiokafka` is updated to a compatible version (>=0.10.0, preferably latest stable) when upgrading `faust-streaming`. Review any custom code interacting directly with `aiokafka` internals.","message":"Upgrading to v0.11.2 changed the internal `aiokafka` topic error handling (`topic_errors` was renamed) and bumped the minimum `aiokafka` version to `0.10.0`. Older `aiokafka` versions may cause compatibility issues.","severity":"breaking","affected_versions":">=0.11.2"},{"fix":"Monitor `stream_processing_timeout` and agent logs for errors. Consider increasing `stream_processing_timeout` if processing individual events takes longer than the default. Review agent logic for potential deadlocks or long-running synchronous operations.","message":"Users reported that consumers could 'slowly die over time' or stop receiving messages after upgrading to `v0.11.2` from `v0.11.1`. This might be related to stream processing timeouts or agent hanging.","severity":"gotcha","affected_versions":"0.11.2"},{"fix":"Ensure `mode-streaming` is installed at version `0.4.0` or higher to avoid import errors and ensure correct functionality when using `faust-streaming >=0.11.0`.","message":"Version `0.11.0` included fixes for imports from `mode-streaming~=0.4.0`. This indicates a strong compatibility requirement with `mode-streaming` version `0.4.0` or newer.","severity":"breaking","affected_versions":">=0.11.0"},{"fix":"Plan for schema evolution. For topic type changes, implement an upgrade path (e.g., new topic for new schema) and ensure all consumers/producers are updated. For model renames, ensure old model names are still resolvable or provide a migration.","message":"Changing the `key_type` or `value_type` of an existing topic is a backward-incompatible change. All Faust instances using the old types must be restarted. Renaming model classes can also cause deserialization errors if not handled with an upgrade strategy.","severity":"breaking","affected_versions":"all"},{"fix":"Design agents that modify tables to run with a concurrency of 1, or ensure that only non-concurrent agents perform table modifications. Concurrent agents should only read from tables.","message":"Concurrent agents are explicitly not allowed to modify tables. Attempting to do so will raise an exception.","severity":"gotcha","affected_versions":"all"}],"env_vars":null,"last_verified":"2026-04-14T00:00:00.000Z","next_check":"2026-07-13T00:00:00.000Z","problems":[]}