Async Iterable Pipe Utility
it-pipe is a JavaScript/TypeScript utility for composing asynchronous iterables into pipelines, simplifying stream processing in Node.js and browser environments. The library, currently at version 3.0.1, maintains an active release cadence with recent updates in early 2023. Its core `pipe` function efficiently connects sources, transforms, and sinks, automatically wrapping initial iterable sources into functions as needed. A key differentiator for `it-pipe` is its explicit support for duplex streams, enabling more complex bidirectional data flows. It leverages the `it-` ecosystem pattern for async iterables, offering a robust and type-safe solution for orchestrating sequential asynchronous operations. This package requires Node.js version 16 or newer.
Common errors
-
ReferenceError: require is not defined
cause Attempting to use CommonJS `require()` to import `it-pipe` in an environment configured for ES Modules (Node.js with `"type": "module"` or a browser context).fixChange your import statement from `const pipe = require('it-pipe')` to `import { pipe } from 'it-pipe'`. If in a browser, ensure you are using a bundler that handles ESM, or load via a script tag (e.g., `<script src="https://unpkg.com/it-pipe/dist/index.min.js"></script>`). -
TypeError: pipe is not a function
cause Incorrectly importing `pipe` as a default export (`import pipe from 'it-pipe'`) when it is a named export, or attempting to use `require()` with named exports.fixEnsure you are using named import syntax: `import { pipe } from 'it-pipe'`. If using CommonJS, which is not supported since v2.0.0, migrate to ESM. -
Type 'AsyncGenerator<any, void, unknown>' is not assignable to type 'Iterable<unknown> | Source<unknown>' (TypeScript)
cause A type mismatch occurs when a pipeline stage (source, transform, or sink) is not correctly typed to match the `AsyncIterable` or `Promise<AsyncIterable>` expectation of `it-pipe`.fixExplicitly type your asynchronous generators and functions to return `AsyncIterable<T>` or `Promise<AsyncIterable<T>>` for sources and transforms, and `Promise<R>` for sinks, where `T` is the type of the yielded elements and `R` is the final return type of the sink.
Warnings
- breaking Starting with v3.0.0, if the entire pipeline consists only of synchronous operations, the `pipe` function will return a synchronous value directly, rather than always a Promise. This change optimizes performance for fully synchronous pipelines.
- breaking Version 2.0.0 introduced significant breaking changes by switching to ES Modules (ESM) only and exclusively using named exports. CommonJS 'require' statements will no longer work, and default imports are no longer available.
- breaking Version 2.0.0 also converted the library to TypeScript. While beneficial for type safety, this might introduce minor type-related breaking changes if you were relying on specific (potentially inferred) JavaScript type behaviors or internal structures that are now more strictly defined.
- gotcha Error propagation in async iterable pipelines can be complex. While `it-pipe` facilitates connections, individual pipeline stages (sources, transforms, sinks) must be designed to properly catch, handle, or propagate errors within their asynchronous generators or functions to ensure robust error handling throughout the pipeline.
Install
-
npm install it-pipe -
yarn add it-pipe -
pnpm add it-pipe
Imports
- pipe
const pipe = require('it-pipe')import { pipe } from 'it-pipe' - pipe
import pipe from 'it-pipe'
import { pipe } from 'it-pipe' - Source, Sink, Transform
import type { Source, Sink, Transform } from 'it-pipe'
Quickstart
import { pipe } from 'it-pipe';
async function runPipelineExample() {
const result = await pipe(
// A source is just an iterable, this is shorthand for () => [1, 2, 3]
[1, 2, 3],
// A transform takes a source, and returns a source.
// This transform doubles each value asynchronously.
async function transform (source: AsyncIterable<number>): Promise<AsyncIterable<number>> {
return (async function * (): AsyncIterable<number> {
for await (const val of source) yield val * 2;
})();
},
// A sink, it takes a source and consumes it, optionally returning a value.
// This sink buffers up all the values from the source and returns them.
async function collect (source: AsyncIterable<number>): Promise<number[]> {
const vals: number[] = [];
for await (const val of source) {
vals.push(val);
}
return vals;
}
);
console.log('Pipelined Result:', result); // Expected: [2, 4, 6]
// Demonstrating piping an async generator directly
async function* generateStrings(): AsyncIterable<string> {
yield 'hello';
yield 'world';
}
const collectedStrings = await pipe(
generateStrings(), // An async iterable source
async function toUpperCase(source: AsyncIterable<string>): Promise<AsyncIterable<string>> {
return (async function*(): AsyncIterable<string> {
for await (const s of source) {
yield s.toUpperCase();
}
})();
},
async function collectAndJoin(source: AsyncIterable<string>): Promise<string> {
let finalString = '';
for await (const s of source) {
finalString += s + ' ';
}
return finalString.trim();
}
);
console.log('String Pipeline Result:', collectedStrings); // Expected: "HELLO WORLD"
}
runPipelineExample().catch(console.error);