KafkaJS Client

2.2.4 · active · verified Tue Apr 21

KafkaJS is a modern Apache Kafka client library for Node.js, currently at stable version 2.2.4. It offers comprehensive features for producing and consuming messages, as well as administrative tasks, providing native support for Kafka 0.10+ and specifically 0.11 features like transactions and custom authentication mechanisms. The library ships with full TypeScript definitions, ensuring a robust development experience. It aims for a low-level, high-performance approach, with a relatively frequent release cadence for bug fixes and minor enhancements, as seen by multiple patch releases in quick succession after major version bumps. A key differentiator is its focus on Node.js specifics and robust error handling to prevent common issues like CPU spikes.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to initialize a Kafka client, send messages using a producer, and consume messages using a consumer, including basic error handling and graceful shutdown.

import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [process.env.KAFKA_BROKER_1 ?? 'localhost:9092'],
  logLevel: logLevel.INFO,
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'my-group' });

const run = async () => {
  // Producer
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
      { key: 'my-key', value: 'Another message with a key!' },
    ],
  });
  console.log('Message sent by producer.');

  // Consumer
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value?.toString(),
        key: message.key?.toString(),
        headers: message.headers,
        topic,
        partition,
      });
    },
  });
  console.log('Consumer started, waiting for messages...');
};

run().catch(async (e) => {
  console.error(`[example/kafkajs] ${e.message}`, e);
  await producer.disconnect();
  await consumer.disconnect();
  process.exit(1);
});

process.on('SIGTERM', async () => {
  console.log('SIGTERM received, disconnecting Kafka clients.');
  await producer.disconnect();
  await consumer.disconnect();
  process.exit(0);
});

view raw JSON →