ReactiveX for Python (RxPY)

4.1.0 · active · verified Thu Apr 09

ReactiveX for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators. It represents asynchronous data streams as Observables, queries them with operators, and parameterizes concurrency with Schedulers. The current stable version is 4.1.0; version 5.0.0 is under active development, and releases aim to keep the library in step with current Python standards.

Warnings

Install

Imports

Quickstart

This quickstart creates an observable from a sequence of strings, maps each string to its length with `map`, keeps only lengths of 5 or more with `filter`, and subscribes to process the results. It shows how the `pipe` method chains operators and how the `on_next`, `on_error`, and `on_completed` handlers are used. With the input shown, it prints `Received: 5` three times, then `Received: 7`, then `Done!`.

import reactivex as rx
from reactivex import operators as ops

def my_on_next(value):
    print(f"Received: {value}")

def my_on_error(error):
    print(f"Error: {error}")

def my_on_completed():
    print("Done!")

# Create an observable from a sequence and apply operators
source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

disposable = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
).subscribe(
    on_next=my_on_next,
    on_error=my_on_error,
    on_completed=my_on_completed
)

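# rx.of emits synchronously, so the sequence has already completed by this point.
# dispose() matters for long-running or asynchronous observables, where it
# cancels the subscription and releases its resources.
disposable.dispose()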
