{"id":11963,"library":"rsocket-flowable","title":"RSocket Flowable for JavaScript","description":"rsocket-flowable provides a JavaScript implementation of the ReactiveStreams specification, forming a core component of the `rsocket-js` monorepo. It defines fundamental interfaces and types for reactive programming, such as `Flowable`, `Single`, `ISubscriber`, and `ISubscription`, crucial for building non-blocking, asynchronous data pipelines with backpressure. The package is currently in an early alpha state, with the latest version being `0.0.29-alpha.0`. The `rsocket-js` project, and consequently `rsocket-flowable`, has an active but pre-stable release cadence, with frequent alpha updates. Its primary differentiator is its role in enabling the RSocket protocol's Reactive Streams semantics over various transports, providing fine-grained control over data flow and resource management through explicit backpressure.","status":"deprecated","version":"0.0.29-alpha.0","language":"javascript","source_language":"en","source_url":"https://github.com/rsocket/rsocket-js","tags":["javascript"],"install":[{"cmd":"npm install rsocket-flowable","lang":"bash","label":"npm"},{"cmd":"yarn add rsocket-flowable","lang":"bash","label":"yarn"},{"cmd":"pnpm add rsocket-flowable","lang":"bash","label":"pnpm"}],"dependencies":[],"imports":[{"note":"While CommonJS `require` might work in some older setups, `rsocket-flowable` is developed with modern JavaScript (ESM) and TypeScript in mind. ESM `import` is the recommended and best-supported approach.","wrong":"const { Flowable } = require('rsocket-flowable');","symbol":"Flowable","correct":"import { Flowable } from 'rsocket-flowable';"},{"note":"`Single` is used for operations that emit a single value or an error, analogous to a Promise within the Reactive Streams paradigm. 
Direct named import is preferred for clarity and tree-shaking.","wrong":"import * as RSocketFlowable from 'rsocket-flowable'; const Single = RSocketFlowable.Single;","symbol":"Single","correct":"import { Single } from 'rsocket-flowable';"},{"note":"When importing interfaces or types in TypeScript, using `import type` is a best practice. It ensures the import is removed during compilation to JavaScript, avoiding potential runtime issues or unnecessary bundling overhead.","wrong":"import { ISubscriber } from 'rsocket-flowable';","symbol":"ISubscriber","correct":"import type { ISubscriber } from 'rsocket-flowable';"},{"note":"`ISubscription` is the interface received by a subscriber, allowing it to request more data or cancel the subscription, crucial for implementing backpressure. Use `import type` for clarity.","wrong":"import { ISubscription } from 'rsocket-flowable';","symbol":"ISubscription","correct":"import type { ISubscription } from 'rsocket-flowable';"}],"quickstart":{"code":"import { Flowable } from 'rsocket-flowable';\nimport type { ISubscriber, ISubscription } from 'rsocket-flowable';\n\n// Create a simple Flowable that emits numbers on demand\nconst numberFlowable = new Flowable<number>(subscriber => {\n  let count = 0;\n  let cancelled = false;\n\n  subscriber.onSubscribe({\n    request(n: number) {\n      console.log(`Producer: Subscriber requested ${n} items.`);\n      if (cancelled) return;\n      // Stop emitting as soon as the consumer cancels mid-request\n      for (let i = 0; i < n && !cancelled; i++) {\n        if (count < 5) { // Emit up to 5 items for this example\n          console.log(`Producer: Emitting ${count}`);\n          subscriber.onNext(count++);\n        } else {\n          if (!cancelled) {\n            subscriber.onComplete();\n            cancelled = true; // Ensure onComplete is called only once\n          }\n          break;\n        }\n      }\n    },\n    cancel() {\n      console.log('Producer: Subscription cancelled by consumer.');\n      cancelled = true;\n    }\n  });\n});\n\n// Subscribe to the Flowable\nconsole.log('Consumer: Subscribing 
to Flowable...');\nnumberFlowable.subscribe(new class implements ISubscriber<number> {\n  private _subscription: ISubscription | undefined;\n  onSubscribe(subscription: ISubscription): void {\n    this._subscription = subscription;\n    console.log('Consumer: Subscription established. Requesting initial 2 items.');\n    this._subscription.request(2); // Request initial items\n  }\n  onNext(value: number): void {\n    console.log(`Consumer: Received: ${value}`);\n    if (value === 1) {\n      console.log('Consumer: Received 1. Requesting 3 more items.');\n      this._subscription?.request(3); // Request more items dynamically\n    } else if (value === 4) {\n      console.log('Consumer: Received 4. Cancelling subscription.');\n      this._subscription?.cancel(); // Cancel the subscription\n    }\n  }\n  onError(error: Error): void {\n    console.error('Consumer: Error:', error);\n  }\n  onComplete(): void {\n    console.log('Consumer: Flowable completed.');\n  }\n});\n\n// Expected output demonstrates backpressure and cancellation:\n// Consumer: Subscribing to Flowable...\n// Consumer: Subscription established. Requesting initial 2 items.\n// Producer: Subscriber requested 2 items.\n// Producer: Emitting 0\n// Consumer: Received: 0\n// Producer: Emitting 1\n// Consumer: Received: 1\n// Consumer: Received 1. Requesting 3 more items.\n// Producer: Subscriber requested 3 items.\n// Producer: Emitting 2\n// Consumer: Received: 2\n// Producer: Emitting 3\n// Consumer: Received: 3\n// Producer: Emitting 4\n// Consumer: Received: 4\n// Consumer: Received 4. Cancelling subscription.\n// Producer: Subscription cancelled by consumer.","lang":"typescript","description":"This TypeScript quickstart demonstrates how to create a `Flowable` producer and subscribe to it, illustrating core ReactiveStreams concepts of `onSubscribe`, `onNext`, `onError`, `onComplete`, backpressure management via `request()`, and explicit cancellation."},"warnings":[{"fix":"Users are strongly advised to migrate away from `rsocket-flowable`. 
The RSocket-JS ecosystem has evolved, and direct usage of `Flowable` from this standalone package is no longer the recommended pattern for RSocket communication. Instead, utilize the reactive types integrated within `@rsocket/core` or through adapters like `rsocket-adapter-rxjs` if you require RxJS interoperability. Review the latest `rsocket-js` documentation and examples for current best practices.","message":"The `rsocket-flowable` package has been removed from the main `rsocket-js` monorepo as of `rsocket-js@1.0.0-alpha.x` versions and is no longer used internally by other RSocket packages. It will be officially marked as deprecated on NPM.","severity":"breaking","affected_versions":">=1.0.0-alpha.x"},{"fix":"Ensure that your `ISubscriber.onSubscribe` implementation calls `subscription.request(n)` to signal initial demand. Subsequently, call `subscription.request(n)` in `onNext` (or another appropriate lifecycle method) once earlier items have been processed and more are actually needed, respecting your consumer's capacity.","message":"Improper implementation of ReactiveStreams backpressure causes problems in both directions. If the subscriber never calls `request()`, no demand is signalled and the stream stalls without emitting anything. If it requests far more items than it can process, items can pile up and memory can grow without bound in downstream consumers, or the producer can flood the network.","severity":"gotcha","affected_versions":">=0.0.1-alpha.0"},{"fix":"Always use ES Module `import` syntax (e.g., `import { Flowable } from 'rsocket-flowable';`). Ensure your project's `tsconfig.json` (for TypeScript) and build tools (like Webpack or Rollup) are configured to support ES Modules. For Node.js, ensure your `package.json` specifies `\"type\": \"module\"` if you are using `.js` files, or use the `.mjs` extension.","message":"This package, and the `rsocket-js` monorepo it originated from, is developed with modern JavaScript (ESM) and TypeScript in mind. 
Using CommonJS `require()` statements for imports can lead to `ReferenceError: require is not defined` when the file is loaded as an ES module, incorrect module resolution, or incompatibility issues in some build environments.","severity":"gotcha","affected_versions":">=0.0.1-alpha.0"}],"env_vars":null,"last_verified":"2026-04-19T00:00:00.000Z","next_check":"2026-07-18T00:00:00.000Z","problems":[{"fix":"Verify that the variable `myFlowable` is indeed an instance of `Flowable` from `rsocket-flowable`. Check your import statement: `import { Flowable } from 'rsocket-flowable';`.","cause":"Attempting to use an object that is not an instance of `Flowable` (or a similar reactive type) as if it were, or the `Flowable` class itself was not correctly imported.","error":"TypeError: myFlowable.subscribe is not a function"},{"fix":"In your `ISubscriber` implementation, ensure the `onSubscribe` method calls `subscription.request(n)` to initiate demand. Continue to call `subscription.request(n)` as more items are consumed to maintain appropriate backpressure.","cause":"The Reactive Streams consumer (subscriber) failed to signal demand for items, or didn't signal enough demand, causing the producer to emit items faster than the consumer was prepared to handle.","error":"Error: Uncaught [MissingBackpressureException]"},{"fix":"Add the correct ES Module import at the top of your file: `import { Flowable, ISubscriber, ISubscription } from 'rsocket-flowable';`.","cause":"The `Flowable` class or other symbols were not correctly imported or are not within the current scope of the file. This often happens when CommonJS `require` syntax is used incorrectly in an ESM-centric project, or when the import is simply forgotten.","error":"ReferenceError: Flowable is not defined"}],"ecosystem":"npm"}