KafkaJS Client
KafkaJS is an Apache Kafka client library for Node.js; the current stable version is 2.2.4. It covers producing and consuming messages as well as administrative tasks, and natively supports Kafka 0.10+, including 0.11 features such as transactions, along with custom authentication mechanisms. The library ships with full TypeScript definitions. It takes a low-level, performance-oriented approach and has a fairly frequent release cadence for bug fixes and minor enhancements, with several patch releases typically following a major version bump in quick succession. It is written specifically for Node.js and emphasizes error handling that prevents common failure modes such as CPU spikes.
Common errors
- The group is rebalancing, so a rejoin is needed (ILLEGAL_GENERATION)
  cause: A consumer group coordination issue where a rebalance incorrectly led to an `ILLEGAL_GENERATION` error, preventing consumers from rejoining the group.
  fix: Upgrade KafkaJS to v2.2.4 or newer to fix the consumer group rejoining logic after `ILLEGAL_GENERATION` errors.
- Application consuming 100% CPU without processing messages.
  cause: KafkaJS client entering a tight loop when all configured Kafka brokers are unreachable or unavailable.
  fix: Upgrade KafkaJS to v2.1.0 or newer to resolve the 100% CPU utilization issue during broker unavailability.
- Failed to connect: SASL authentication failed (when using PLAIN mechanism)
  cause: A regression in `v2.2.0` through `v2.2.2` broke SASL/PLAIN authentication support.
  fix: Upgrade KafkaJS to v2.2.3 or newer to fix the regression in SASL/PLAIN authentication.
- Error: Topic authorization failed (producer getting stuck)
  cause: A producer might get stuck or persistently fail to send messages after encountering a topic authorization error.
  fix: Upgrade KafkaJS to v2.1.0 or newer to fix persistent errors when producing after a topic authorization error.
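Since every fix above is tied to a release, a small version check can show which of these issues may still apply to an installed copy. The following is only a sketch: the function names and the `since` field are hypothetical, the version numbers are copied from the list above, and the comparison handles plain `major.minor.patch` strings only.

```javascript
// Fix versions copied from the "Common errors" list above.
// `since` is set only where the list states when the problem was introduced.
const FIXED_IN = [
  { issue: 'ILLEGAL_GENERATION rejoin failure', fixedIn: '2.2.4' },
  { issue: '100% CPU when all brokers are unreachable', fixedIn: '2.1.0' },
  { issue: 'SASL/PLAIN authentication regression', since: '2.2.0', fixedIn: '2.2.3' },
  { issue: 'producer stuck after topic authorization error', fixedIn: '2.1.0' },
];

// Compare two "major.minor.patch" strings numerically.
// Simplification: pre-release tags such as "-beta.1" are not handled.
function compareVersions(a, b) {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < 3; i++) {
    const diff = (pa[i] || 0) - (pb[i] || 0);
    if (diff !== 0) return Math.sign(diff);
  }
  return 0;
}

// Return the issues that may affect the given installed version:
// the version is older than the fix and, if known, not older than `since`.
function affectingIssues(installed) {
  return FIXED_IN
    .filter(({ since, fixedIn }) =>
      compareVersions(installed, fixedIn) < 0 &&
      (since === undefined || compareVersions(installed, since) >= 0))
    .map(({ issue }) => issue);
}

console.log(affectingIssues('2.2.2'));
// → [ 'ILLEGAL_GENERATION rejoin failure', 'SASL/PLAIN authentication regression' ]
```

The installed version can be taken, for example, from the output of `npm ls kafkajs`.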
Warnings
- breaking Upgrading to v2.0.0 from v1.x introduces several breaking changes, including a new default partitioner, removal of Node.js 10/12 support, and changes to admin client methods and TypeScript enums. Not addressing these can lead to messages being routed incorrectly or runtime errors.
- gotcha Prior to v2.1.0, KafkaJS consumers could exhibit 100% CPU utilization when all configured Kafka brokers were unavailable, leading to application unresponsiveness and crashes.
- gotcha A regression in versions 2.2.0 through 2.2.2 caused issues with SASL/PLAIN authentication, preventing clients from connecting correctly when using this mechanism.
- gotcha Before v2.2.4, consumers could get stuck or become unresponsive after very brief throttling periods, potentially halting message processing. This bug is resolved in v2.2.4.
- gotcha Older versions could enter an infinite crash loop when no brokers were available during startup or reconnection, leading to application instability.
- gotcha Re-using `groupId` across different applications or independent deployments of the same application can lead to unexpected consumer behavior, such as receiving messages for unsubscribed topics or frequent rebalances.
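Two of the warnings above translate directly into client options: the v2.0.0 default partitioner change and the `groupId` reuse gotcha. The sketch below assumes KafkaJS v2, where the `Partitioners` export provides `DefaultPartitioner` and `LegacyPartitioner` and the producer accepts a `createPartitioner` option. The helper names and the group naming scheme are hypothetical, and `Partitioners` is passed in as an argument so the snippet runs without the library installed.

```javascript
// Build producer options for kafkajs v2.
// `partitioners` is expected to be the `Partitioners` export of kafkajs.
// With keepV1Routing = true, keyed messages keep going to the partitions
// they were routed to under v1.x.
function producerOptions(partitioners, { keepV1Routing }) {
  return {
    createPartitioner: keepV1Routing
      ? partitioners.LegacyPartitioner
      : partitioners.DefaultPartitioner,
  };
}

// Hypothetical naming scheme: one consumer group per application and
// deployment, so independent deployments never share a groupId.
function groupIdFor(appName, deployment) {
  return `${appName}-${deployment}`;
}

// Usage with the real library (assumes kafkajs v2 is installed):
//   import { Kafka, Partitioners } from 'kafkajs';
//   const kafka = new Kafka({ clientId: 'billing', brokers: ['localhost:9092'] });
//   const producer = kafka.producer(producerOptions(Partitioners, { keepV1Routing: true }));
//   const consumer = kafka.consumer({ groupId: groupIdFor('billing', 'staging') });
```

Keeping the legacy partitioner is mainly useful when existing consumers depend on key-to-partition placement produced by v1.x; new deployments can usually stay with the default.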
Install
- npm install kafkajs
- yarn add kafkajs
- pnpm add kafkajs
Imports
- Kafka
  CommonJS: const { Kafka } = require('kafkajs')
  ES modules: import { Kafka } from 'kafkajs'
- Producer, Consumer, Admin
  avoid: import { Producer } from 'kafkajs/lib/producer'
  use: import { Producer, Consumer, Admin } from 'kafkajs'
- EachMessagePayload, KafkaConfig
  avoid: import { EachMessagePayload } from 'kafkajs'
  use: import type { EachMessagePayload, KafkaConfig } from 'kafkajs'
- Partitioners
  avoid: const { DefaultPartitioner } = require('kafkajs/partitioners')
  use: import { Partitioners } from 'kafkajs'
Quickstart
import { Kafka, logLevel } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [process.env.KAFKA_BROKER_1 ?? 'localhost:9092'],
  logLevel: logLevel.INFO,
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'my-group' });

const run = async () => {
  // Producer
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
      { key: 'my-key', value: 'Another message with a key!' },
    ],
  });
  console.log('Message sent by producer.');

  // Consumer
  await consumer.connect();
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value?.toString(),
        key: message.key?.toString(),
        headers: message.headers,
        topic,
        partition,
      });
    },
  });
  console.log('Consumer started, waiting for messages...');
};

run().catch(async (e) => {
  console.error(`[example/kafkajs] ${e.message}`, e);
  await producer.disconnect();
  await consumer.disconnect();
  process.exit(1);
});

process.on('SIGTERM', async () => {
  console.log('SIGTERM received, disconnecting Kafka clients.');
  await producer.disconnect();
  await consumer.disconnect();
  process.exit(0);
});
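
In the quickstart, the two `disconnect()` calls run one after the other, so a failure in the first would skip the second. One possible variation, shown here only as a sketch, disconnects all clients and collects any failures. `disconnectAll` is a hypothetical helper, not part of KafkaJS; it assumes only that each client exposes an async `disconnect()` method, as the `producer` and `consumer` above do.

```javascript
// Disconnect every client, even if one of them fails.
// `clients` is any list of objects with an async `disconnect()` method.
// Resolves to the list of errors raised during disconnection (empty on success).
async function disconnectAll(clients) {
  const results = await Promise.allSettled(
    // The async wrapper turns synchronous throws into rejections as well.
    clients.map(async (client) => client.disconnect())
  );
  return results
    .filter((result) => result.status === 'rejected')
    .map((result) => result.reason);
}

// Usage with the quickstart clients:
//   process.on('SIGTERM', async () => {
//     const failures = await disconnectAll([consumer, producer]);
//     failures.forEach((err) => console.error('Disconnect failed:', err));
//     process.exit(failures.length === 0 ? 0 : 1);
//   });
```

`Promise.allSettled` waits for every disconnect attempt to finish, which is why it is used here instead of `Promise.all`, which would reject as soon as the first client fails.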