Node.js Stream to ECMAScript Observable Converter

0.2.0 verified Sat Apr 25 auth: no javascript abandoned

stream-to-observable is a utility package that converts Node.js Readable streams into ECMAScript-compatible Observables. The latest release, 0.2.0, was published seven years ago and has not been updated since, so the package is no longer actively maintained. It relies on `any-observable` to select an Observable implementation at runtime, so users must explicitly install a compatible library such as `zen-observable` or `rxjs`. The package bridges traditional Node.js stream processing and the more declarative, reactive style of Observables, but its age may cause compatibility problems with modern Node.js stream APIs or newer versions of Observable libraries. Because of this inactivity, a community-maintained fork, `@samverschueren/stream-to-observable`, exists and may be a better choice for current projects.

error Error: Cannot find any-observable implementation nor global.Observable. You must install polyfill or call require("any-observable/register") with your preferred implementation...
cause The package `stream-to-observable` relies on `any-observable` to find an Observable implementation, but none has been installed or registered.
fix Install an Observable library (e.g., `npm install --save zen-observable`) and register it early in your application's lifecycle: `require('any-observable/register')('zen');`
error TypeError: streamToObservable(...) is not a function or streamToObservable(...).filter is not a function
cause This usually indicates that the `streamToObservable` function itself was not correctly imported or called, or the resulting object is not a valid Observable instance, often due to a missing or improperly registered Observable implementation.
fix Ensure `streamToObservable` is correctly `require()`d and that an Observable implementation such as `zen-observable` or `rxjs` is installed and registered via `any-observable/register` before `stream-to-observable` is used.
breaking This package (v0.2.0) is effectively abandoned, with no updates in seven years. It may have compatibility issues with modern Node.js versions (e.g., Node.js 16+) and their stream APIs, or with recent major versions of Observable libraries like RxJS.
fix Consider using the actively maintained fork `@samverschueren/stream-to-observable` instead, or directly use modern RxJS/ZenObservable features for stream conversion.
gotcha You must explicitly install an Observable implementation (e.g., `zen-observable` or `rxjs`) and register it with `any-observable` for `stream-to-observable` to function. Failure to do so will result in a runtime error.
fix Run `npm install --save zen-observable` (or `rxjs`) and add `require('any-observable/register')('zen');` (or `('rxjs')`) at the application's entry point before `stream-to-observable` is required.
gotcha Converting a Node.js stream to an Observable using this package inherently breaks Node.js's back-pressure mechanism. Observables are a 'push' technology, meaning all stream chunks will be read and pushed to the observer as quickly as possible, potentially overwhelming downstream consumers or memory.
fix For applications sensitive to back-pressure, carefully consider whether this conversion is appropriate. Alternative patterns or libraries that preserve back-pressure (e.g., `observable-stream` or explicit buffering) might be necessary.
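One way to keep back-pressure intact is to skip the Observable layer for the sensitive part of the pipeline and pull from the stream with async iteration, which Node.js Readable streams support natively. The sketch below is an alternative pattern, not part of `stream-to-observable`; `processSlowly` and `collectWithBackPressure` are hypothetical names chosen for illustration.

```javascript
const { Readable } = require('stream');

// Hypothetical slow consumer: simulates asynchronous work on each chunk.
function processSlowly(chunk) {
  return new Promise(resolve =>
    setTimeout(() => resolve(String(chunk).toUpperCase()), 5)
  );
}

// Pulls chunks one at a time. The loop does not ask for the next chunk
// until the current one has been processed, so the stream buffers at most
// up to its highWaterMark instead of flooding the consumer.
async function collectWithBackPressure(readable) {
  const results = [];
  for await (const chunk of readable) {
    results.push(await processSlowly(chunk));
  }
  return results;
}

// Usage: Readable.from() builds an object-mode stream from an iterable.
collectWithBackPressure(Readable.from(['hello', 'world']))
  .then(results => console.log(results));
```

The trade-off is that this gives up Observable operators such as `filter` and `map`; the equivalent logic has to live inside the loop body.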
npm install stream-to-observable
yarn add stream-to-observable
pnpm add stream-to-observable

This quickstart reads a file with `fs.createReadStream`, splits it into lines with the `split` package, converts the resulting stream into an Observable, keeps the lines containing 'hello' (case-insensitive), maps them to uppercase, and logs the results. It also shows the required registration of an Observable implementation.

const fs = require('fs');
const split = require('split'); // Splits the stream into lines; install with: npm install --save split

// IMPORTANT: You must install an Observable implementation like zen-observable or rxjs.
// e.g., npm install --save zen-observable
require('any-observable/register')('zen'); // Register 'zen-observable' implementation

const streamToObservable = require('stream-to-observable');

// Create a dummy file for demonstration
fs.writeFileSync('./hello-world.txt', 'Hello, World!\nThis is a test.\nAnother line with hello.\nGoodbye.');

const readStream = fs
  .createReadStream('./hello-world.txt', {encoding: 'utf8'})
  .pipe(split());

console.log('Starting stream processing...');

streamToObservable(readStream)
  .filter(chunk => /hello/i.test(chunk)) // Filter lines containing 'hello'
  .map(chunk => chunk.toUpperCase())    // Convert to uppercase
  .forEach(chunk => {
    console.log(`Observable emitted: ${chunk}`); // Log filtered and mapped chunks
  })
  .catch(err => {
    console.error('Observable error:', err);
  })
  .finally(() => {
    console.log('Observable stream completed.');
    fs.unlinkSync('./hello-world.txt'); // Clean up dummy file
  });
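
The conversion used in the quickstart can be pictured with a dependency-free sketch of what a stream-to-Observable bridge has to do: forward the stream's 'data', 'error', and 'end' events to an observer. This is an illustration under assumptions, not the package's actual source. `subscribeToStream` is a hypothetical helper, and the observer shape (`next`, `error`, `complete`) follows the ECMAScript Observable proposal.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: wires stream events to an observer object.
// Returns a function that detaches the listeners (an "unsubscribe").
function subscribeToStream(stream, observer) {
  const onData = chunk => observer.next(chunk);
  const onError = err => { cleanup(); observer.error(err); };
  const onEnd = () => { cleanup(); observer.complete(); };

  function cleanup() {
    stream.removeListener('data', onData);
    stream.removeListener('error', onError);
    stream.removeListener('end', onEnd);
  }

  // Attaching a 'data' listener switches the stream into flowing mode,
  // so chunks are pushed regardless of how fast the observer handles them.
  stream.on('data', onData);
  stream.once('error', onError);
  stream.once('end', onEnd);

  return cleanup;
}

// Usage: log each chunk, then report completion.
subscribeToStream(Readable.from(['hello', 'goodbye']), {
  next: chunk => console.log('chunk:', chunk),
  error: err => console.error('error:', err),
  complete: () => console.log('done')
});
```

Because the bridge is built on the 'data' event, nothing in it can signal the stream to slow down, which is the root of the back-pressure limitation of this kind of conversion.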