RSocket WebSocket Server

0.0.29-alpha.0 · active · verified Sun Apr 19

rsocket-websocket-server implements an RSocket server over the WebSocket protocol, enabling reactive, multiplexed, message-driven communication between applications. It is part of the rsocket-js monorepo and works together with other RSocket.js packages, such as rsocket-core for protocol handling. The package is in an alpha state: its latest version is 0.0.29-alpha.0, so it should be treated as pre-production software that is still under development. Its release cadence follows the broader rsocket-js monorepo, whose packages were migrated to TypeScript in 1.0.0-alpha.1. Its distinguishing feature is a server-side RSocket implementation specifically for WebSocket connections that follows the RSocket specification and targets microservices and real-time applications.

Quickstart

This quickstart demonstrates how to set up and run an RSocket WebSocket server, handling fire-and-forget, request-response, request-stream, and request-channel interactions using the `rsocket-flowable` reactive types.

import { BufferEncoders, RSocketServer } from 'rsocket-core';
import RSocketWebSocketServer from 'rsocket-websocket-server';
import { Flowable, Single } from 'rsocket-flowable';
import type { Payload, Responder } from 'rsocket-types';

// NOTE: written against the 0.0.x API (getRequestHandler, start(), stop()).
// The 1.0.0-alpha line exposes a different API, so verify these calls
// against the version you have installed.

// Environment variables are strings; the WebSocket server expects a number.
const port = Number(process.env.PORT ?? 8080);

// With BufferEncoders (see below), data and metadata arrive as Node.js Buffers.
type BufferPayload = Payload<Buffer, Buffer>;

class MyResponder implements Responder<Buffer, Buffer> {
  fireAndForget(payload: BufferPayload): void {
    console.log(`Received fire-and-forget: ${payload.data?.toString()}`);
  }

  requestResponse(payload: BufferPayload): Single<BufferPayload> {
    console.log(`Received request-response: ${payload.data?.toString()}`);
    return Single.of({
      data: Buffer.from(`Response to ${payload.data?.toString()}`),
      metadata: payload.metadata,
    });
  }

  requestStream(payload: BufferPayload): Flowable<BufferPayload> {
    console.log(`Received request-stream: ${payload.data?.toString()}`);
    // Items are emitted as the requester signals demand; the original
    // delayElements(500) call was dropped because Flowable has no such operator.
    return Flowable.just(
      { data: Buffer.from('Stream Item 1') },
      { data: Buffer.from('Stream Item 2') },
      { data: Buffer.from('Stream Item 3') }
    );
  }

  requestChannel(payloads: Flowable<BufferPayload>): Flowable<BufferPayload> {
    console.log('Received request-channel');
    // Echo every incoming payload back to the requester.
    return payloads.map(p => {
      console.log(`Channel data: ${p.data?.toString()}`);
      return {
        data: Buffer.from(`Echo: ${p.data?.toString()}`),
        metadata: p.metadata,
      };
    });
  }

  metadataPush(_payload: BufferPayload): Single<void> {
    // Required by the Responder interface; not used in this quickstart.
    return Single.error(new Error('metadataPush is not supported'));
  }
}

const server = new RSocketServer({
  // BufferEncoders keeps payload data and metadata as Buffers.
  transport: new RSocketWebSocketServer({ port }, BufferEncoders),
  // Called for each accepted connection; returns the responder for it.
  getRequestHandler: () => new MyResponder(),
});

// In the 0.0.x API, start() and stop() are synchronous and return nothing.
server.start();
console.log(`RSocket WebSocket server starting on port ${port}`);
console.log('Use Ctrl+C to stop');

process.on('SIGINT', () => {
  server.stop();
  console.log('Server stopped.');
  process.exit(0);
});
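
The `Flowable` returned by the `requestStream` handler in the quickstart above is demand-driven: in RSocket, the requester signals how many items it is ready to receive (request-n), and the responder emits no more than that. The following library-free sketch illustrates the idea under simplified assumptions. `DemoSubscription`, `DemoSubscriber`, and `streamOf` are hypothetical names introduced here for illustration; they are not part of `rsocket-flowable`.

```typescript
// Hypothetical stand-ins for illustration only; NOT the rsocket-flowable types.
interface DemoSubscription {
  request(n: number): void;
  cancel(): void;
}

interface DemoSubscriber<T> {
  onNext(value: T): void;
  onComplete(): void;
}

// Emits `items` only as fast as the subscriber asks for them.
function streamOf<T>(items: T[], subscriber: DemoSubscriber<T>): DemoSubscription {
  let index = 0;
  let done = false;
  return {
    request(n: number): void {
      // Deliver at most `n` further items, then wait for the next request.
      let remaining = n;
      while (!done && remaining > 0 && index < items.length) {
        subscriber.onNext(items[index++]);
        remaining--;
      }
      // Signal completion once, after the last item has been delivered.
      if (!done && index === items.length) {
        done = true;
        subscriber.onComplete();
      }
    },
    cancel(): void {
      done = true; // no further signals after cancellation
    },
  };
}

// Usage: pull two items first, then the remaining one.
const received: string[] = [];
const state = { completed: false };
const subscription = streamOf(
  ['Stream Item 1', 'Stream Item 2', 'Stream Item 3'],
  {
    onNext: (value) => { received.push(value); },
    onComplete: () => { state.completed = true; },
  }
);
subscription.request(2); // delivers items 1 and 2; the stream is still open
subscription.request(1); // delivers item 3 and completes the stream
```

A responder that needs to pace its output, for example to simulate a slow data source, can do so inside the same structure by scheduling each `onNext` call while still never exceeding the requested count.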
