SQS Consumer for Node.js

14.2.6 · active · verified Sun Apr 19

sqs-consumer is a JavaScript/TypeScript library for processing messages from Amazon SQS queues in Node.js. It takes over the boilerplate of a queue worker: continuous polling, message visibility timeouts, deleting messages after successful processing, and handling messages concurrently. The current stable version is 14.2.6; the project is actively developed, with frequent canary releases and stable updates that focus mainly on maintenance, dependency management, and bug fixes. Its main differentiator is a high-level, event-driven abstraction over the SQS API, which makes queue-based microservices simpler to build than calling the lower-level `@aws-sdk/client-sqs` directly. The library requires Node.js >=20.0.0 and ships with TypeScript type definitions.
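
To make that boilerplate concrete, here is a minimal sketch of one receive, handle, delete cycle of the kind the library automates. It runs against a hypothetical in-memory queue instead of the real SQS API, so `QueueMessage`, `QueueClient`, `InMemoryQueue`, and `pollOnce` are illustrative names and not part of sqs-consumer or the AWS SDK. Long polling and visibility timeouts are deliberately left out.

```typescript
// Hypothetical message and queue shapes, for illustration only.
// These are NOT the @aws-sdk/client-sqs types.
interface QueueMessage {
  id: string;
  body: string;
}

interface QueueClient {
  receive(max: number): Promise<QueueMessage[]>;
  remove(id: string): Promise<void>;
}

// In-memory stand-in for a queue so the sketch runs without AWS access.
// Unlike SQS, it does not hide in-flight messages from other receivers.
class InMemoryQueue implements QueueClient {
  private messages: QueueMessage[];

  constructor(messages: QueueMessage[]) {
    this.messages = [...messages];
  }

  async receive(max: number): Promise<QueueMessage[]> {
    return this.messages.slice(0, max);
  }

  async remove(id: string): Promise<void> {
    this.messages = this.messages.filter((m) => m.id !== id);
  }

  remainingIds(): string[] {
    return this.messages.map((m) => m.id);
  }
}

// One receive -> handle -> delete cycle. A message is removed only when its
// handler resolves (and the removal itself succeeds); otherwise it stays in
// the queue and is reported as failed so it can be retried later.
async function pollOnce(
  queue: QueueClient,
  handler: (message: QueueMessage) => Promise<void>,
  batchSize: number,
): Promise<{ processed: string[]; failed: string[] }> {
  const batch = await queue.receive(batchSize);

  // Handle the batch concurrently; the per-message try/catch keeps one
  // failure from aborting the other messages in the same batch.
  const outcomes = await Promise.all(
    batch.map(async (message) => {
      try {
        await handler(message);
        await queue.remove(message.id);
        return { id: message.id, ok: true };
      } catch {
        return { id: message.id, ok: false };
      }
    }),
  );

  return {
    processed: outcomes.filter((o) => o.ok).map((o) => o.id),
    failed: outcomes.filter((o) => !o.ok).map((o) => o.id),
  };
}
```

A real worker would call something like `pollOnce` in a loop, add backoff on errors, and extend or reset visibility timeouts; those are the parts the library is described as handling for you.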

Quickstart

This quickstart shows how to create and start an SQS consumer with `sqs-consumer`, process messages asynchronously, handle errors raised during processing, and shut down gracefully. It passes an `SQSClient` from AWS SDK v3 to the consumer and registers listeners for the consumer's error and queue-state events.

import { Consumer } from 'sqs-consumer';
import { SQSClient, type Message } from '@aws-sdk/client-sqs'; // Message is used only as a type

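// The AWS SDK resolves credentials through its default provider chain: environment variables
// (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), shared config/credentials files, or an attached IAM role.
// Credentials can also be passed directly in the SQSClient configuration.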
const REGION = process.env.AWS_REGION ?? 'us-east-1';
const SQS_QUEUE_URL = process.env.SQS_QUEUE_URL ?? 'https://sqs.us-east-1.amazonaws.com/123456789012/my-test-queue';

if (!process.env.AWS_ACCESS_KEY_ID || !process.env.AWS_SECRET_ACCESS_KEY) {
  // Not necessarily an error: a shared profile or an IAM role may supply credentials instead.
  console.warn('AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are not set. The consumer will fail to authenticate unless credentials come from another source (shared profile, IAM role).');
}

const sqs = new SQSClient({ region: REGION });

const app = Consumer.create({
  queueUrl: SQS_QUEUE_URL,
  handleMessage: async (message: Message) => {
    // This function is called for each message received.
    console.log(`Processing message: ${message.MessageId}`);
    console.log(`Message Body: ${message.Body}`);

    try {
      const data = JSON.parse(message.Body ?? '{}');
      console.log('Parsed data:', data);
      // Simulate some asynchronous work.
      await new Promise(resolve => setTimeout(resolve, Math.random() * 500));
      console.log(`Finished processing message ${message.MessageId}`);
      // Resolving without a value signals success; sqs-consumer then deletes the message from the queue.
    } catch (error) {
      console.error(`Error processing message ${message.MessageId}:`, error);
      // If handleMessage throws, the message will NOT be deleted and will return to the queue for retry.
      throw error; // Re-throw to signal a processing failure.
    }
  },
  sqs: sqs, // Pass the instantiated SQSClient
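  batchSize: 5, // Request up to 5 messages per poll (SQS allows at most 10)
  visibilityTimeout: 30, // Seconds a received message stays hidden from other consumers
  terminateVisibilityTimeout: true // On a handler error, reset the visibility timeout to 0 so the message can be retried right away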
});

app.on('error', (err) => {
  console.error('Consumer-level error (e.g., SQS polling issue):', err.message);
});

app.on('processing_error', (err, message) => {
  console.error(`Handler error for message ${message.MessageId}:`, err.message);
  // This event is fired when handleMessage throws an error.
  // The message will not be deleted and will be retried (potentially immediately if terminateVisibilityTimeout is true).
});

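// The timeout event is named 'timeout_error' and receives the error first, then the message.
// It is only emitted when a handleMessageTimeout (in milliseconds) is set on the consumer.
app.on('timeout_error', (err, message) => {
  console.warn(`Message processing timed out for ${message.MessageId}:`, err.message);
});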

app.on('empty', () => {
  console.log('Queue is currently empty. Waiting for new messages...');
});

app.start();
console.log('SQS Consumer started. Waiting for messages...');

// Graceful shutdown on application exit signals.
// This waits for the consumer's 'stopped' event instead of awaiting stop(), on the
// assumption that stop() may return before polling has actually ended. The listener
// is registered first, so it also works if 'stopped' is emitted synchronously.
const shutdown = (signal: NodeJS.Signals) => {
  console.log(`${signal} received. Stopping consumer...`);
  app.once('stopped', () => {
    console.log('Consumer stopped. Exiting.');
    process.exit(0);
  });
  app.stop();
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
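
The handler above parses the body with a `'{}'` fallback, so a message with no body is treated as an empty object and acknowledged. A stricter variant is sketched below: it validates the payload before any work is done and throws otherwise, so the handler rejects and the message is left on the queue. `OrderEvent`, `isOrderEvent`, and `parseOrderEvent` are hypothetical names chosen for this example; they are not part of sqs-consumer, and the payload shape is an assumption.

```typescript
// Hypothetical payload shape for illustration; replace with your own schema.
interface OrderEvent {
  orderId: string;
  quantity: number;
}

// Type guard: checks the parsed value structurally before it is trusted.
function isOrderEvent(value: unknown): value is OrderEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v.orderId === 'string' && typeof v.quantity === 'number';
}

// Parses a raw message body. Throws on a missing body, invalid JSON, or a
// payload that fails validation, so a calling handler rejects instead of
// silently acknowledging a message it could not understand.
function parseOrderEvent(body: string | undefined): OrderEvent {
  if (body === undefined || body.trim() === '') {
    throw new Error('Message body is empty');
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(body);
  } catch {
    throw new Error('Message body is not valid JSON');
  }
  if (!isOrderEvent(parsed)) {
    throw new Error('Message body does not match the expected shape');
  }
  return parsed;
}
```

Inside `handleMessage` this would be called as `parseOrderEvent(message.Body)`. A message that can never parse will keep failing on every retry, so this approach works best when the queue has a dead-letter queue configured to collect such messages.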
