Reactive Extensions for Python (RxPY)
Reactive Extensions for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators. Developers represent asynchronous data streams as Observables, query them with operators, and manage concurrency with Schedulers. The `rx` package on PyPI is currently at version 3.2.0.
Warnings
- breaking The official RxPY project renamed its main module from `rx` to `reactivex` starting with version 4.x. `pip install rx` still provides version 3.x; upgrading to 4.x means installing the `reactivex` package and changing import statements (for example, `import rx` becomes `import reactivex`).
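- breaking Since RxPY v3 (and still in v4), the `mapper` function argument is no longer accepted by operators that combine values from several observables (e.g., `combine_latest`, `group_join`, `join`, `with_latest_from`, `zip`, `zip_with_iterable`). These operators emit the combined values as a tuple; apply `map` afterwards to transform them.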
- breaking For developers creating custom operators using functional composition, the global `rx.pipe` function (used as a helper to compose other operators) was renamed to `reactivex.compose` in version 4.x.
- gotcha When using operators with multiple optional arguments, it is highly recommended to use named keyword arguments rather than positional arguments to prevent unexpected behavior due to argument order or future API changes.
Install
pip install rx
Imports
- rx
import rx
- operators
from rx import operators as ops
Quickstart
import rx
from rx import operators as ops

# Create an observable from a sequence of items
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Define a pipeline of operators
composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)

# Subscribe to the observable to start the data flow and print results
composed.subscribe(
    on_next=lambda value: print(f"Received {value}"),
    on_error=lambda err: print(f"Error: {err}"),
    on_completed=lambda: print("Done!")
)