RSocket Flowable for JavaScript
rsocket-flowable provides a JavaScript implementation of the ReactiveStreams specification, forming a core component of the `rsocket-js` monorepo. It defines fundamental interfaces and types for reactive programming, such as `Flowable`, `Single`, `ISubscriber`, and `ISubscription`, crucial for building non-blocking, asynchronous data pipelines with backpressure. The package is currently in an early alpha state, with the latest version being `0.0.29-alpha.0`. The `rsocket-js` project, and consequently `rsocket-flowable`, has an active but pre-stable release cadence, with frequent alpha updates. Its primary differentiator is its role in enabling the RSocket protocol's Reactive Streams semantics over various transports, providing fine-grained control over data flow and resource management through explicit backpressure.
Common errors
-
TypeError: myFlowable.subscribe is not a function
cause Attempting to use an object that is not an instance of `Flowable` (or a similar reactive type) as if it were, or the `Flowable` class itself was not correctly imported.fixVerify that the variable `myFlowable` is indeed an instance of `Flowable` from `rsocket-flowable`. Check your import statement: `import { Flowable } from 'rsocket-flowable';`. -
Error: Uncaught [MissingBackpressureException]
cause The Reactive Streams consumer (subscriber) failed to signal demand for items, or didn't signal enough demand, causing the producer to emit items faster than the consumer was prepared to handle.fixIn your `ISubscriber` implementation, ensure the `onSubscribe` method calls `subscription.request(n)` to initiate demand. Continue to call `subscription.request(n)` as more items are consumed to maintain appropriate backpressure. -
ReferenceError: Flowable is not defined
cause The `Flowable` class or other symbols were not correctly imported or are not within the current scope of the file. This often happens with incorrect CommonJS `require` syntax in an ESM-centric project or simply forgetting the import.fixAdd the correct ES Module import at the top of your file: `import { Flowable, ISubscriber, ISubscription } from 'rsocket-flowable';`.
Warnings
- breaking The `rsocket-flowable` package has been removed from the main `rsocket-js` monorepo as of `rsocket-js@1.0.0-alpha.x` versions and is no longer used internally by other RSocket packages. It will be officially marked as deprecated on NPM.
- gotcha Improper implementation of ReactiveStreams backpressure (e.g., not calling `request()` or requesting too many items at once) can lead to `MissingBackpressureException` errors, `OutOfMemoryError` in downstream consumers, or an overwhelmed producer that floods the network.
- gotcha This package, and the `rsocket-js` monorepo it originated from, is developed with modern JavaScript (ESM) and TypeScript in mind. Using CommonJS `require()` statements for imports can lead to `TypeError: require is not a function`, incorrect module resolution, or incompatibility issues in some build environments.
Install
-
npm install rsocket-flowable -
yarn add rsocket-flowable -
pnpm add rsocket-flowable
Imports
- Flowable
const { Flowable } = require('rsocket-flowable');import { Flowable } from 'rsocket-flowable'; - Single
import * as RSocketFlowable from 'rsocket-flowable'; const Single = RSocketFlowable.Single;
import { Single } from 'rsocket-flowable'; - ISubscriber
import { ISubscriber } => from 'rsocket-flowable';import type { ISubscriber } from 'rsocket-flowable'; - ISubscription
import { ISubscription } from 'rsocket-flowable';import type { ISubscription } from 'rsocket-flowable';
Quickstart
import { Flowable, ISubscriber, ISubscription } from 'rsocket-flowable';
// Create a simple Flowable that emits numbers on demand
const numberFlowable = new Flowable<number>(subscriber => {
let count = 0;
let cancelled = false;
subscriber.onSubscribe({
request(n: number) {
console.log(`Producer: Subscriber requested ${n} items.`);
if (cancelled) return;
for (let i = 0; i < n; i++) {
if (count < 5) { // Emit up to 5 items for this example
console.log(`Producer: Emitting ${count}`);
subscriber.onNext(count++);
} else {
if (!cancelled) {
subscriber.onComplete();
cancelled = true; // Ensure onComplete is called only once
}
break;
}
}
},
cancel() {
console.log('Producer: Subscription cancelled by consumer.');
cancelled = true;
}
});
});
// Subscribe to the Flowable
console.log('Consumer: Subscribing to Flowable...');
numberFlowable.subscribe(new class implements ISubscriber<number> {
private _subscription: ISubscription | undefined;
onSubscribe(subscription: ISubscription): void {
this._subscription = subscription;
console.log('Consumer: Subscription established. Requesting initial 2 items.');
this._subscription.request(2); // Request initial items
}
onNext(value: number): void {
console.log(`Consumer: Received: ${value}`);
if (value === 1) {
console.log('Consumer: Received 1. Requesting 3 more items.');
this._subscription?.request(3); // Request more items dynamically
} else if (value === 4) {
console.log('Consumer: Received 4. Cancelling subscription.');
this._subscription?.cancel(); // Cancel the subscription
}
}
onError(error: Error): void {
console.error('Consumer: Error:', error);
}
onComplete(): void {
console.log('Consumer: Flowable completed.');
}
});
// Expected output demonstrates backpressure and cancellation:
// Consumer: Subscribing to Flowable...
// Producer: Subscriber requested 2 items.
// Producer: Emitting 0
// Consumer: Received: 0
// Producer: Emitting 1
// Consumer: Received: 1
// Consumer: Received 1. Requesting 3 more items.
// Producer: Subscriber requested 3 items.
// Producer: Emitting 2
// Consumer: Received: 2
// Producer: Emitting 3
// Consumer: Received: 3
// Producer: Emitting 4
// Consumer: Received: 4
// Consumer: Received 4. Cancelling subscription.
// Producer: Subscription cancelled by consumer.