nr-stream
The `nr-stream` package provides utilities for writing functional-style code in Python, offering `Stream`, `Optional`, and `Refreshable` classes. The `Stream` class wraps iterables to enable chained modifiers, simplifying common operations. `Optional` represents a value that might be `None`, allowing for safe chaining, while `Refreshable` acts as a container for values that can be updated, propagating changes to listeners. The current version is 1.1.5, released on February 14, 2023, with an active but irregular release cadence.
Common errors
-
Stream is empty or produces no output after first use.
cause The `Stream` object's internal iterator has been exhausted by a previous operation. `Stream` objects are single-use by design.fixCreate a new `Stream` instance from the original data source (e.g., `Stream(my_list)`) for each sequence of operations that needs to start from the beginning of the data. -
Unexpected performance overhead or computations triggered by `Refreshable` updates.
cause Chained `Refreshable` operations are evaluated eagerly. Any `map` or `filter` operations on a `Refreshable` will cause their respective transformation functions to execute immediately whenever the upstream `Refreshable` is updated.fixUnderstand that `Refreshable` is designed for 'reactive' updates where all dependants are immediately notified and re-evaluated. If you require lazy evaluation or deferment of computation until explicitly requested, `Refreshable` might not be the appropriate tool, or you need to manage when updates are triggered more carefully.
Warnings
- gotcha Stream objects immediately convert the underlying iterable to an iterator upon creation. This means a `Stream` object can only be consumed once. Attempting to iterate or perform operations on it a second time will result in an empty stream or unexpected behavior, as the underlying iterator will be exhausted.
- gotcha `Refreshable` objects use eager evaluation. Chained modifications on a `Refreshable` are replayed immediately when the parent `Refreshable` is updated, not lazily when `.get()` is called. This can lead to computations being performed more often than expected if not understood.
Install
-
pip install nr-stream
Imports
- Stream
from nr.stream import Stream
- Optional
from nr.stream import Optional
- Refreshable
from nr.stream import Refreshable
Quickstart
from nr.stream import Stream
values = [3, 6, 4, 7, 1, 2, 5]
# Create a Stream, chunk it, map each chunk to its sum, and collect the results
processed_stream = Stream(values).chunks(3, fill=0).map(sum)
print(list(processed_stream))
# Expected output: [13, 10, 5]
from nr.stream import Optional
import os
# Example with Optional
opt_var = Optional(os.getenv("NON_EXISTENT_VAR"))
value_or_default = opt_var.or_else_get(lambda: "default_value")
print(f"Value or default: {value_or_default}")
from nr.stream import Refreshable
# Example with Refreshable
root_refreshable = Refreshable[int | None](None)
child_refreshable = root_refreshable.map(lambda v: 42 if v is None else v)
print(f"Initial root: {root_refreshable.get()}, child: {child_refreshable.get()}")
root_refreshable.update(10)
print(f"Updated root: {root_refreshable.get()}, child: {child_refreshable.get()}")