Async Iterable Pipe Utility

3.0.1 · active · verified Sun Apr 19

it-pipe is a JavaScript/TypeScript utility for composing async iterables into pipelines for stream processing in Node.js and browsers. The library is currently at version 3.0.1, with its most recent updates in early 2023. Its core `pipe` function connects a source, any number of transforms, and a sink by passing each stage's return value to the next; if the first argument is an iterable rather than a function, `pipe` wraps it in a function automatically. Unlike a plain function-composition helper, `it-pipe` also accepts duplex streams as stages, which allows bidirectional data flows. It follows the conventions of the `it-` family of async-iterable packages and ships with TypeScript types. The package requires Node.js 16 or newer.
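To make the composition idea concrete, here is a minimal sketch of how a pipe-style helper could chain stages and wrap an iterable first argument. It is an illustration only, not the it-pipe source: `miniPipe`, `isIterable`, `double`, and `collect` are hypothetical names, and duplex handling is left out.

```typescript
// Hypothetical sketch: `miniPipe` is NOT an it-pipe export.
type AnySource<T> = Iterable<T> | AsyncIterable<T>;
type Stage = (input: any) => any;

// True for anything usable in `for await ... of`.
function isIterable (value: any): boolean {
  return value != null && (
    typeof value[Symbol.iterator] === 'function' ||
    typeof value[Symbol.asyncIterator] === 'function'
  );
}

function miniPipe (first: any, ...rest: Stage[]): any {
  // Wrap a bare iterable so that every stage is a function.
  const source: Stage = isIterable(first) ? () => first : first;
  let result: any = source(undefined);
  // Hand each stage's return value to the next stage.
  for (const stage of rest) {
    result = stage(result);
  }
  return result;
}

// A transform: takes a source, returns a new async iterable.
async function * double (source: AnySource<number>): AsyncGenerator<number> {
  for await (const n of source) yield n * 2;
}

// A sink: consumes the source and resolves to the collected values.
async function collect<T> (source: AnySource<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

miniPipe([1, 2, 3], double, collect).then((values: number[]) => {
  console.log(values); // [ 2, 4, 6 ]
});
```

Because each stage receives the previous return value unchanged, a transform in this sketch has to return an iterable directly; the sink is the only stage whose Promise is awaited by the caller.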

Install

npm install it-pipe

Imports

import { pipe } from 'it-pipe';

Quickstart

This quickstart chains an iterable source, a transform, and an asynchronous sink with `pipe`, first for a pipeline of numbers and then for a pipeline of strings fed by an async generator.

import { pipe } from 'it-pipe';

async function runPipelineExample() {
  const result = await pipe(
    // A source is just an iterable; this is shorthand for () => [1, 2, 3]
    [1, 2, 3],
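    // A transform takes a source and returns a source.
    // This transform doubles each value asynchronously.
    // It must return the iterable synchronously: pipe passes each return value
    // straight to the next stage, so an `async function` here would hand the
    // sink a Promise rather than something it can iterate.
    function transform (source: AsyncIterable<number> | Iterable<number>): AsyncIterable<number> {
      return (async function * (): AsyncIterable<number> {
        for await (const val of source) yield val * 2;
      })();
    },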
    // A sink, it takes a source and consumes it, optionally returning a value.
    // This sink buffers up all the values from the source and returns them.
    async function collect (source: AsyncIterable<number>): Promise<number[]> {
      const vals: number[] = [];
      for await (const val of source) {
        vals.push(val);
      }
      return vals;
    }
  );

  console.log('Pipelined Result:', result); // Expected: [2, 4, 6]

  // Demonstrating piping an async generator directly
  async function* generateStrings(): AsyncIterable<string> {
    yield 'hello';
    yield 'world';
  }

  const collectedStrings = await pipe(
    generateStrings(), // An async iterable source
    // Not an async function: the transform returns the async iterable directly.
    function toUpperCase(source: AsyncIterable<string>): AsyncIterable<string> {
      return (async function*(): AsyncIterable<string> {
        for await (const s of source) {
          yield s.toUpperCase();
        }
      })();
    },
    async function collectAndJoin(source: AsyncIterable<string>): Promise<string> {
      let finalString = '';
      for await (const s of source) {
        finalString += s + ' ';
      }
      return finalString.trim();
    }
  );
  console.log('String Pipeline Result:', collectedStrings); // Expected: "HELLO WORLD"
}

runPipelineExample().catch(console.error);
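The overview mentions duplex support, which the quickstart does not exercise. The sketch below assumes a duplex is an object with a `sink` and a `source`, and that a duplex placed mid-pipeline is fed the upstream source through `sink` while downstream stages read from `source`. The helper names (`Duplex`, `createShoutingDuplex`, `duplexToStage`) are hypothetical and written out here so the example is self-contained; they are not it-pipe exports.

```typescript
// Hypothetical sketch of a duplex used as a pipeline stage.
type AnySource<T> = Iterable<T> | AsyncIterable<T>;

interface Duplex<TIn, TOut> {
  sink: (source: AnySource<TIn>) => void | Promise<void>;
  source: AsyncIterable<TOut>;
}

// A toy duplex: whatever is written to `sink` comes out of `source` upper-cased.
function createShoutingDuplex (): Duplex<string, string> {
  let resolveInput: (input: AnySource<string>) => void = () => {};
  const input = new Promise<AnySource<string>>((resolve) => {
    resolveInput = resolve;
  });

  return {
    // The sink only records the upstream iterable; `source` consumes it lazily.
    sink: (source) => { resolveInput(source); },
    source: (async function * () {
      for await (const chunk of await input) yield chunk.toUpperCase();
    })()
  };
}

// Adapt a duplex to a plain stage function: feed the sink, expose the source.
function duplexToStage<I, O> (duplex: Duplex<I, O>) {
  return (source: AnySource<I>): AsyncIterable<O> => {
    void duplex.sink(source);
    return duplex.source;
  };
}

async function main (): Promise<string[]> {
  const stage = duplexToStage(createShoutingDuplex());
  const out: string[] = [];
  for await (const s of stage(['hello', 'world'])) out.push(s);
  return out;
}

main().then((out) => console.log(out)); // [ 'HELLO', 'WORLD' ]
```

In a real pipeline the duplex would typically wrap a network connection or similar two-way channel, with `sink` writing outbound data and `source` yielding inbound data.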
