RabbitMQ Stream Client for JavaScript/TypeScript
This library provides a client for the RabbitMQ stream protocol, enabling JavaScript and TypeScript applications to interact with RabbitMQ Streams. Designed for high-throughput, low-latency messaging, it facilitates publishing and consuming messages with advanced features like super streams, deduplication, and single active consumers. The current stable version is 1.0.0, released in April 2024. The project exhibits an active release cadence with frequent updates and bug fixes, indicated by multiple minor and patch releases leading up to 1.0.0, addressing features like external authentication, SSL options, and enhanced connection management. Key differentiators include its native support for RabbitMQ's stream protocol, enabling advanced features for horizontal scaling, message deduplication for exactly-once semantics, and single active consumer patterns for workload distribution. It also provides robust connection handling, including explicit connection pooling and automatic reconnect mechanisms, making it suitable for high-reliability, distributed systems.
Common errors
-
Error: Code 83 (NOT_SUPPORTED_FORMAT) when message is published using amqplib
cause Messages published via the AMQP 0-9-1 protocol (e.g., using `amqplib`) might not be compatible with the RabbitMQ Stream protocol due to differences in message formatting and properties.fixEnsure all messages intended for RabbitMQ Streams are published using `rabbitmq-stream-js-client`. Do not mix publishing messages to streams using `amqplib` or other AMQP 0-9-1 clients, as the stream protocol has its own specific message structure. -
Error: connect ECONNREFUSED 127.0.0.1:5552
cause The client could not establish a connection to the RabbitMQ Stream server. This typically means the server is not running, is not listening on the specified host/port, or a firewall is blocking the connection.fixVerify that your RabbitMQ server is running and the Stream plugin is enabled. Check the `hostname` and `port` (default 5552 for streams) in your connection configuration. Ensure no firewall rules are preventing the connection. -
TypeError: rabbit.connect is not a function
cause You are likely attempting to use a CommonJS `require` syntax in a TypeScript or ESM project without correctly destructuring the module's named exports.fixUse ESM named imports: `import { connect } from 'rabbitmq-stream-js-client';`. If you must use `require`, destructure it: `const { connect } = require('rabbitmq-stream-js-client');`.
Warnings
- breaking The `Connection closed` listener is no longer called by the client. Applications relying on this specific event for connection state management will need to adjust.
- gotcha When using message deduplication with `publisherRef` or `publishingId`, it is solely the user's responsibility to guarantee the uniqueness and incremental order of these IDs. RabbitMQ Stream does not enforce or manage these across producers, which can lead to message loss or unexpected deduplication if not handled correctly.
- gotcha The `ca` parameter for SSL/TLS connections is optional. If not provided, the client will rely on the system's default CAs or accept self-signed certificates depending on Node.js configuration, which might be a security risk in production environments.
- breaking Internal refactoring around connection pooling (e.g., connection pool as an instance, managing connections inside the pool) in v0.6.2 might affect applications that previously attempted to interfere with or manage underlying connections directly.
Install
-
npm install rabbitmq-stream-js-client -
yarn add rabbitmq-stream-js-client -
pnpm add rabbitmq-stream-js-client
Imports
- connect
const rabbit = require('rabbitmq-stream-js-client'); const client = await rabbit.connect(...);import { connect } from 'rabbitmq-stream-js-client'; - StreamClient
import { StreamClient } from 'rabbitmq-stream-js-client'; - Publisher
import { Publisher } from 'rabbitmq-stream-js-client';
Quickstart
import { connect } from 'rabbitmq-stream-js-client';
import { Buffer } from 'buffer'; // Node.js Buffer is needed for message content
async function runRabbitStreamExample() {
const client = await connect({
hostname: process.env.RABBITMQ_HOST ?? 'localhost',
port: parseInt(process.env.RABBITMQ_STREAM_PORT ?? '5552', 10),
username: process.env.RABBITMQ_USERNAME ?? 'rabbit',
password: process.env.RABBITMQ_PASSWORD ?? 'rabbit',
vhost: process.env.RABBITMQ_VHOST ?? '/',
});
const streamName = 'my-first-stream';
// Declare a stream (if it doesn't exist, it will be created)
await client.declareStream({ stream: streamName });
const publisher = await client.declarePublisher({
stream: streamName,
publisherRef: 'my-publisher-ref',
});
const messageContent = Buffer.from('Hello, RabbitMQ Stream!');
await publisher.send(messageContent);
console.log('Message published:', messageContent.toString());
const consumer = await client.createConsumer(
{
stream: streamName,
consumerRef: 'my-consumer-ref',
offset: 'first',
},
(message) => {
console.log('Message consumed:', message.getData().toString());
consumer.close(); // Close consumer after first message for this example
}
);
// Give some time for the consumer to receive the message
await new Promise(resolve => setTimeout(resolve, 1000));
await publisher.close();
await client.close();
console.log('Client and publisher closed.');
}
runRabbitStreamExample().catch(console.error);