RabbitMQ Stream Client for JavaScript/TypeScript

1.0.0 · active · verified Tue Apr 21

This library is a client for the RabbitMQ stream protocol. It lets JavaScript and TypeScript applications publish to and consume from RabbitMQ Streams, and it targets high-throughput, low-latency messaging. The current stable version is 1.0.0, released in April 2024. Releases have been frequent: multiple minor and patch versions preceded 1.0.0, adding external authentication, SSL options, and improved connection management.

Key differentiators are native support for the stream protocol, super streams for horizontal scaling, message deduplication for exactly-once publishing semantics, and single active consumers for workload distribution. Connection handling includes explicit connection pooling and automatic reconnection, which makes the client suitable for distributed systems with high reliability requirements.
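To make the deduplication feature more concrete: in RabbitMQ Streams, a publisher identifies itself with a reference name and tags each message with an increasing publishing ID, and the broker drops any message whose ID it has already seen for that reference. The sketch below is a toy in-memory model of that rule only. `DedupLog` and `append` are hypothetical names invented for illustration; they are not part of this library's API, and the real filtering happens on the broker.

```typescript
// Toy model of broker-side deduplication. NOT the client library's API:
// DedupLog and append() are hypothetical names used only for illustration.
class DedupLog {
  // Highest publishing ID accepted so far, per publisher reference.
  private lastSeen: Record<string, number> = {};
  // Message bodies that were actually stored.
  entries: string[] = [];

  // Stores the message unless its publishing ID is not greater than the
  // last one accepted for this publisher reference. Returns true if stored.
  append(publisherRef: string, publishingId: number, body: string): boolean {
    const last = this.lastSeen[publisherRef];
    if (last !== undefined && publishingId <= last) {
      return false; // duplicate or stale resend: filtered out
    }
    this.lastSeen[publisherRef] = publishingId;
    this.entries.push(body);
    return true;
  }
}

const log = new DedupLog();
const results = [
  log.append("my-publisher-ref", 1, "order-1"),
  log.append("my-publisher-ref", 2, "order-2"),
  log.append("my-publisher-ref", 2, "order-2"), // retry after a lost confirmation
  log.append("other-publisher", 1, "order-3"), // IDs are tracked per reference
];
console.log(results, log.entries);
```

The retried message with publishing ID 2 is rejected, while the same ID from a different publisher reference is accepted. This is why a stable publisher reference matters: without one, the broker has nothing to compare a resend against.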

Common errors

Warnings

Install

Imports

Quickstart

Demonstrates connecting to RabbitMQ Stream, declaring a stream, publishing a message, and consuming it.

import { connect, Offset } from 'rabbitmq-stream-js-client';
import { Buffer } from 'buffer'; // Optional in Node.js, where Buffer is also available as a global

async function runRabbitStreamExample() {
  // Connect to the stream port (5552 by default), not the AMQP port
  const client = await connect({
    hostname: process.env.RABBITMQ_HOST ?? 'localhost',
    port: parseInt(process.env.RABBITMQ_STREAM_PORT ?? '5552', 10),
    username: process.env.RABBITMQ_USERNAME ?? 'rabbit',
    password: process.env.RABBITMQ_PASSWORD ?? 'rabbit',
    vhost: process.env.RABBITMQ_VHOST ?? '/',
  });

  const streamName = 'my-first-stream';

  // Create the stream (if it doesn't exist, it will be created)
  await client.createStream({ stream: streamName, arguments: {} });

  // The publisher reference identifies this publisher for deduplication
  const publisher = await client.declarePublisher({
    stream: streamName,
    publisherRef: 'my-publisher-ref',
  });

  const messageContent = Buffer.from('Hello, RabbitMQ Stream!');
  await publisher.send(messageContent);
  console.log('Message published:', messageContent.toString());

  const consumer = await client.declareConsumer(
    {
      stream: streamName,
      consumerRef: 'my-consumer-ref',
      offset: Offset.first(), // Start from the first message in the stream
    },
    (message) => {
      // message.content is a Buffer holding the payload
      console.log('Message consumed:', message.content.toString());
    }
  );

  // Give some time for the consumer to receive the message
  await new Promise((resolve) => setTimeout(resolve, 1000));

  // Close the consumer here rather than inside the callback, where
  // `consumer` may not be assigned yet when the first message arrives
  await consumer.close();
  await publisher.close();
  await client.close();
  console.log('Consumer, publisher, and client closed.');
}

runRabbitStreamExample().catch(console.error);
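
The quickstart reads its connection settings from environment variables and parses the port with `parseInt`, which silently accepts values such as `5552abc` and yields `NaN` for non-numeric input. A minimal sketch of stricter parsing follows, assuming the same variable names and defaults as the quickstart. `connectionParamsFromEnv` and `StreamConnectionParams` are hypothetical helpers written for this example and are not provided by the library.

```typescript
// Hypothetical helper, not part of rabbitmq-stream-js-client. The field
// names mirror the object passed to connect() in the quickstart.
interface StreamConnectionParams {
  hostname: string;
  port: number;
  username: string;
  password: string;
  vhost: string;
}

type Env = Record<string, string | undefined>;

function connectionParamsFromEnv(env: Env): StreamConnectionParams {
  const rawPort = env.RABBITMQ_STREAM_PORT ?? "5552";
  // Accept only plain decimal digits, then range-check the result.
  const port = /^\d+$/.test(rawPort) ? Number(rawPort) : NaN;
  if (!(port >= 1 && port <= 65535)) {
    throw new Error("Invalid RABBITMQ_STREAM_PORT: " + rawPort);
  }
  return {
    hostname: env.RABBITMQ_HOST ?? "localhost",
    port,
    username: env.RABBITMQ_USERNAME ?? "rabbit",
    password: env.RABBITMQ_PASSWORD ?? "rabbit",
    vhost: env.RABBITMQ_VHOST ?? "/",
  };
}

// Example with an explicit environment object instead of process.env,
// so the sketch runs the same way in any runtime.
const params = connectionParamsFromEnv({
  RABBITMQ_HOST: "broker.internal",
  RABBITMQ_STREAM_PORT: "5552",
});
console.log(params.hostname, params.port);
```

In a Node.js application, the result of `connectionParamsFromEnv(process.env)` could be passed to `connect` in place of the inline object, so that a misconfigured port fails at startup with a clear message instead of at connection time.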
