{"id":2257,"library":"reactivex","title":"ReactiveX for Python (RxPY)","description":"ReactiveX for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators. It lets developers represent asynchronous data streams with Observables, query them using operators, and parameterize concurrency using Schedulers. The current stable version is 4.1.0; version 5.0.0 is under active development, with alpha pre-releases available, and releases aim to keep the library aligned with current Python standards.","status":"active","version":"4.1.0","language":"en","source_language":"en","source_url":"https://github.com/ReactiveX/RxPY","tags":["reactive programming","async","event streams","rxjs","concurrency"],"install":[{"cmd":"pip install reactivex","lang":"bash","label":"Install stable version"},{"cmd":"pip install 'reactivex>=5.0.0a0'","lang":"bash","label":"Install pre-release v5.x (alpha)"}],"dependencies":[{"reason":"Required for execution. Version 4.x supports Python >=3.8, <4.0. 
Version 5.x alpha aims for Python 3.9+.","package":"Python","optional":false}],"imports":[{"note":"The main module was renamed from `rx` to `reactivex` in v4 to avoid naming conflicts and align with modern Python practices.","wrong":"import rx","symbol":"reactivex","correct":"import reactivex as rx"},{"note":"Operators are typically imported from the `reactivex.operators` submodule and aliased as `ops` for conciseness in chaining.","symbol":"operators","correct":"from reactivex import operators as ops"}],"quickstart":{"code":"import reactivex as rx\nfrom reactivex import operators as ops\n\ndef my_on_next(value):\n    print(f\"Received: {value}\")\n\ndef my_on_error(error):\n    print(f\"Error: {error}\")\n\ndef my_on_completed():\n    print(\"Done!\")\n\n# Create an observable from a sequence and apply operators\nsource = rx.of(\"Alpha\", \"Beta\", \"Gamma\", \"Delta\", \"Epsilon\")\n\ndisposable = source.pipe(\n    ops.map(lambda s: len(s)),\n    ops.filter(lambda i: i >= 5)\n).subscribe(\n    on_next=my_on_next,\n    on_error=my_on_error,\n    on_completed=my_on_completed\n)\n\n# For long-running observables or specific cleanup, dispose is important\ndisposable.dispose()","lang":"python","description":"This quickstart demonstrates creating an observable from a sequence of strings, transforming them using `map` (to get length) and `filter` (to keep lengths >= 5), and then subscribing to process the results. It highlights the functional `pipe` operator for chaining transformations and the use of `on_next`, `on_error`, and `on_completed` handlers."},"warnings":[{"fix":"Update imports from `rx` to `reactivex`. Rewrite operator chains to use the `pipe` method and import operators from `reactivex.operators`. Refer to the migration guide for specific operator changes.","message":"Major API changes occurred during the migration from RxPY v3 to v4. The main module name changed from `rx` to `reactivex`. 
Operator chains use the `pipe` method with operators from `reactivex.operators` (e.g., `Observable.pipe(ops.map(), ops.filter())`); the move away from method-based chaining (e.g., `Observable.map().filter()`) dates from the earlier v1 to v3 migration, so code still written in that style must be converted as well. In v4, the standalone `pipe` function was renamed to `compose`, and a new standalone `pipe` function was added that works like the `pipe` method.","severity":"breaking","affected_versions":"RxPY v3.x to v4.x"},{"fix":"Review code for direct use of Python's deprecated `threading.Thread.setDaemon` or explicit `loop` parameters in RxPY's `create` or scheduler functions. Adopt `async`/`await` syntax where applicable. For custom schedulers or advanced concurrency, consult the RxPY v4 documentation on schedulers.","message":"Python 3.10+ deprecations introduced breaking changes in RxPY v4 related to concurrency primitives. Specifically, calls to `Thread.setDaemon` (deprecated in Python 3.10) were replaced, and the `loop` parameter for methods like `create` and other concurrency-related functions was removed. Coroutine decorators were replaced with the `async` keyword. Code that interacts directly with event loops or threading might need adjustments.","severity":"breaking","affected_versions":"RxPY v3.x to v4.0.0. Python versions 3.10+."},{"fix":"For explicit `asyncio` event loop management, use `asyncio.get_running_loop()` within an `async` context, or `asyncio.new_event_loop()` and `asyncio.set_event_loop()` to create and install a new loop. Consider using RxPY's provided schedulers, which are designed to handle these Python version differences internally.","message":"In Python 3.10 and above, relying on `asyncio.get_event_loop()` to create an event loop implicitly is deprecated; use `asyncio.get_running_loop()` inside a coroutine, or `asyncio.new_event_loop()` combined with `asyncio.set_event_loop()`, instead. 
While RxPY v5 alpha has internally addressed this, user code that directly interacts with `asyncio` event loops for custom schedulers or integration might encounter `DeprecationWarning`s or `RuntimeError`s on newer Python versions.","severity":"gotcha","affected_versions":"Python 3.10+, especially when using custom `AsyncIOScheduler` or direct `asyncio` integration."},{"fix":"Choose a consistent style (fluent or functional) for clarity within your project, or leverage both where it enhances readability. Be aware that most v4 examples will be pipe-based, and v5 will offer both.","message":"While RxPY v5 introduces method chaining (fluent style) as an additive feature alongside the existing pipe-based functional style, some users might find it confusing to switch between the two. Both styles are fully type-safe and can be mixed.","severity":"gotcha","affected_versions":"RxPY v5.0.0a1+"}],"env_vars":null,"last_verified":"2026-04-09T00:00:00.000Z","next_check":"2026-07-08T00:00:00.000Z"}