aiostream

0.7.1 · active · verified Fri Apr 10

aiostream provides a collection of stream operators that can be combined into asynchronous pipelines of operations. It can be seen as an asynchronous version of itertools, although some aspects are slightly different: it offers operator pipelining, repeatability, and safe iteration contexts. The library is currently at version 0.7.1 and is actively developed and released.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates creating an asynchronous stream, applying transformations using pipe operators, iterating through the stream, and awaiting a stream to collect its results.

import asyncio
from aiostream import stream, pipe

async def main():
    # Create a counting stream with a 0.1-second interval
    xs = stream.count(interval=0.1)

    # Operators can be piped using '|' to transform the stream
    ys = (
        xs
        | pipe.map(lambda x: x**2)  # Square each element
        | pipe.take(5)  # Take the first 5 elements
    )

    print("Squared stream elements:")
    # Use a stream context for proper resource management and iterate
    async with ys.stream() as streamer:
        async for y in streamer:
            print(f"-> {y}")

    # Awaiting a stream returns its last value; with pipe.list(), that
    # last value is the complete list of items
    items = await (stream.range(1, 4) | pipe.list())
    print(f"List from range stream: {items}")

if __name__ == "__main__":
    asyncio.run(main())
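To make clearer what the count, map, and take pipeline above computes, here is a rough equivalent written with plain async generators from the standard library. It is only a sketch and not how aiostream is implemented: the helper names `count`, `map_items`, `take`, and `collect_squares` are hypothetical, and unlike aiostream's stream context, this version does nothing to guarantee prompt cleanup of the source generators when iteration stops early.

```python
import asyncio


async def count(interval=0.0):
    """Yield 0, 1, 2, ... forever, sleeping `interval` seconds between items."""
    n = 0
    while True:
        yield n
        n += 1
        await asyncio.sleep(interval)


async def map_items(source, func):
    """Apply a synchronous function to every item of an async iterable."""
    async for item in source:
        yield func(item)


async def take(source, n):
    """Yield at most the first `n` items of an async iterable."""
    if n <= 0:
        return
    seen = 0
    async for item in source:
        yield item
        seen += 1
        if seen >= n:
            break


async def collect_squares(limit=5):
    """Nest the generators by hand; aiostream's '|' expresses the same chain."""
    pipeline = take(map_items(count(interval=0.01), lambda x: x**2), limit)
    return [y async for y in pipeline]


if __name__ == "__main__":
    print(asyncio.run(collect_squares()))
```

The nested calls read inside-out, which is the ergonomic problem the pipe syntax avoids: with aiostream the same chain is written left to right in the order the data flows.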
