Quix Streams

3.23.6 · verified Fri May 01 · auth: no · python

Quix Streams is a Python library for building stream processing applications on Apache Kafka. Version 3.23.6 supports stateful operations, windowing, connectors, and Quix Cloud integration. It requires Python >=3.9, <4. The project is actively maintained, with frequent minor releases.

pip install quixstreams
error ModuleNotFoundError: No module named 'confluent_kafka'
cause The Kafka C library or its Python bindings (confluent-kafka) are missing.
fix Install confluent-kafka with `pip install quixstreams[kafka]`, or separately with `pip install confluent-kafka`.
error AttributeError: 'Application' object has no attribute 'run'
cause The installed version is very old (<0.5), where the method had a different name.
fix Upgrade to the latest release: `pip install --upgrade quixstreams`
error ValueError: Topic 'my-topic' does not exist and auto_create_topics is False
cause The Kafka topic must exist before the application starts, or auto-creation must be enabled.
fix Set `auto_create_topics=True` in `Application()` or create the topic manually with a Kafka admin tool.
error TypeError: 'NoneType' object is not callable in StreamingDataFrame.apply()
cause The function passed to `apply()` returned None instead of a value.
fix Make sure the function passed to `apply()` returns a value (for example, the modified input row).
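The last error in the list usually traces back to a callback that mutates the row in place and never returns it. The sketch below contrasts the two shapes, assuming message values are JSON-deserialized dicts. `add_total_broken`, `add_total`, and `wire` are hypothetical names used only for illustration, and `sdf` is assumed to be the object returned by `app.dataframe(topic)`.

```python
def add_total_broken(row):
    # Mutates the row in place and implicitly returns None,
    # so apply() would hand None to the next step.
    row['total'] = row['price'] * row['qty']


def add_total(row):
    # Returns a new dict, which apply() uses as the new message value.
    return {**row, 'total': row['price'] * row['qty']}


def wire(sdf):
    # Assumption: `sdf` is the StreamingDataFrame from app.dataframe(topic).
    # Reassign the result, since apply() returns a new dataframe.
    return sdf.apply(add_total)
```

If in-place mutation is what you want, the library's `update()` method is, as far as its documentation describes it, the variant that ignores the callback's return value; check the API reference for your installed version before relying on that.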
breaking In v3.22.0, the default for `quix_portal_api` URL was removed. You must explicitly set it when connecting to Quix Cloud.
fix Set the `quix_portal_api` parameter in Application or use the corresponding environment variable.
deprecated The `lookup.quix_configuration_service` function is deprecated; use `app.quix_config_service` instead.
fix Replace function-based configuration lookup with the method on the Application instance.
gotcha When using `StreamingDataFrame.to_topic()`, the callable must return a `Topic` object, not a topic name string.
fix Ensure your routing callable returns `app.topic('destination-topic')`.
gotcha State stores are local and not automatically replicated; replaying a stream from a different consumer group will start with empty state.
fix Design state updates to be idempotent, or keep state that must survive such a replay in external storage.
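The `to_topic()` gotcha above can be sketched as follows. This is an illustration under assumptions, not the library's prescribed pattern: the four-argument callback signature `(value, key, timestamp, headers)` is assumed, the topic names `alerts` and `events` are made up, and `make_router` and `wire` are hypothetical helpers.

```python
def make_router(topics):
    """Build a routing callable from a dict of already-created Topic objects."""
    def route(value, key, timestamp, headers):
        # Pick a destination by message content (hypothetical rule).
        name = 'alerts' if value.get('level') == 'error' else 'events'
        # Return the Topic object itself, never the name string.
        return topics[name]
    return route


def wire(app, sdf):
    # Assumption: `app` is a quixstreams Application and `sdf` its StreamingDataFrame.
    # Topic objects are created once up front and reused for every message.
    topics = {name: app.topic(name) for name in ('alerts', 'events')}
    return sdf.to_topic(make_router(topics))
```

Creating the `Topic` objects once and looking them up in the callback keeps the per-message work to a dictionary access.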
pip install quixstreams[kafka]

Minimal stream processing app that increments the `count` field of each message.

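import os

from quixstreams import Application

# Broker address comes from the environment, falling back to a local broker.
app = Application(broker_address=os.environ.get('KAFKA_BROKER', 'localhost:9092'))
topic = app.topic('my-topic', value_deserializer='json')

# app.dataframe() returns a StreamingDataFrame; it is not a decorator.
sdf = app.dataframe(topic)
# Emit a new value whose 'count' is the incoming 'count' (default 0) plus one.
sdf = sdf.apply(lambda row: {'count': row.get('count', 0) + 1})

if __name__ == '__main__':
    app.run()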
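The quickstart above is stateless: it only rewrites a field on each message. For a running count across messages, a stateful callback is one option. The sketch below assumes that `apply(..., stateful=True)` passes a state object as the second argument and that this object exposes `get(key, default)` and `set(key, value)`. `count_messages`, `wire`, and `DictState` are hypothetical names, and `DictState` is only an in-memory stand-in for trying the callback without Kafka.

```python
def count_messages(value, state):
    # Assumption: `state` offers get(key, default) and set(key, value).
    total = state.get('total', 0) + 1
    state.set('total', total)
    # Return a new value carrying the running count.
    return {**value, 'seen': total}


def wire(sdf):
    # Assumption: stateful=True makes apply() pass the state object to the callback.
    return sdf.apply(count_messages, stateful=True)


class DictState:
    """In-memory stand-in for the state object, for local experiments only."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value
```

As the state gotcha notes, a different consumer group starts with an empty store, so a count like this begins again from 1 in that case.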