Exframe Messaging Queue
exframe-mq is a messaging module that provides a simplified abstraction layer over RabbitMQ for Node.js applications; the current stable version is 5.2.0. It is part of the broader exframe ecosystem and is typically used alongside other exframe-* packages such as exframe-context, exframe-health, and exframe-logger. The library focuses on a publish/subscribe topology, assumes that every message payload is JSON, and manages persistent, shared connections to the RabbitMQ server. No release cadence is documented, though its close ties to the other exframe modules suggest that releases are coordinated. Its main differentiators are an opinionated, simplified API for common RabbitMQ patterns and deep integration with the exframe application framework.
Common errors
- Error: connect ECONNREFUSED 127.0.0.1:5672
  cause: The RabbitMQ server is not running or is inaccessible at the configured address and port.
  fix: Ensure the RabbitMQ server is running and accessible from the application's host. Verify that the `url` configuration passed to `mq.create()` is correct.
- TypeError: mq.create is not a function
  cause: The `exframe-mq` module was imported incorrectly, likely with named import syntax (e.g., `import { create } from 'exframe-mq';`) instead of importing the default object.
  fix: Change the import statement to `import mq from 'exframe-mq';` (ESM) or `const mq = require('exframe-mq');` (CommonJS), then call the method as `mq.create()`.
- Error: Missing peer dependency "exframe-logger@3.x"
  cause: A required peer dependency of `exframe-mq` is not installed, or its version does not match the range specified in `package.json`.
  fix: Install the missing peer dependency with `npm install <package-name>@<version-range>` (e.g., `npm install exframe-logger@3`). Repeat for any other missing `exframe-*` peer dependencies.
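When the ECONNREFUSED error appears, it can help to confirm which host and port the configured URL actually points at, and whether a plain TCP connection to it succeeds, before involving exframe-mq at all. The following is a minimal sketch under stated assumptions: `amqpEndpoint` and `canReachBroker` are hypothetical helpers written for this page, not part of exframe-mq, and they rely only on Node.js built-ins.

```javascript
// Hypothetical helper: derive the host and port that a RabbitMQ URL points at.
function amqpEndpoint(url) {
  const parsed = new URL(url);
  if (parsed.protocol !== 'amqp:' && parsed.protocol !== 'amqps:') {
    throw new Error(`Unsupported protocol in RabbitMQ URL: ${parsed.protocol}`);
  }
  // 5672 and 5671 are the standard ports for AMQP and AMQP over TLS.
  const defaultPort = parsed.protocol === 'amqps:' ? 5671 : 5672;
  return {
    host: parsed.hostname,
    port: parsed.port ? Number(parsed.port) : defaultPort,
  };
}

// Hypothetical helper: resolves to true if a TCP connection can be opened,
// and to false on refusal, timeout, or any other socket error.
async function canReachBroker(url, timeoutMs = 2000) {
  const { host, port } = amqpEndpoint(url);
  // A dynamic import is used so the sketch runs from CommonJS and ES modules alike.
  const net = await import('node:net');
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    const finish = (reachable) => {
      socket.destroy();
      resolve(reachable);
    };
    socket.setTimeout(timeoutMs, () => finish(false));
    socket.once('connect', () => finish(true));
    socket.once('error', () => finish(false)); // e.g. ECONNREFUSED
  });
}
```

A possible use is to call `canReachBroker(process.env.RABBITMQ_URL ?? 'amqp://localhost')` at startup and log the result before `mq.create()` runs. A successful probe only shows that the port is open; it says nothing about credentials or virtual hosts.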
Warnings
- gotcha The library assumes all message payloads are JSON. Non-JSON messages will likely cause parsing errors in subscribers when the module attempts to process them.
- gotcha `exframe-mq` has several peer dependencies (`exframe-context`, `exframe-health`, `exframe-logger`, `exframe-service`) that must be installed and configured alongside it for full functionality. Missing or incompatible versions can lead to runtime errors.
- gotcha The default RabbitMQ URL (`amqp://localhost`) and connection retry settings (e.g., `maxAttempts`, `baseTimeout`) are suitable for local development but are generally insufficient for robust production environments, which require proper authentication and more resilient retry logic.
- gotcha Message processing middleware functions passed to `client.subscribe` are expected to return Promises that resolve. Failing to return a Promise or returning a synchronous value without wrapping it can lead to unexpected behavior in message acknowledgment or error handling.
- gotcha The `logger` option passed to `mq.create` expects an object with an interface compatible with `winston` logger methods (e.g., `info`, `error`, `warn`, `debug`). While a default logger outputs to stdout, custom logging requires a compliant object.
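Two of the warnings above lend themselves to small guards in application code: subscriber callbacks should always yield a Promise, and connection settings should not silently fall back to local defaults in production. The sketch below is one possible approach, not the library's own mechanism. `toAsyncHandler` and `mqOptionsFromEnv` are hypothetical helpers, the environment variable names are assumptions, and the option names and fallback values are simply copied from the Quickstart on this page rather than recommended for production.

```javascript
// Hypothetical wrapper: guarantees that a subscriber callback always yields a Promise.
function toAsyncHandler(handler) {
  return (...args) => {
    try {
      // Promise.resolve adopts a returned promise and wraps a plain value.
      return Promise.resolve(handler(...args));
    } catch (err) {
      // A synchronous throw becomes a rejection instead of escaping to the caller.
      return Promise.reject(err);
    }
  };
}

// Hypothetical helper: build options for mq.create() from environment variables.
// Option names mirror the Quickstart; the variable names are assumptions.
function mqOptionsFromEnv(env, logger) {
  if (!env.RABBITMQ_URL) {
    throw new Error('RABBITMQ_URL must be set; refusing to fall back to amqp://localhost');
  }
  return {
    logger,
    url: env.RABBITMQ_URL,
    heartbeat: Number(env.MQ_HEARTBEAT ?? 30),
    baseTimeout: Number(env.MQ_BASE_TIMEOUT ?? 500),
    maxAttempts: Number(env.MQ_MAX_ATTEMPTS ?? 5),
  };
}
```

With these in place, a callback could be registered as `toAsyncHandler((context, message) => { ... })`, and the client created with `mq.create(mqOptionsFromEnv(process.env, logger))`. Failing fast on a missing URL is a design choice: it surfaces misconfiguration at startup instead of as a connection error against localhost.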
Install
- npm install exframe-mq
- yarn add exframe-mq
- pnpm add exframe-mq
Imports
- mq
  wrong: import { create } from 'exframe-mq';
  correct: import mq from 'exframe-mq';
- create
  wrong: import { create } from 'exframe-mq';
  correct: import mq from 'exframe-mq'; const client = mq.create(...);
- client
  wrong: import { client } from 'exframe-mq';
  correct: import mq from 'exframe-mq'; const clientInstance = mq.client(options);
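Depending on the bundler or transpiler, a CommonJS package loaded through ESM syntax sometimes arrives with its exports nested under a `default` property, which produces the same "mq.create is not a function" symptom as a wrong named import. The sketch below shows one defensive way to handle that. `resolveMq` is a hypothetical helper, not part of exframe-mq, and it assumes only that the usable object exposes a `create` function, as described on this page.

```javascript
// Hypothetical helper: return the object that carries create(),
// whichever way the module happened to be loaded.
function resolveMq(mod) {
  if (mod && typeof mod.create === 'function') {
    return mod; // plain CommonJS require() or a correct default import
  }
  if (mod && mod.default && typeof mod.default.create === 'function') {
    return mod.default; // exports wrapped under "default" by module interop
  }
  throw new TypeError('exframe-mq export has no create(); check the import statement');
}
```

It could be applied as `const mq = resolveMq(require('exframe-mq'));`. The explicit TypeError makes an import mistake visible at load time, not at the first call to `create`.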
Quickstart
import mq from 'exframe-mq'; // Use 'const mq = require("exframe-mq");' for CommonJS

// Mock a logger for demonstration purposes
const logger = {
  info: (message, ...args) => console.log('INFO:', message, ...args),
  error: (message, ...args) => console.error('ERROR:', message, ...args),
  warn: (message, ...args) => console.warn('WARN:', message, ...args),
  debug: (message, ...args) => console.debug('DEBUG:', message, ...args),
};

async function runMessagingExample() {
  const rabbitmqUrl = process.env.RABBITMQ_URL ?? 'amqp://localhost';
  console.log(`Attempting to connect to RabbitMQ at ${rabbitmqUrl}...`);

  try {
    // Initialize the main exframe-mq client
    const mqClient = mq.create({
      logger,
      url: rabbitmqUrl,
      heartbeat: 30,
      baseTimeout: 500,
      maxAttempts: 5,
      responseTimeout: 60 * 1000,
    });

    // Get a messaging client instance for publish/subscribe operations
    const client = mqClient.client();

    const EXCHANGE_NAME = 'my-app-exchange';
    const ROUTING_KEY_PATTERN = 'my.topic.#'; // Subscribe to all messages under 'my.topic'
    const PUBLISH_ROUTING_KEY = 'my.topic.hello';

    // Subscribe to messages on a topic exchange
    await client.subscribe(EXCHANGE_NAME, ROUTING_KEY_PATTERN, async (context, message, extraParams = {}) => {
      logger.info(`Received message: ${JSON.stringify(message)} with context ${JSON.stringify(context)} and extraParams ${JSON.stringify(extraParams)}`);
      // Simulate some asynchronous processing
      await new Promise(resolve => setTimeout(resolve, 100));
    }, { exchangeType: 'topic' });
    logger.info(`Successfully subscribed to '${EXCHANGE_NAME}' with routing key pattern '${ROUTING_KEY_PATTERN}'.`);

    // Publish a message after a short delay to allow subscription to establish
    setTimeout(() => {
      const messagePayload = { sender: 'quickstart-app', content: 'Hello from exframe-mq!' };
      logger.info(`Publishing message '${JSON.stringify(messagePayload)}' to '${EXCHANGE_NAME}' with routing key '${PUBLISH_ROUTING_KEY}'.`);
      client.publish(EXCHANGE_NAME, PUBLISH_ROUTING_KEY, messagePayload);
    }, 2000);

    console.log('Application running. Awaiting messages. Press Ctrl+C to exit.');
  } catch (error) {
    logger.error('Failed to initialize or connect to RabbitMQ:', error);
    process.exit(1);
  }
}

runMessagingExample();
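
The Quickstart subscribes with the pattern `my.topic.#` and publishes with the routing key `my.topic.hello`. The matching itself is done by the RabbitMQ broker, not by exframe-mq: on a topic exchange, `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below reimplements that rule locally so that patterns can be checked without a running broker. `topicMatches` is a hypothetical helper written for illustration; it assumes non-empty, dot-separated keys and is not part of any library.

```javascript
// Hypothetical helper: local re-implementation of topic-exchange matching.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const patternWords = pattern.split('.');
  const keyWords = routingKey.split('.');

  function match(i, j) {
    if (i === patternWords.length) {
      return j === keyWords.length; // pattern exhausted: key must be too
    }
    if (patternWords[i] === '#') {
      // Try letting '#' consume 0, 1, 2, ... of the remaining words.
      for (let next = j; next <= keyWords.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }
    if (j === keyWords.length) {
      return false; // pattern still expects a word, but the key has none left
    }
    if (patternWords[i] === '*' || patternWords[i] === keyWords[j]) {
      return match(i + 1, j + 1);
    }
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('my.topic.#', 'my.topic.hello')); // true
console.log(topicMatches('my.topic.*', 'my.topic.a.b'));   // false
```

Under these rules the Quickstart's published message reaches its subscriber, while a pattern of `my.topic.*` would miss any key with more than one word after `my.topic`.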