Redux Observable Middleware
Redux Observable Middleware is a lightweight Redux middleware for using RxJS-style observables directly in Redux actions. It is at an early stage of development: the latest stable version is `0.2.0`, published 10 years ago, and there is no defined release cadence. It simplifies handling asynchronous data streams by automatically subscribing to any object with a `subscribe` method found in an action's `observable` property. As the observable emits values, errors, or completes, the middleware dispatches the corresponding lifecycle actions (`_ON_NEXT`, `_ON_ERROR`, `_ON_COMPLETED`, or custom types). This is a simpler, more direct approach than fuller solutions such as `redux-observable`, which uses "epics" to manage side effects.
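The subscribe-and-dispatch flow described above can be pictured with a minimal sketch. This is not the package's source code: `sketchObservableMiddleware`, `fakeObservable`, and the stub `dispatch` are hypothetical names, and the three-callback `subscribe(onNext, onError, onCompleted)` call is an assumption based on the older RxJS style the package targets. The payload keys `data` and `err` follow the quickstart reducer.

```javascript
// Hypothetical re-implementation for illustration; not the package's code.
function sketchObservableMiddleware({ dispatch }) {
  return (next) => (action) => {
    const { observable, type } = action;
    // Pass through anything that does not carry a subscribable object.
    if (!observable || typeof observable.subscribe !== 'function') {
      return next(action);
    }
    // Assumption: values travel in `data`, errors in `err`.
    return observable.subscribe(
      (data) => dispatch({ type: `${type}_ON_NEXT`, data }),
      (err) => dispatch({ type: `${type}_ON_ERROR`, err }),
      () => dispatch({ type: `${type}_ON_COMPLETED` })
    );
  };
}

// Hand-rolled stand-in for an observable: emits 1, 2, then completes.
const fakeObservable = {
  subscribe(onNext, onError, onCompleted) {
    onNext(1);
    onNext(2);
    onCompleted();
  },
};

// Minimal stand-in for a store: records every dispatched action type.
const dispatched = [];
const dispatch = (action) => {
  dispatched.push(action.type);
  return action;
};
const handle = sketchObservableMiddleware({ dispatch })(dispatch);

handle({ type: 'INTERVAL', observable: fakeObservable });
handle({ type: 'PLAIN' }); // no `observable`, so it passes straight through
console.log(dispatched);
// → [ 'INTERVAL_ON_NEXT', 'INTERVAL_ON_NEXT', 'INTERVAL_ON_COMPLETED', 'PLAIN' ]
```

Because the sketch only checks for a `subscribe` method, any object with that shape works, which matches the description above.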
Common errors
error TypeError: observable.subscribe is not a function
Ensure the `observable` property in your Redux action is an instance of an RxJS Observable or any object that implements a `subscribe` method. Double-check that `rxjs` is correctly imported and that Observable instances are created correctly.
error Actions with type `MY_ACTION_TYPE` are not handled, but `MY_ACTION_TYPE_ON_NEXT` is.
If using a string `type`, ensure your reducer handles both the original `ACTION_TYPE` and its suffixed variants (e.g., `ACTION_TYPE_ON_NEXT`). If using an object `type`, be explicit about which lifecycle actions you want dispatched by defining the `onSubscribe`, `onNext`, `onError`, and `onCompleted` properties in the `type` object of your action.
Warnings
gotcha The package `redux-observable-middleware` has not been updated in a very long time (v0.2.0 was last published 10 years ago) and uses an older style of RxJS (RxJS 4/5, implied by the `Rx.Observable.interval` syntax). Modern RxJS (v6+) uses pipeable operators (`.pipe(operator())`) and has different import paths and usage patterns. Compatibility with newer RxJS versions is not guaranteed and requires careful adaptation.
breaking In v0.2.0, `onSubscribe` actions were changed so that they are always dispatched first, regardless of the observable type. Although intended as a fix, this is a behavioral change that may affect logic relying on the earlier, less consistent dispatch order.
gotcha Error handling within the observable is crucial. If an observable errors and the error is not caught, the result can be an uncaught exception or a stream that terminates prematurely, so no further actions are processed for that observable. The middleware dispatches an `_ON_ERROR` action, but the observable itself needs internal `catchError` operators for resilience.
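The error-handling warning can be illustrated without any library. The sketch below uses a hypothetical helper, `withFallback`, that turns an error into one fallback value followed by completion, which is the role `catchError(() => of(fallback))` plays in RxJS 6+. Both `withFallback` and the `failing` source are made up for this example, and the three-callback `subscribe` form is an assumption.

```javascript
// Hypothetical helper: wraps a subscribable so that an error becomes a single
// fallback value followed by completion instead of reaching the subscriber.
function withFallback(source, fallback) {
  return {
    subscribe(onNext, onError, onCompleted) {
      return source.subscribe(
        onNext,
        () => {
          onNext(fallback);
          onCompleted();
        },
        onCompleted
      );
    },
  };
}

// Stand-in source that emits one value and then fails.
const failing = {
  subscribe(onNext, onError) {
    onNext('first');
    onError(new Error('boom'));
  },
};

const events = [];
withFallback(failing, 'fallback').subscribe(
  (value) => events.push(`next:${value}`),
  (err) => events.push(`error:${err.message}`),
  () => events.push('completed')
);
console.log(events);
// → [ 'next:first', 'next:fallback', 'completed' ]
```

With a wrapper like this, a subscriber such as the middleware would see the fallback value and a completion rather than an error.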
Install
npm install redux-observable-middleware
yarn add redux-observable-middleware
pnpm add redux-observable-middleware
Imports
- observableMiddleware
  wrong: const observableMiddleware = require('redux-observable-middleware');
  correct: import observableMiddleware from 'redux-observable-middleware';
- Redux.createStore
  wrong: import { createStore } from 'redux';
  correct: import * as Redux from 'redux';
- Rx.Observable
  wrong: import { Observable } from 'rxjs';
  correct: import * as Rx from 'rxjs';
Quickstart
import * as Redux from 'redux';
import * as Rx from 'rxjs';
import observableMiddleware from 'redux-observable-middleware';

const ACTION_TYPE = 'INTERVAL';

function reducer(state = null, action) {
  // `??` rather than `||` so that falsy payloads such as 0 are still logged.
  console.log('Action received:', action.type, action.data ?? action.err);
  switch (action.type) {
    case `${ACTION_TYPE}_ON_NEXT`:
      return action.data;
    case `${ACTION_TYPE}_ON_ERROR`:
      console.error('Observable error:', action.err);
      return state; // Or handle error appropriately
    case `${ACTION_TYPE}_ON_COMPLETED`:
      console.log('Observable completed.');
      return state;
    default:
      return state;
  }
}

const store = Redux.createStore(reducer, Redux.applyMiddleware(observableMiddleware));

store.subscribe(() => {
  console.log('Current state:', store.getState());
});

console.log('Dispatching observable action...');
store.dispatch({
  type: ACTION_TYPE,
  // `Rx.take` relies on operators being exported from 'rxjs' (RxJS 7.2+).
  observable: Rx.interval(1000).pipe(Rx.take(5))
});
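The quickstart uses a string `type`. For the object form of `type`, the mapping from lifecycle event to action type can be pictured with the sketch below. `resolveLifecycleTypes` is a hypothetical helper written for this illustration and is not part of the package. It assumes the string form yields only the three suffixed types, and it makes no claim about whether the middleware also dispatches a subscribe action in that case.

```javascript
// Hypothetical helper, written for this example only.
function resolveLifecycleTypes(type) {
  if (typeof type === 'string') {
    // Assumption: the string form produces the three suffixed types.
    return {
      onNext: `${type}_ON_NEXT`,
      onError: `${type}_ON_ERROR`,
      onCompleted: `${type}_ON_COMPLETED`,
    };
  }
  // Object form: keep only the lifecycle keys that were explicitly defined.
  const resolved = {};
  for (const key of ['onSubscribe', 'onNext', 'onError', 'onCompleted']) {
    if (typeof type[key] === 'string') resolved[key] = type[key];
  }
  return resolved;
}

const fromString = resolveLifecycleTypes('INTERVAL');
const fromObject = resolveLifecycleTypes({
  onSubscribe: 'INTERVAL_STARTED',
  onNext: 'INTERVAL_TICK',
});

console.log(fromString.onNext); // → INTERVAL_ON_NEXT
console.log(Object.keys(fromObject)); // → [ 'onSubscribe', 'onNext' ]
```

In this sketch, a lifecycle key left out of the object form has no action type, so a reducer would only need cases for the types that were spelled out.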