Node.js Stream to ECMAScript Observable Converter
stream-to-observable is a utility package that converts Node.js Readable Streams into ECMAScript-compatible Observables. The current release, 0.2.0, was published seven years ago and has not been updated since, which indicates the package is no longer actively maintained. It relies on `any-observable` to select an Observable implementation at runtime, so users must explicitly install a compatible library such as `zen-observable` or `rxjs`. The package bridges traditional Node.js stream processing and the more declarative, reactive style of Observables, but its age means it may be incompatible with modern Node.js stream APIs or newer versions of Observable libraries. Because of this inactivity, a community-maintained fork, `@samverschueren/stream-to-observable`, exists and may be a more suitable choice for current projects.
Common errors
error Error: Cannot find any-observable implementation nor global.Observable. You must install polyfill or call require("any-observable/register") with your preferred implementation...
Install an Observable implementation (e.g., npm install --save zen-observable) and register it early in your application's lifecycle: require('any-observable/register')('zen');
error TypeError: streamToObservable(...) is not a function or streamToObservable(...).filter is not a function
Ensure that streamToObservable is correctly require()d and that an Observable implementation like zen-observable or rxjs is installed and registered via any-observable/register before stream-to-observable is used.
Warnings
breaking This package (v0.2.0) is effectively abandoned, with no updates in seven years. It may have compatibility issues with modern Node.js versions (e.g., Node.js 16+) and their stream APIs, or with recent major versions of Observable libraries like RxJS.
gotcha You must explicitly install an Observable implementation (e.g., `zen-observable` or `rxjs`) and register it with `any-observable` for `stream-to-observable` to function. Failure to do so will result in a runtime error.
gotcha Converting a Node.js stream to an Observable using this package inherently breaks Node.js's back-pressure mechanism. Observables are a 'push' technology, meaning all stream chunks will be read and pushed to the observer as quickly as possible, potentially overwhelming downstream consumers or memory.
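The back-pressure gotcha can be illustrated with Node core alone. The following is a minimal sketch, not the package's own code: `makeSlowConsumer`, `consumePush`, `consumePull`, and `demo` are hypothetical helpers written for this illustration. `consumePush` attaches a `'data'` listener, which is roughly how a push-style bridge consumes a stream, while `consumePull` uses `for await`, which only reads the next chunk once the previous one has been handled.

```javascript
const { Readable } = require('stream');

// Hypothetical slow consumer: records how many chunks are in progress at once.
function makeSlowConsumer() {
  const stats = { inFlight: 0, maxInFlight: 0 };
  async function handle(chunk) {
    stats.inFlight += 1;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    await new Promise(resolve => setTimeout(resolve, 5)); // simulate slow work
    stats.inFlight -= 1;
  }
  return { stats, handle };
}

// Push style: a 'data' listener switches the stream into flowing mode, so
// chunks keep arriving whether or not earlier ones have been processed.
function consumePush(stream, handle) {
  return new Promise((resolve, reject) => {
    const pending = [];
    stream.on('data', chunk => pending.push(handle(chunk)));
    stream.once('error', reject);
    stream.once('end', () => Promise.all(pending).then(resolve, reject));
  });
}

// Pull style: async iteration requests the next chunk only after the loop
// body has finished, so the consumer sets the pace.
async function consumePull(stream, handle) {
  for await (const chunk of stream) {
    await handle(chunk);
  }
}

// Runs both styles over the same input and reports peak concurrency.
async function demo() {
  const lines = ['a', 'b', 'c', 'd', 'e'];
  const push = makeSlowConsumer();
  await consumePush(Readable.from(lines), push.handle);
  const pull = makeSlowConsumer();
  await consumePull(Readable.from(lines), pull.handle);
  return { push: push.stats.maxInFlight, pull: pull.stats.maxInFlight };
}

demo().then(result => console.log(result));
```

In this sketch the pull-style run handles one chunk at a time, while the push-style run should report several chunks in progress at once. If a slow consumer is a concern, consuming the stream directly with `for await` or `pipeline` may be a safer choice than converting it to an Observable.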
Install
npm install stream-to-observable
yarn add stream-to-observable
pnpm add stream-to-observable
Imports
- streamToObservable
wrong: import streamToObservable from 'stream-to-observable';
correct: const streamToObservable = require('stream-to-observable');
Quickstart
const fs = require('fs');
const split = require('split'); // separate npm package: npm install --save split

// IMPORTANT: You must install an Observable implementation like zen-observable or rxjs.
// e.g., npm install --save zen-observable
require('any-observable/register')('zen'); // Register 'zen-observable' implementation
const streamToObservable = require('stream-to-observable');

// Create a dummy file for demonstration
fs.writeFileSync('./hello-world.txt', 'Hello, World!\nThis is a test.\nAnother line with hello.\nGoodbye.');

// Read the file as UTF-8 text and split it into lines
const readStream = fs
  .createReadStream('./hello-world.txt', {encoding: 'utf8'})
  .pipe(split());

console.log('Starting stream processing...');

streamToObservable(readStream)
  .filter(chunk => /hello/i.test(chunk)) // Filter lines containing 'hello'
  .map(chunk => chunk.toUpperCase()) // Convert to uppercase
  .forEach(chunk => {
    console.log(`Observable emitted: ${chunk}`); // Log filtered and mapped chunks
  })
  .catch(err => {
    console.error('Observable error:', err);
  })
  .finally(() => {
    console.log('Observable stream completed.');
    fs.unlinkSync('./hello-world.txt'); // Clean up dummy file
  });
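
To see what the conversion amounts to without installing any Observable library, here is a dependency-free sketch built on Node core streams. It is an illustration of the general idea only, not the package's source: `toMinimalObservable` and `collectMatches` are hypothetical names, and the returned object implements just a `subscribe()` method rather than the full Observable contract.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: wraps a stream's events in a minimal subscribe()
// function shaped like an Observable. Assumes the default 'data', 'error',
// and 'end' event names.
function toMinimalObservable(stream) {
  return {
    subscribe(observer) {
      const onData = chunk => observer.next(chunk);
      const onError = err => { cleanup(); observer.error(err); };
      const onEnd = () => { cleanup(); observer.complete(); };

      // Detach every listener so the stream no longer notifies the observer.
      function cleanup() {
        stream.removeListener('data', onData);
        stream.removeListener('error', onError);
        stream.removeListener('end', onEnd);
      }

      stream.on('data', onData);
      stream.once('error', onError);
      stream.once('end', onEnd);
      return { unsubscribe: cleanup };
    },
  };
}

// Mirrors the quickstart: keep lines containing "hello" and upper-case them.
function collectMatches(lines) {
  return new Promise((resolve, reject) => {
    const matches = [];
    toMinimalObservable(Readable.from(lines)).subscribe({
      next: line => {
        if (/hello/i.test(line)) matches.push(line.toUpperCase());
      },
      error: reject,
      complete: () => resolve(matches),
    });
  });
}

collectMatches(['Hello, World!', 'This is a test.', 'Another line with hello.', 'Goodbye.'])
  .then(matches => console.log(matches));
// prints [ 'HELLO, WORLD!', 'ANOTHER LINE WITH HELLO.' ]
```

A real Observable library adds operators such as `filter` and `map` plus stricter subscription semantics, which is why the package delegates to `zen-observable` or `rxjs` instead of shipping its own implementation.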