SQS Consumer for Node.js
sqs-consumer is a robust JavaScript/TypeScript library designed to streamline the development of applications that process messages from Amazon SQS queues in Node.js environments. It significantly reduces common boilerplate by managing continuous polling, message visibility timeouts, automatic message deletion upon successful processing, and concurrent message handling. The current stable version, 14.2.6, reflects active development with frequent canary releases and stable updates primarily focused on maintenance, dependency management, and bug fixes. Its core differentiator is providing a high-level, event-driven abstraction over the AWS SQS API, which simplifies building scalable and resilient queue-based microservices compared to direct interactions with the lower-level `@aws-sdk/client-sqs`. The library is built for modern Node.js (>=20.0.0) and includes comprehensive TypeScript type definitions.
Common errors
-
Error: Missing credentials in config
cause The AWS SDK (used by sqs-consumer) cannot find AWS credentials in the environment, shared credential file, or passed SQSClient configuration.fixEnsure `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables are set, or provide an `SQSClient` instance with credentials explicitly defined in the `Consumer.create` options. -
TypeError: (0 , sqs_consumer_1.Consumer) is not a function
cause Attempting to import `Consumer` using a default import syntax (`import Consumer from 'sqs-consumer'`) or CommonJS `require()` when the package is an ES Module with named exports.fixChange your import statement to `import { Consumer } from 'sqs-consumer';` for ES Modules. -
AccessDeniedException: User: arn:aws:iam::ACCOUNT_ID:user/IAM_USER is not authorized to perform: sqs:ReceiveMessage on resource: QUEUE_URL
cause The AWS IAM principal (user or role) configured for the application lacks the necessary permissions to interact with the specified SQS queue.fixReview and update the IAM policy attached to your AWS user or role, ensuring it includes required permissions like `sqs:ReceiveMessage`, `sqs:DeleteMessage`, and `sqs:ChangeMessageVisibility` for the target queue. -
Consumer-level error (e.g., SQS polling issue): AWS.SimpleQueueService.QueueDoesNotExist: The specified queue does not exist for this wsdl version.
cause The `queueUrl` provided to `Consumer.create` is incorrect, or the queue does not exist in the specified AWS region.fixDouble-check the `queueUrl` and `region` provided in the `Consumer.create` options. Ensure the SQS queue exists and its URL matches the configured value.
Warnings
- breaking `sqs-consumer` transitioned to an ES Module (ESM) codebase and expects usage with `import` statements. Attempting to use CommonJS `require()` will result in import errors or `TypeError`s. This change typically aligns with versions that integrate AWS SDK v3.
- gotcha Errors thrown within the `handleMessage` function are critical. If an `async handleMessage` function throws an error (or returns a rejected promise), the message will NOT be automatically deleted from the SQS queue. It will become visible again after its visibility timeout expires, potentially leading to infinite retries if not handled by a dead-letter queue (DLQ) policy. Ensure your `handleMessage` logic is robust or uses `terminateVisibilityTimeout`.
- gotcha Incorrect AWS Identity and Access Management (IAM) permissions for the SQS queue will prevent the consumer from polling or deleting messages. Required permissions include `sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:DeleteMessageBatch`, `sqs:ChangeMessageVisibility`, `sqs:ChangeMessageVisibilityBatch`, `sqs:GetQueueAttributes`, and `sqs:GetQueueUrl`.
- gotcha Prior to version `14.2.7-canary.2`, a bug existed where unhandled errors within `handleMessage` could cause the entire consumer polling loop to crash, effectively stopping message processing entirely. While fixed in newer versions, it highlights the importance of robust error handling.
Install
-
npm install sqs-consumer -
yarn add sqs-consumer -
pnpm add sqs-consumer
Imports
- Consumer
import Consumer from 'sqs-consumer'; const Consumer = require('sqs-consumer');import { Consumer } from 'sqs-consumer'; - SQSClient
import { SQS } from 'aws-sdk';import { SQSClient } from '@aws-sdk/client-sqs'; - ConsumerOptions
import { ConsumerOptions } from 'sqs-consumer'; - Message
import { Message } from '@aws-sdk/client-sqs';
Quickstart
import { Consumer } from 'sqs-consumer';
import { SQSClient, Message } from '@aws-sdk/client-sqs';
// Ensure environment variables are set for AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
// or pass credentials directly to SQSClient configuration.
const REGION = process.env.AWS_REGION ?? 'us-east-1';
const SQS_QUEUE_URL = process.env.SQS_QUEUE_URL ?? 'https://sqs.us-east-1.amazonaws.com/123456789012/my-test-queue';
if (!process.env.AWS_ACCESS_KEY_ID || !process.env.AWS_SECRET_ACCESS_KEY) {
console.warn('AWS credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) are not set in environment variables. Consumer may fail without proper authentication.');
}
const sqs = new SQSClient({ region: REGION });
const app = Consumer.create({
queueUrl: SQS_QUEUE_URL,
handleMessage: async (message: Message) => {
// This function is called for each message received.
console.log(`Processing message: ${message.MessageId}`);
console.log(`Message Body: ${message.Body}`);
try {
const data = JSON.parse(message.Body ?? '{}');
console.log('Parsed data:', data);
// Simulate some asynchronous work.
await new Promise(resolve => setTimeout(resolve, Math.random() * 500));
console.log(`Successfully processed and acknowledged message ${message.MessageId}`);
// Returning nothing (or a fulfilled promise) implies successful processing and message deletion.
} catch (error) {
console.error(`Error processing message ${message.MessageId}:`, error);
// If handleMessage throws, the message will NOT be deleted and will return to the queue for retry.
throw error; // Re-throw to signal a processing failure.
}
},
sqs: sqs, // Pass the instantiated SQSClient
batchSize: 5, // Process up to 5 messages at once
visibilityTimeout: 30, // seconds
terminateVisibilityTimeout: true // Ensures messages are retried faster on processing_error
});
app.on('error', (err) => {
console.error('Consumer-level error (e.g., SQS polling issue):', err.message);
});
app.on('processing_error', (err, message) => {
console.error(`Handler error for message ${message.MessageId}:`, err.message);
// This event is fired when handleMessage throws an error.
// The message will not be deleted and will be retried (potentially immediately if terminateVisibilityTimeout is true).
});
app.on('timeout_exceeded', (message) => {
console.warn(`Message processing timed out for ${message.MessageId}.`);
});
app.on('empty', () => {
console.log('Queue is currently empty. Waiting for new messages...');
});
app.start();
console.log('SQS Consumer started. Waiting for messages...');
// Graceful shutdown on application exit signals
process.on('SIGTERM', async () => {
console.log('SIGTERM received. Stopping consumer...');
await app.stop();
console.log('Consumer stopped. Exiting.');
process.exit(0);
});
process.on('SIGINT', async () => {
console.log('SIGINT received. Stopping consumer...');
await app.stop();
console.log('Consumer stopped. Exiting.');
process.exit(0);
});