KafkaJS Snappy Codec (TypeScript)
kafkajs-snappy-typescript provides a Snappy compression and decompression codec specifically designed for use with KafkaJS, offering strict TypeScript compatibility. This package serves as a type-safe alternative to `tulios/kafkajs-snappy`, aiming to integrate seamlessly into TypeScript-first KafkaJS applications. It leverages Brooooooklyn's `snappy` Node.js compression library for its core compression logic. The current stable version is 1.0.3, primarily consisting of bug fixes and rebundling efforts to ensure proper NPM publication. As a specialized codec, its release cadence is tied to necessary compatibility updates with KafkaJS or its underlying `snappy` dependency, rather than frequent feature additions. Its key differentiator is its explicit focus on strong typing for KafkaJS environments.
Common errors
- Error: Unknown compression type
  cause: The Snappy codec was not registered with KafkaJS's `CompressionCodecs` before a producer attempted to send a Snappy-compressed message or a consumer tried to decompress one.
  fix: Add `CompressionCodecs[CompressionTypes.Snappy] = new SnappyCodec().codec;` to your application startup code, ensuring it runs before any KafkaJS operations.
- node-pre-gyp ERR! build error
  cause: The underlying `snappy` package (which uses native code) failed to compile during installation, often due to missing build tools like `python` or `make`, or an incorrect compiler setup.
  fix: Ensure the necessary build tools are installed on your system. For `node-gyp`, this typically means Python 3 plus `make` and a C++ compiler on Linux, the Xcode Command Line Tools on macOS, or the Visual C++ Build Tools on Windows. Consult the `node-gyp` and `snappy` documentation for detailed prerequisites.
- TypeError: Cannot read properties of undefined (reading 'codec')
  cause: You might be attempting to access `.codec` on `SnappyCodec` without instantiating it first, or `SnappyCodec` itself might be undefined due to an incorrect import.
  fix: Ensure you are instantiating the class correctly: `new SnappyCodec().codec`. Also verify the import: `import { SnappyCodec } from 'kafkajs-snappy-typescript';`
Warnings
- gotcha This package is a codec and must be explicitly registered with KafkaJS via `CompressionCodecs[CompressionTypes.Snappy] = new SnappyCodec().codec;` before producing or consuming Snappy-compressed messages. Failing to register it will result in KafkaJS not knowing how to handle Snappy compression.
- gotcha The underlying `snappy` package, used for compression, relies on native C++ bindings. This can sometimes lead to compilation issues during `npm install` on certain environments or architectures if build tools (like `node-gyp` dependencies) are not correctly set up. Check the `snappy` package documentation for specific build requirements.
- gotcha This library is a TypeScript-focused alternative to `tulios/kafkajs-snappy`. While functionally similar, using both or mixing their import styles could lead to type conflicts or confusion.
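To make the registration requirement concrete, the following is a minimal sketch of how a codec table keyed by compression type behaves. It is not KafkaJS's actual code: `registry`, `lookupCodec`, `roundTrip`, and `standInCodec` are hypothetical names, gzip from Node's `zlib` stands in for Snappy so the sketch needs no native module, and an empty slot is a simplification of how the real table reports a missing codec. The `Codec` shape (async `compress` receiving `{ buffer }`, async `decompress` receiving a `Buffer`) follows the custom-codec interface described in the KafkaJS documentation.

```typescript
import { gzipSync, gunzipSync } from 'node:zlib';

// Shape a KafkaJS codec is expected to have: async compress/decompress over Buffers.
interface Codec {
  compress(encoder: { buffer: Buffer }): Promise<Buffer>;
  decompress(buffer: Buffer): Promise<Buffer>;
}

// Hypothetical stand-in for KafkaJS's CompressionCodecs table (not the real object).
// The key mirrors the Kafka protocol's compression id for Snappy, which is 2.
const SNAPPY = 2;
const registry: Record<number, (() => Codec) | undefined> = {};

// Stand-in codec factory: gzip takes the place of Snappy (an assumption for this sketch).
const standInCodec = (): Codec => ({
  compress: async ({ buffer }) => gzipSync(buffer),
  decompress: async (buffer) => gunzipSync(buffer),
});

// Look up the factory for a compression type and fail loudly if nothing was registered.
function lookupCodec(type: number): Codec {
  const factory = registry[type];
  if (!factory) {
    throw new Error(`No codec registered for compression type ${type}`);
  }
  return factory();
}

// Compress and then decompress a string through whichever codec is registered.
async function roundTrip(type: number, text: string): Promise<string> {
  const codec = lookupCodec(type);
  const compressed = await codec.compress({ buffer: Buffer.from(text) });
  return (await codec.decompress(compressed)).toString();
}
```

Calling `lookupCodec(SNAPPY)` before `registry[SNAPPY] = standInCodec` throws, which is the same ordering problem the first warning describes: the assignment has to run before any producer or consumer touches Snappy-compressed data.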
Install
- npm install kafkajs-snappy-typescript
- yarn add kafkajs-snappy-typescript
- pnpm add kafkajs-snappy-typescript
Imports
- SnappyCodec
  CommonJS: const { SnappyCodec } = require('kafkajs-snappy-typescript');
  ES modules / TypeScript: import { SnappyCodec } from 'kafkajs-snappy-typescript';
- CompressionTypes
  import { CompressionTypes, CompressionCodecs } from 'kafkajs';
Quickstart
import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs';
import { SnappyCodec } from 'kafkajs-snappy-typescript';

// Register the Snappy codec with KafkaJS
CompressionCodecs[CompressionTypes.Snappy] = new SnappyCodec().codec;

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'test-group' });

async function run() {
  await producer.connect();
  await consumer.connect();

  await producer.send({
    topic: 'test-topic',
    compression: CompressionTypes.Snappy, // Use Snappy compression
    messages: [
      { value: 'Hello KafkaJS with Snappy!' },
    ],
  });
  console.log('Message sent with Snappy compression.');

  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value?.toString(),
        headers: message.headers,
      });
    },
  });

  // Disconnect after a short period for demonstration
  setTimeout(async () => {
    await producer.disconnect();
    await consumer.disconnect();
    console.log('Producer and consumer disconnected.');
  }, 5000);
}

run().catch(console.error);
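The timed disconnect in the quickstart is only for demonstration, and if `producer.disconnect()` rejects there, the consumer is never disconnected. One possible way to tidy that up is sketched below. `Disconnectable` and `disconnectAll` are hypothetical names introduced for illustration and are not part of KafkaJS or this package; the only assumption about the clients is that they expose an async `disconnect()`, as the quickstart's producer and consumer do.

```typescript
// Anything exposing an async disconnect(), such as the quickstart's producer and consumer.
interface Disconnectable {
  disconnect(): Promise<void>;
}

// Hypothetical helper: disconnect each client in order and collect failures,
// so one failing client does not prevent the others from disconnecting.
async function disconnectAll(clients: Disconnectable[]): Promise<Error[]> {
  const failures: Error[] = [];
  for (const client of clients) {
    try {
      await client.disconnect();
    } catch (err) {
      failures.push(err instanceof Error ? err : new Error(String(err)));
    }
  }
  return failures;
}
```

In the quickstart, the body of the `setTimeout` callback could then become `await disconnectAll([consumer, producer])`, and the same call could be made from a `SIGINT` or `SIGTERM` handler in a long-running service.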