aiostream
aiostream provides a collection of stream operators that can be combined to create asynchronous pipelines of operations. It can be seen as an asynchronous version of itertools, although some aspects are slightly different: it offers operator pipelining, repeatability, and safe iteration contexts. The current version is 0.7.1, and the project is actively developed and released.
Warnings
- breaking In `aiostream` v0.6.0, stream operators (e.g., `map`, `filter`) transitioned from being classes to singleton objects. This means you should directly call `stream.map(...)` instead of `stream.map().pipe(...)` or `stream.Map().pipe(...)`.
- breaking In `aiostream` v0.5.0, the `merge`, `chain`, and `ziplatest` operators no longer accept zero sources. Passing no arguments will now result in an error, as they expect at least one source stream to combine.
- gotcha Starting with `aiostream` v0.7.0, a `ValueError` is raised if the `task_limit` or `ordered` arguments are passed to an operator (like `pipe.map`) when the supplied function is synchronous. These arguments only apply when the mapped function is a coroutine function (defined with `async def`).
- breaking `aiostream` dropped support for Python 3.8 in version 0.6.4. The library now officially requires Python 3.9 or newer.
Install
pip install aiostream
Imports
- stream
from aiostream import stream
- pipe
from aiostream import pipe
Quickstart
import asyncio
from aiostream import stream, pipe

async def main():
    # Create a counting stream with a 0.1-second interval
    xs = stream.count(interval=0.1)

    # Operators can be piped using '|' to transform the stream
    ys = (
        xs
        | pipe.map(lambda x: x**2)
        | pipe.take(5)  # Take the first 5 elements
    )

    print("Squared stream elements:")
    # Use a stream context for proper resource management and iterate
    async with ys.stream() as streamer:
        async for y in streamer:
            print(f"-> {y}")

    # Streams can also be awaited to get the last value after processing
    last_value = await (stream.range(1, 4) | pipe.list())
    print(f"List from range stream: {last_value}")

if __name__ == "__main__":
    asyncio.run(main())