{"id":2376,"library":"aiostream","title":"aiostream","description":"aiostream provides a collection of stream operators that can be combined to create asynchronous pipelines of operations. It can be seen as an asynchronous version of itertools, although some aspects are slightly different. It offers features such as operator pipe-lining, repeatability, and safe iteration contexts. The library is currently at version 0.7.1 and has an active development and release cadence.","status":"active","version":"0.7.1","language":"en","source_language":"en","source_url":"https://github.com/vxgmichel/aiostream","tags":["async","stream","generator","asyncio","data processing","reactive programming"],"install":[{"cmd":"pip install aiostream","lang":"bash","label":"Install latest version"}],"dependencies":[],"imports":[{"note":"The primary stream creation methods are exposed directly under 'aiostream.stream'.","wrong":"from aiostream.core import Stream","symbol":"stream","correct":"from aiostream import stream"},{"note":"Pipe operators are available directly from 'aiostream.pipe' for pipe-lining operations.","wrong":"from aiostream.operators import pipe","symbol":"pipe","correct":"from aiostream import pipe"}],"quickstart":{"code":"import asyncio\nfrom aiostream import stream, pipe\n\nasync def main():\n    # Create a counting stream with a 0.1-second interval\n    xs = stream.count(interval=0.1)\n\n    # Operators can be piped using '|' to transform the stream\n    ys = xs \\\n        | pipe.map(lambda x: x**2) \\\n        | pipe.take(5)  # Take the first 5 elements\n\n    print(\"Squared stream elements:\")\n    # Use a stream context for proper resource management and iterate\n    async with ys.stream() as streamer:\n        async for y in streamer:\n            print(f\"-> {y}\")\n\n    # Awaiting a stream returns its last item; pipe.list() yields growing lists,\n    # so the last item holds every element of the source\n    last_value = await (stream.range(1, 4) | pipe.list())\n    print(f\"List from range stream: {last_value}\")\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates creating an asynchronous stream, applying transformations using pipe operators, iterating through the stream, and awaiting a stream to collect its results."},"warnings":[{"fix":"Do not instantiate operators as classes. Call them directly with the source as the first argument, for example `stream.map(source, func)`, or use the pipe form `source | pipe.map(func)`.","message":"In `aiostream` v0.6.0, stream operators (e.g., `map`, `filter`) changed from classes to singleton objects. Call the operator directly, as in `stream.map(...)`, rather than treating it as a class, as in `stream.map().pipe(...)` or `stream.Map().pipe(...)`.","severity":"breaking","affected_versions":">=0.6.0"},{"fix":"Always pass at least one asynchronous iterable source to `stream.merge()`, `stream.chain()`, and `stream.ziplatest()`.","message":"In `aiostream` v0.5.0, the `merge`, `chain`, and `ziplatest` operators no longer accept zero sources. Calling them with no arguments now results in an error, because they expect at least one source stream to combine.","severity":"breaking","affected_versions":">=0.5.0"},{"fix":"When using `pipe.map` or similar operators with a synchronous function, omit the `task_limit` and `ordered` arguments. Pass these arguments only with asynchronous functions.","message":"Starting with `aiostream` v0.7.0, a `ValueError` is raised if the `task_limit` or `ordered` argument is passed to an operator (like `pipe.map`) together with a synchronous function. These arguments apply only when the mapped function is a coroutine function (`async def`).","severity":"gotcha","affected_versions":">=0.7.0"},{"fix":"Upgrade your Python environment to version 3.9 or later to use `aiostream` versions 0.6.4 and above.","message":"`aiostream` dropped support for Python 3.8 in version 0.6.4. The library now officially requires Python 3.9 or newer.","severity":"breaking","affected_versions":">=0.6.4"}],"env_vars":null,"last_verified":"2026-04-10T00:00:00.000Z","next_check":"2026-07-09T00:00:00.000Z"}