RabbitMQ Client for Node.js

5.0.8 · active · verified Tue Apr 21

rabbitmq-client is a robust and typed Node.js client library for RabbitMQ (AMQP 0-9-1), designed as an alternative to `amqplib`. It is currently in version 5.0.8, actively maintained with regular updates and bug fixes as indicated by recent commits and version bumps. Key differentiators include automatic re-connection, re-subscription, and message retry mechanisms, offering higher resilience out of the box. It provides a higher-level API through `Consumer` and `Publisher` abstractions, simplifying common use cases, alongside a lower-level `Connection` for direct AMQP operations and an `RPCClient` for request-response patterns. The library is written in TypeScript, ships with comprehensive type definitions, and explicitly avoids external dependencies, contributing to a smaller footprint and potentially fewer supply chain risks. Performance is comparable to `amqplib`, as demonstrated by included benchmarks.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to establish a connection to RabbitMQ, set up a consumer to process messages from a queue, and create a publisher to send messages to an exchange or directly to a queue, including error handling and retry mechanisms.

import { Connection } from 'rabbitmq-client'

// Initialize: Connect to RabbitMQ
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.error('RabbitMQ connection error:', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established to RabbitMQ')
})

async function setupAndRun() {
  // Consume messages from a queue
  const consumer = rabbit.createConsumer({
    queue: 'user-events-queue',
    queueOptions: { durable: true },
    qos: { prefetchCount: 2 }, // Handle 2 messages concurrently
    exchanges: [{ exchange: 'my-events-exchange', type: 'topic' }],
    queueBindings: [{ exchange: 'my-events-exchange', routingKey: 'users.*' }]
  }, async (msg) => {
    try {
      console.log('Received message:', msg.body)
      // Process message here. Auto-acknowledges on success.
      // Throws error to nack and potentially requeue or dead-letter.
      await new Promise(resolve => setTimeout(resolve, 50)); // Simulate async work
    } catch (e) {
      console.error('Error processing message:', e)
      // Returning a specific status can control nack/requeue behavior
      return 3; // Nack, don't requeue
    }
  })

  consumer.on('error', (err) => {
    console.error('Consumer error (user-events-queue):', err)
  })

  // Declare a publisher
  const publisher = rabbit.createPublisher({
    confirm: true, // Enable publish confirmations
    maxAttempts: 2, // Enable retries on publish failure
    exchanges: [{ exchange: 'my-events-exchange', type: 'topic' }]
  })

  // Publish a message to an exchange
  try {
    await publisher.send(
      { exchange: 'my-events-exchange', routingKey: 'users.visit' },
      { id: Date.now(), name: 'Alice', action: 'visit' }
    )
    console.log('Published user.visit message.')
  } catch (e) {
    console.error('Failed to publish message:', e)
  }

  // Publish directly to a queue (less common with exchanges)
  try {
    await publisher.send(
      { queue: 'direct-queue' },
      { message: 'This goes directly to a queue' }
    )
    console.log('Published direct message to queue.')
  } catch (e) {
    console.error('Failed to publish direct message:', e)
  }

  // Keep the process alive for a bit to receive messages
  // setTimeout(() => {
  //   consumer.close()
  //   rabbit.close()
  //   console.log('Closed consumer and connection.')
  // }, 10000)
}

setupAndRun();

view raw JSON →