aiostream

raw JSON →
0.7.1 verified Fri Apr 24 auth: no python

aiostream provides a collection of stream operators that can be combined to create asynchronous pipelines of operations. It can be seen as an asynchronous version of itertools, although some aspects are are slightly different, offering features like operator pipe-lining, repeatability, and safe iteration contexts. The library is currently at version 0.7.1 and maintains an active development and release cadence.

pip install aiostream
error ModuleNotFoundError: No module named 'aiostream'
cause The 'aiostream' package is not installed in the Python environment.
fix
Install the package using 'pip install aiostream'.
error ImportError: cannot import name 'stream' from 'aiostream'
cause The 'stream' module is not available in the 'aiostream' package, possibly due to an incorrect import statement.
fix
Ensure the correct import statement: 'from aiostream import stream'.
error TypeError: 'Stream' object is not iterable
cause Attempting to iterate over a 'Stream' object without using an asynchronous context.
fix
Use 'async for' within an 'async with' context: 'async with stream.stream() as s: async for item in s: ...'.
error AttributeError: module 'aiostream' has no attribute 'pipe'
cause The 'pipe' module is not directly accessible from 'aiostream'.
fix
Import 'pipe' separately: 'from aiostream import pipe'.
error RuntimeError: This event loop is already running
cause Calling 'asyncio.run()' inside an already running event loop, often in interactive environments like Jupyter notebooks.
fix
Use 'await' directly in an async function or use 'nest_asyncio' to allow nested event loops.
breaking In `aiostream` v0.6.0, stream operators (e.g., `map`, `filter`) transitioned from being classes to singleton objects. This means you should directly call `stream.map(...)` instead of `stream.map().pipe(...)` or `stream.Map().pipe(...)`.
fix Remove explicit instantiation of operators. For example, change `stream.map(func)` to `stream.map(source, func)` or `source | pipe.map(func)`.
breaking In `aiostream` v0.5.0, the `merge`, `chain`, and `ziplatest` operators no longer accept zero sources. Passing no arguments will now result in an error, as they expect at least one source stream to combine.
fix Ensure that `stream.merge()`, `stream.chain()`, or `stream.ziplatest()` are always provided with one or more asynchronous iterable sources.
gotcha Starting with `aiostream` v0.7.0, a `ValueError` is raised if `task_limit` or `ordered` arguments are provided to an operator (like `pipe.map`) when the supplied function is synchronous. These arguments are only applicable when the function being mapped is an asynchronous coroutine.
fix When using `pipe.map` or similar operators with a synchronous function, omit the `task_limit` and `ordered` arguments. Ensure these arguments are only used with asynchronous functions.
breaking `aiostream` dropped support for Python 3.8 in version 0.6.4. The library now officially requires Python 3.9 or newer.
fix Upgrade your Python environment to version 3.9 or later to use `aiostream` versions 0.6.4 and above.
runtime status import time mem disk
3.10-alpine 0.16s 5.0MB 18.4M
3.10-slim 0.09s 5.0MB 19M
3.11-alpine 0.38s 6.0MB 20.3M
3.11-slim 0.19s 6.0MB 21M
3.12-alpine 0.71s 8.2MB 12.2M
3.12-slim 0.47s 8.2MB 13M
3.13-alpine 0.69s 8.7MB 11.8M
3.13-slim 0.45s 8.7MB 12M
3.9-alpine 0.17s 4.9MB 17.9M
3.9-slim 0.11s 4.9MB 18M

This quickstart demonstrates creating an asynchronous stream, applying transformations using pipe operators, iterating through the stream, and awaiting a stream to collect its results.

import asyncio
from aiostream import stream, pipe

async def main():
    # Create a counting stream with a 0.1-second interval
    xs = stream.count(interval=0.1)

    # Operators can be piped using '|' to transform the stream
    ys = xs \
        | pipe.map(lambda x: x**2) \
        | pipe.take(5) # Take the first 5 elements

    print("Squared stream elements:")
    # Use a stream context for proper resource management and iterate
    async with ys.stream() as streamer:
        async for y in streamer:
            print(f"-> {y}")

    # Streams can also be awaited to get the last value after processing
    last_value = await (stream.range(1, 4) | pipe.list())
    print(f"List from range stream: {last_value}")

if __name__ == "__main__":
    asyncio.run(main())