ReactiveX for Python (RxPY)
ReactiveX for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators. It enables developers to represent asynchronous data streams with Observables, query them using operators, and parameterize concurrency using Schedulers. The library is currently at stable version 4.1.0, with development on version 5.0.0 actively ongoing, and follows a release cadence that modernizes it with current Python standards.
Warnings
- breaking Major API changes occurred during the migration from RxPY v3 to v4. The main module name changed from `rx` to `reactivex`. Operator chaining shifted from method-based calls (e.g., `Observable.map().filter()`) to a functional `pipe` method that takes operators from `reactivex.operators` (e.g., `Observable.pipe(ops.map(), ops.filter())`). The `pipe` function itself was renamed to `compose`, with a new `pipe` method for chaining.
- breaking Python 3.10+ deprecations introduced breaking changes in RxPY v4 related to concurrency primitives. Specifically, `setDaemon` (deprecated in Python 3.10) was replaced, and the `loop` parameter for methods like `create` and other concurrency-related functions was removed. Coroutine decorators were replaced with the `async` keyword. Users interacting directly with event loops or threading might need adjustments.
- gotcha In Python 3.10 and above, `asyncio.get_event_loop()` is deprecated, and `asyncio.get_running_loop()` or `asyncio.new_event_loop()` combined with `asyncio.set_event_loop()` should be used instead. While RxPY v5 alpha has internally addressed this, user code that directly interacts with `asyncio` event loops for custom schedulers or integration might encounter `DeprecationWarning`s or `RuntimeError`s on newer Python versions.
- gotcha While RxPY v5 introduces method chaining (fluent style) as an additive feature alongside the existing pipe-based functional style, some users might find it confusing to switch between the two. Both styles are fully type-safe and can be mixed.
Install
-
pip install reactivex -
pip install 'reactivex>=5.0.0a0'
Imports
- reactivex
import reactivex as rx
- operators
from reactivex import operators as ops
Quickstart
import reactivex as rx
from reactivex import operators as ops
def my_on_next(value):
print(f"Received: {value}")
def my_on_error(error):
print(f"Error: {error}")
def my_on_completed():
print("Done!")
# Create an observable from a sequence and apply operators
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
disposable = source.pipe(
ops.map(lambda s: len(s)),
ops.filter(lambda i: i >= 5)
).subscribe(
on_next=my_on_next,
on_error=my_on_error,
on_completed=my_on_completed
)
# For long-running observables or specific cleanup, dispose is important
disposable.dispose()