RSocket Flowable for JavaScript

0.0.29-alpha.0 · deprecated · verified Sun Apr 19

rsocket-flowable is a JavaScript implementation of the Reactive Streams specification and a core package of the `rsocket-js` monorepo. It defines the fundamental interfaces and types for reactive programming, such as `Flowable`, `Single`, `ISubscriber`, and `ISubscription`, which are used to build non-blocking, asynchronous data pipelines with backpressure. The latest version is `0.0.29-alpha.0`, an early alpha that is marked deprecated. Releases followed the pre-stable cadence of the `rsocket-js` project, with frequent alpha updates. The package's main role is to carry the RSocket protocol's Reactive Streams semantics over various transports, giving the consumer explicit control over data flow and resource use through backpressure.
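
To make the difference between a many-valued stream and a single-valued one concrete, here is a minimal self-contained sketch of the two contracts. All names in it (`DemandSubscription`, `ManySubscriber`, `OneSubscriber`, `range`, `just`) are hypothetical and defined locally for illustration. They are not the package's exports, and the shapes are an assumption modeled on the Reactive Streams signals named above.

```typescript
// Local, hypothetical stand-ins; nothing here is imported from rsocket-flowable.
interface DemandSubscription {
  request(n: number): void; // consumer can accept n more values
  cancel(): void;           // consumer wants no more values
}

// Many-valued contract: values flow only while there is outstanding demand.
interface ManySubscriber<T> {
  onSubscribe(subscription: DemandSubscription): void;
  onNext(value: T): void;
  onError(error: Error): void;
  onComplete(): void;
}

// Single-valued contract: exactly one value or one error, so no request(n).
interface OneSubscriber<T> {
  onSubscribe(cancel: () => void): void;
  onComplete(value: T): void;
  onError(error: Error): void;
}

// Emits start..end-1, but never more than the consumer has requested.
function range(start: number, end: number, subscriber: ManySubscriber<number>): void {
  let next = start;
  let done = false;
  subscriber.onSubscribe({
    request(n) {
      for (let i = 0; i < n && !done && next < end; i++) {
        subscriber.onNext(next++);
      }
      if (!done && next >= end) {
        done = true;
        subscriber.onComplete();
      }
    },
    cancel() {
      done = true;
    },
  });
}

// Delivers one value unless the consumer cancels during onSubscribe.
function just<T>(value: T, subscriber: OneSubscriber<T>): void {
  let cancelled = false;
  subscriber.onSubscribe(() => {
    cancelled = true;
  });
  if (!cancelled) subscriber.onComplete(value);
}

const log: string[] = [];
range(0, 3, {
  onSubscribe: (s) => s.request(2), // only two of the three values are requested
  onNext: (v) => log.push(`next ${v}`),
  onError: (e) => log.push(`error ${e.message}`),
  onComplete: () => log.push('complete'),
});
just('hello', {
  onSubscribe: () => {},
  onComplete: (v) => log.push(`single ${v}`),
  onError: (e) => log.push(`error ${e.message}`),
});
console.log(log); // holds "next 0", "next 1", "single hello"
```

The stream never logs `complete` because the third value was never requested; that is backpressure in its simplest form. The single-valued contract has no `request` at all, only a cancel callback.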

Common errors

Warnings

Install

Imports

Quickstart

This TypeScript quickstart creates a `Flowable` producer and subscribes to it. It shows the Reactive Streams signals `onSubscribe`, `onNext`, `onError`, and `onComplete`, backpressure via `request()`, and explicit cancellation via `cancel()`. In this particular run the consumer cancels after the fifth item, so `onComplete` and `onError` are never invoked.

import { Flowable, ISubscriber, ISubscription } from 'rsocket-flowable';

// Create a simple Flowable that emits numbers on demand
const numberFlowable = new Flowable<number>(subscriber => {
  let count = 0;
  let cancelled = false;

  subscriber.onSubscribe({
    request(n: number) {
      console.log(`Producer: Subscriber requested ${n} items.`);
      if (cancelled) return;
      // Re-check `cancelled` on every iteration: the consumer may cancel from inside onNext
      for (let i = 0; i < n && !cancelled; i++) {
        if (count < 5) { // Emit up to 5 items for this example
          console.log(`Producer: Emitting ${count}`);
          subscriber.onNext(count++);
        } else {
          subscriber.onComplete();
          cancelled = true; // Also blocks later requests, so onComplete fires only once
          break;
        }
      }
    },
    cancel() {
      console.log('Producer: Subscription cancelled by consumer.');
      cancelled = true;
    }
  });
});

// Subscribe to the Flowable
console.log('Consumer: Subscribing to Flowable...');
numberFlowable.subscribe(new class implements ISubscriber<number> {
  private _subscription: ISubscription | undefined;
  onSubscribe(subscription: ISubscription): void {
    this._subscription = subscription;
    console.log('Consumer: Subscription established. Requesting initial 2 items.');
    this._subscription.request(2); // Request initial items
  }
  onNext(value: number): void {
    console.log(`Consumer: Received: ${value}`);
    if (value === 1) {
      console.log('Consumer: Received 1. Requesting 3 more items.');
      this._subscription?.request(3); // Request more items dynamically
    } else if (value === 4) {
      console.log('Consumer: Received 4. Cancelling subscription.');
      this._subscription?.cancel(); // Cancel the subscription
    }
  }
  onError(error: Error): void {
    console.error('Consumer: Error:', error);
  }
  onComplete(): void {
    console.log('Consumer: Flowable completed.');
  }
});

// Expected output demonstrates backpressure and cancellation:
// Consumer: Subscribing to Flowable...
// Consumer: Subscription established. Requesting initial 2 items.
// Producer: Subscriber requested 2 items.
// Producer: Emitting 0
// Consumer: Received: 0
// Producer: Emitting 1
// Consumer: Received: 1
// Consumer: Received 1. Requesting 3 more items.
// Producer: Subscriber requested 3 items.
// Producer: Emitting 2
// Consumer: Received: 2
// Producer: Emitting 3
// Consumer: Received: 3
// Producer: Emitting 4
// Consumer: Received: 4
// Consumer: Received 4. Cancelling subscription.
// Producer: Subscription cancelled by consumer.
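
The quickstart producer calls `onNext` directly inside `request`, so a `request` issued from within `onNext` runs as a nested call. One common way to keep emission flat, and to stop as soon as the consumer cancels, is to accumulate demand in a counter and drain it from a single loop. The sketch below shows that pattern with locally defined types. `Subscription`, `Subscriber`, and `counter` are hypothetical names, and this is not a description of how rsocket-flowable works internally.

```typescript
// Hypothetical local types for illustration; not imported from rsocket-flowable.
interface Subscription {
  request(n: number): void;
  cancel(): void;
}
interface Subscriber<T> {
  onSubscribe(s: Subscription): void;
  onNext(value: T): void;
  onComplete(): void;
}

// Emits 0..limit-1. Demand accumulates in `pending` and is drained by one loop,
// so a request() made from inside onNext only bumps the counter instead of recursing.
function counter(limit: number, subscriber: Subscriber<number>): void {
  let next = 0;
  let pending = 0;
  let draining = false;
  let stopped = false; // set on cancel or after completion

  const drain = (): void => {
    if (draining) return; // an outer drain loop is already running
    draining = true;
    try {
      while (pending > 0 && !stopped) {
        if (next >= limit) {
          stopped = true;
          subscriber.onComplete();
          break;
        }
        pending--;
        subscriber.onNext(next++);
      }
    } finally {
      draining = false;
    }
  };

  subscriber.onSubscribe({
    request(n) {
      if (stopped || n <= 0) return;
      pending += n;
      drain();
    },
    cancel() {
      stopped = true;
    },
  });
}

const received: number[] = [];
let subscription: Subscription | undefined;
counter(10, {
  onSubscribe(s) {
    subscription = s;
    s.request(2);
  },
  onNext(value) {
    received.push(value);
    if (value === 1) subscription?.request(5); // re-entrant request: queued, not nested
    if (value === 3) subscription?.cancel();   // cancel mid-batch: emission stops here
  },
  onComplete() {
    received.push(-1); // marker; not reached in this run
  },
});
console.log(received); // [0, 1, 2, 3]
```

Seven values were requested in total, but only four are delivered: the loop re-checks the `stopped` flag before every emission, so the remaining demand is discarded once `cancel()` is called.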
