{"library":"rsocket-flowable","title":"RSocket Flowable for JavaScript","description":"rsocket-flowable provides a JavaScript implementation of the ReactiveStreams specification, forming a core component of the `rsocket-js` monorepo. It defines fundamental interfaces and types for reactive programming, such as `Flowable`, `Single`, `ISubscriber`, and `ISubscription`, crucial for building non-blocking, asynchronous data pipelines with backpressure. The package is currently in an early alpha state, with the latest version being `0.0.29-alpha.0`. The `rsocket-js` project, and consequently `rsocket-flowable`, has an active but pre-stable release cadence, with frequent alpha updates. Its primary differentiator is its role in enabling the RSocket protocol's Reactive Streams semantics over various transports, providing fine-grained control over data flow and resource management through explicit backpressure.","language":"javascript","status":"deprecated","last_verified":"Sun Apr 19","install":{"commands":["npm install rsocket-flowable"],"cli":null},"imports":["import { Flowable } from 'rsocket-flowable';","import { Single } from 'rsocket-flowable';","import type { ISubscriber } from 'rsocket-flowable';","import type { ISubscription } from 'rsocket-flowable';"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"import { Flowable, ISubscriber, ISubscription } from 'rsocket-flowable';\n\n// Create a simple Flowable that emits numbers on demand\nconst numberFlowable = new Flowable<number>(subscriber => {\n  let count = 0;\n  let cancelled = false;\n\n  subscriber.onSubscribe({\n    request(n: number) {\n      console.log(`Producer: Subscriber requested ${n} items.`);\n      if (cancelled) return;\n      for (let i = 0; i < n; i++) {\n        if (count < 5) { // Emit up to 5 items for this example\n          console.log(`Producer: Emitting ${count}`);\n          subscriber.onNext(count++);\n        } else {\n          if (!cancelled) {\n            subscriber.onComplete();\n            cancelled = true; // Ensure onComplete is called only once\n          }\n          break;\n        }\n      }\n    },\n    cancel() {\n      console.log('Producer: Subscription cancelled by consumer.');\n      cancelled = true;\n    }\n  });\n});\n\n// Subscribe to the Flowable\nconsole.log('Consumer: Subscribing to Flowable...');\nnumberFlowable.subscribe(new class implements ISubscriber<number> {\n  private _subscription: ISubscription | undefined;\n  onSubscribe(subscription: ISubscription): void {\n    this._subscription = subscription;\n    console.log('Consumer: Subscription established. Requesting initial 2 items.');\n    this._subscription.request(2); // Request initial items\n  }\n  onNext(value: number): void {\n    console.log(`Consumer: Received: ${value}`);\n    if (value === 1) {\n      console.log('Consumer: Received 1. Requesting 3 more items.');\n      this._subscription?.request(3); // Request more items dynamically\n    } else if (value === 4) {\n      console.log('Consumer: Received 4. Cancelling subscription.');\n      this._subscription?.cancel(); // Cancel the subscription\n    }\n  }\n  onError(error: Error): void {\n    console.error('Consumer: Error:', error);\n  }\n  onComplete(): void {\n    console.log('Consumer: Flowable completed.');\n  }\n});\n\n// Expected output demonstrates backpressure and cancellation:\n// Consumer: Subscribing to Flowable...\n// Producer: Subscriber requested 2 items.\n// Producer: Emitting 0\n// Consumer: Received: 0\n// Producer: Emitting 1\n// Consumer: Received: 1\n// Consumer: Received 1. Requesting 3 more items.\n// Producer: Subscriber requested 3 items.\n// Producer: Emitting 2\n// Consumer: Received: 2\n// Producer: Emitting 3\n// Consumer: Received: 3\n// Producer: Emitting 4\n// Consumer: Received: 4\n// Consumer: Received 4. Cancelling subscription.\n// Producer: Subscription cancelled by consumer.","lang":"typescript","description":"This TypeScript quickstart demonstrates how to create a `Flowable` producer and subscribe to it, illustrating core ReactiveStreams concepts of `onSubscribe`, `onNext`, `onError`, `onComplete`, backpressure management via `request()`, and explicit cancellation.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}