Reactive Extensions for Python (RxPY)

3.2.0 · active · verified Thu Apr 09

Reactive Extensions for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators. It represents asynchronous data streams as Observables, lets you query them with a rich set of operators, and manages concurrency with Schedulers. The library is distributed on PyPI as the `rx` package, currently at version 3.2.0.

Warnings

Install

Imports

Quickstart

This quickstart creates an observable from a fixed sequence of strings, maps each string to its length, and keeps only lengths of 5 or more, chaining both operators with the `pipe` method. It then subscribes with `on_next`, `on_error`, and `on_completed` handlers. It uses the pipe-based operator chaining introduced in v3.

import rx
from rx import operators as ops

# Create an observable from a sequence of items
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Define a pipeline of operators
composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)

# Subscribe to the observable to start the data flow and print results
composed.subscribe(
    on_next=lambda value: print(f"Received {value}"),
    on_error=lambda err: print(f"Error: {err}"),
    on_completed=lambda: print("Done!")
)
