RabbitMQ Client for Node.js
rabbitmq-client is a typed Node.js client library for RabbitMQ (AMQP 0-9-1), designed as an alternative to `amqplib`. The current version is 5.0.8, and the project is actively maintained, with regular releases and bug fixes. Its main differentiators are automatic reconnection, re-subscription, and message retry, which make it more resilient out of the box. It offers a higher-level API through the `Consumer` and `Publisher` abstractions for common use cases, a lower-level `Connection` for direct AMQP operations, and an `RPCClient` for request-response patterns. The library is written in TypeScript, ships with type definitions, and has no external dependencies, which keeps its footprint small and may reduce supply-chain risk. According to the included benchmarks, performance is comparable to `amqplib`.
Common errors
- `Error: connect ECONNREFUSED 127.0.0.1:5672`
  cause: The RabbitMQ server is not running or is not accessible at the specified host and port.
  fix: Ensure RabbitMQ is running and reachable from the machine that hosts your application. Verify the connection string (e.g., `amqp://guest:guest@localhost:5672`) for the correct host, port, and credentials. Check firewall rules if RabbitMQ is on a remote server.
- `TypeError: rabbit.createConsumer is not a function`
  cause: The `rabbit` variable is not a valid `Connection` instance, has not been initialized, or the import path is incorrect.
  fix: Ensure `import { Connection } from 'rabbitmq-client'` is correct and that `const rabbit = new Connection('amqp://...')` has run before you call `rabbit.createConsumer()` or `rabbit.createPublisher()`.
- `Channel closed due to error: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'my-queue' in vhost '/'`
  cause: A queue was declared with parameters (e.g., `durable`, `autoDelete`) that conflict with an existing queue of the same name.
  fix: Either delete the conflicting queue through the RabbitMQ management UI or CLI, or make your `queueOptions` match the existing queue's configuration. This often happens after changing a queue property such as `durable: true` to `durable: false`, or vice versa.
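The `PRECONDITION_FAILED` case comes down to a mismatch between the properties a queue already has and the ones a later declaration requests. As a rough illustration only, the sketch below compares two option objects and lists the keys that differ. `findInequivalentArgs` and the list of compared keys are hypothetical and written for this example; they are not part of `rabbitmq-client`, and treating an omitted flag as `false` is an assumption.

```javascript
// Hypothetical helper for illustration; not part of rabbitmq-client.
// The set of compared keys is an assumption, not an exhaustive list.
const COMPARED_KEYS = ['durable', 'autoDelete', 'exclusive'];

// Returns the option names whose boolean value differs between the
// existing queue and the requested declaration (omitted means false here).
function findInequivalentArgs(existing, requested) {
  return COMPARED_KEYS.filter(
    (key) => Boolean(existing[key]) !== Boolean(requested[key])
  );
}

const existingQueue = { durable: true };
const requestedOptions = { durable: false };

const mismatches = findInequivalentArgs(existingQueue, requestedOptions);
console.log(mismatches); // [ 'durable' ]
```

An empty result means the compared flags agree; any listed key is a candidate for the "inequivalent arg" named in the broker's error message.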
Warnings
- breaking: To connect to RabbitMQ 4.1.x or higher, you must use version 5.0.3 or higher of this library. Older versions of `rabbitmq-client` are not compatible.
- gotcha: The library states that it has "No dependencies". This keeps the footprint small, but it also means you are responsible for any environment-specific requirements or polyfills when running in non-standard Node.js environments.
- gotcha: Message acknowledgment is automatic. If the consumer callback resolves successfully, the message is acknowledged (`BasicAck`). If it throws an error, the message is rejected (`BasicNack`) and either requeued or sent to a dead-letter exchange, depending on configuration. For explicit control, return a status value from the callback (the library's `ConsumerStatus` values: `ACK`, `REQUEUE`, `DROP`).
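The acknowledgment rule in the last warning can be pictured as a small decision function. The sketch below is a simulation written for this page, not the library's code: `resolveOutcome` is a hypothetical helper, and the numeric values (0 = ack, 1 = nack with requeue, 2 = nack without requeue) are assumed to mirror the library's `ConsumerStatus` enum. Check the installed type definitions before relying on them.

```javascript
// Assumed to mirror rabbitmq-client's ConsumerStatus enum; verify locally.
const STATUS = Object.freeze({ ACK: 0, REQUEUE: 1, DROP: 2 });

// Hypothetical simulation of how a handler's result maps to an outcome.
// requeueOnError stands in for the consumer's requeue setting (assumed default: true).
async function resolveOutcome(handler, msg, requeueOnError = true) {
  try {
    const result = await handler(msg);
    // No explicit return value: treat as success and acknowledge.
    return result === undefined ? STATUS.ACK : result;
  } catch (err) {
    // A thrown error rejects the message; requeue depends on configuration.
    return requeueOnError ? STATUS.REQUEUE : STATUS.DROP;
  }
}

async function demo() {
  const ok = await resolveOutcome(async () => {}, { body: 'a' });
  const failed = await resolveOutcome(async () => {
    throw new Error('boom');
  }, { body: 'b' });
  const explicit = await resolveOutcome(async () => STATUS.DROP, { body: 'c' });
  return [ok, failed, explicit];
}

demo().then((outcomes) => console.log(outcomes)); // [ 0, 1, 2 ]
```

Catching an error inside the handler and returning normally counts as success in this model, which is why a handler that swallows errors should return an explicit status if the message must not be acknowledged.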
Install
- npm install rabbitmq-client
- yarn add rabbitmq-client
- pnpm add rabbitmq-client
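Given the compatibility warning above (library 5.0.3 or higher for RabbitMQ 4.1.x and up), it can help to check the installed version after installing. A minimal sketch: `isAtLeast` is a hypothetical helper that compares plain `major.minor.patch` strings and ignores pre-release tags. In a real project the version string would come from the installed package's `package.json` instead of being hard-coded.

```javascript
// Hypothetical helper: compares "major.minor.patch" strings numerically.
// Pre-release or build suffixes are not handled in this sketch.
function isAtLeast(version, minimum) {
  const a = version.split('.').map(Number);
  const b = minimum.split('.').map(Number);
  for (let i = 0; i < 3; i++) {
    const x = a[i] || 0;
    const y = b[i] || 0;
    if (x !== y) return x > y;
  }
  return true; // all three parts are equal
}

// Minimum library version named in the warning for RabbitMQ 4.1.x and up.
const MINIMUM_FOR_RABBITMQ_4_1 = '5.0.3';

console.log(isAtLeast('5.0.8', MINIMUM_FOR_RABBITMQ_4_1)); // true
console.log(isAtLeast('5.0.2', MINIMUM_FOR_RABBITMQ_4_1)); // false
```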
Imports
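- Connection
  CommonJS: `const { Connection } = require('rabbitmq-client')`
  ESM/TypeScript: `import { Connection } from 'rabbitmq-client'`
- Consumer
  incorrect: `import { createConsumer } from 'rabbitmq-client'` (`createConsumer` is a method on a `Connection` instance, not a named export)
  correct: `import { Connection, type Consumer } from 'rabbitmq-client'`
- Publisher
  incorrect: `import { createPublisher } from 'rabbitmq-client'` (`createPublisher` is a method on a `Connection` instance, not a named export)
  correct: `import { Connection, type Publisher } from 'rabbitmq-client'`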
Quickstart
import { Connection, ConsumerStatus } from 'rabbitmq-client'

// Initialize: connect to RabbitMQ
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.error('RabbitMQ connection error:', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established to RabbitMQ')
})

async function setupAndRun() {
  // Consume messages from a queue
  const consumer = rabbit.createConsumer({
    queue: 'user-events-queue',
    queueOptions: { durable: true },
    qos: { prefetchCount: 2 }, // Handle up to 2 messages concurrently
    exchanges: [{ exchange: 'my-events-exchange', type: 'topic' }],
    queueBindings: [{ exchange: 'my-events-exchange', routingKey: 'users.*' }]
  }, async (msg) => {
    try {
      console.log('Received message:', msg.body)
      // Process the message here. Returning normally acknowledges it.
      // An uncaught error would nack it instead (requeue or dead-letter, per configuration).
      await new Promise(resolve => setTimeout(resolve, 50)) // Simulate async work
    } catch (e) {
      console.error('Error processing message:', e)
      // Returning a status controls the nack/requeue behavior explicitly
      return ConsumerStatus.DROP // Nack, don't requeue
    }
  })
  consumer.on('error', (err) => {
    console.error('Consumer error (user-events-queue):', err)
  })

  // Declare a publisher
  const publisher = rabbit.createPublisher({
    confirm: true, // Enable publish confirmations
    maxAttempts: 2, // Retry once on publish failure
    exchanges: [{ exchange: 'my-events-exchange', type: 'topic' }]
  })

  // Publish a message to an exchange
  try {
    await publisher.send(
      { exchange: 'my-events-exchange', routingKey: 'users.visit' },
      { id: Date.now(), name: 'Alice', action: 'visit' }
    )
    console.log('Published users.visit message.')
  } catch (e) {
    console.error('Failed to publish message:', e)
  }

  // Publish directly to a queue by name (the queue must already exist)
  try {
    await publisher.send(
      'direct-queue',
      { message: 'This goes directly to a queue' }
    )
    console.log('Published direct message to queue.')
  } catch (e) {
    console.error('Failed to publish direct message:', e)
  }

  // Keep the process alive for a bit to receive messages, then shut down
  // setTimeout(async () => {
  //   await publisher.close()
  //   await consumer.close()
  //   await rabbit.close()
  //   console.log('Closed publisher, consumer, and connection.')
  // }, 10000)
}

setupAndRun().catch((err) => {
  console.error('Quickstart failed:', err)
})
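Since a wrong host or port in the connection string is a common cause of `ECONNREFUSED`, the string passed to `new Connection(...)` can be inspected before connecting. The sketch below uses Node's built-in `URL` class. `describeAmqpUrl` is a hypothetical helper, and the fallback values (port 5672 for `amqp:`, 5671 for `amqps:`, vhost `/` when no path is given) are conventional defaults assumed here, not read from the library.

```javascript
// Hypothetical helper: extracts the parts of an AMQP connection string
// that are worth checking. The password is deliberately left out.
function describeAmqpUrl(connectionString) {
  const url = new URL(connectionString);
  if (url.protocol !== 'amqp:' && url.protocol !== 'amqps:') {
    throw new Error(`Unexpected protocol: ${url.protocol}`);
  }
  // Conventional default ports, assumed for this sketch.
  const defaultPort = url.protocol === 'amqps:' ? 5671 : 5672;
  return {
    host: url.hostname,
    port: url.port ? Number(url.port) : defaultPort,
    username: decodeURIComponent(url.username),
    // An empty path is assumed to mean the default vhost "/".
    vhost: url.pathname.length > 1 ? decodeURIComponent(url.pathname.slice(1)) : '/',
  };
}

console.log(describeAmqpUrl('amqp://guest:guest@localhost:5672'));
console.log(describeAmqpUrl('amqps://broker.example.com/prod'));
```

Logging the result at startup makes it easier to spot a connection string that points at the wrong host, port, or vhost.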