Kafka Console CLI Tool
Kafka Console is a command-line interface (CLI) tool for interacting with Apache Kafka clusters. It simplifies common Kafka operations such as listing topics, consuming messages from specific topics and partitions, and producing messages to topics. The current stable version is 3.1.13; as a CLI tool, its release cadence is generally driven by feature additions, bug fixes, and compatibility with newer Kafka broker versions. Key differentiators include robust support for SASL authentication mechanisms (plain, scram-sha-256, scram-sha-512, oauthbearer) and TLS/SSL connections, making it suitable for secured Kafka environments. It offers flexible consumption options such as reading from specific offsets or timestamps, filtering, and output in JSONL or raw format. For production, it supports static headers and reading messages from stdin or a file, which makes it well suited to scripting and automation. In short, it addresses the need for an easy-to-use yet powerful terminal-based Kafka client.
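The connection options above come together when scripting against a secured cluster. As a minimal sketch, the snippet below assembles the argument list for a SASL/TLS consume; the flags (`--ssl`, `--mechanism`, `--username`, `--password`, `--data-format`, `-b`) appear elsewhere in this document, while the broker address, topic, and credentials are placeholders:

```javascript
// Sketch: assemble kafka-console arguments for a SASL/TLS-secured cluster.
// Broker address, topic name, and credentials below are illustrative only.
function buildConsumeArgs({ brokers, topic, username, password }) {
  return [
    'consume', topic,
    '-b', brokers,
    '--ssl',
    '--mechanism', 'scram-sha-256',
    '--username', username,
    '--password', password,
    '--data-format', 'jsonl',
  ];
}

const args = buildConsumeArgs({
  brokers: 'broker1:9093',
  topic: 'events',
  username: 'svc-reader',
  password: 'secret',
});
console.log(`npx kafka-console ${args.join(' ')}`);
```

Building the argument list programmatically keeps credentials out of hardcoded command strings and makes it easy to swap mechanisms per environment.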
Common errors
error Failed to connect to brokers: [broker:port] (Connect brokers timed out)
↓ Verify the broker address (e.g., localhost:9092), ensure the Kafka broker process is active, and check any local or network firewall configurations. Confirm the port is correct.

error Failed to consume messages: Topic 'my-topic' not found or authorized.
↓ Verify the topic exists with kafka-console list. If the topic exists, check Kafka ACLs or security configurations to ensure the user has consume/produce permissions.

error Failed to produce messages: SASL authentication failed
↓ Verify your --username, --password, --mechanism, or --oauth-bearer options against your Kafka broker's SASL setup. Ensure the mechanism is supported by both client and broker.

error Failed to consume messages: Invalid JSON payload in message at offset X
↓ If the messages are not JSON, use --data-format raw to consume them as raw strings or buffers. If they are intended to be JSON, inspect the message content for malformed JSON.

Warnings
gotcha Connecting to Kafka clusters secured with SASL or SSL can be complex. Ensure all `--ssl`, `--mechanism`, `--username`, `--password`, or `--oauth-bearer` options are correctly specified and match your broker's configuration. Incorrect settings often lead to connection timeouts or authentication failures.

gotcha The `--data-format` option for `consume` and `produce` commands defaults to `json`. If your messages are not valid JSON, consumption will fail to parse them, or production will serialize them as a string, leading to unexpected behavior. Using `raw` format is crucial for non-JSON payloads.

gotcha When consuming, `--from 0` fetches messages from the beginning of the topic, while omitting `--from` defaults to `latest`. Be mindful of this difference, especially when debugging or setting up continuous consumers. Using `--from 0` with `--group` can still result in only new messages if the group has an existing offset.

gotcha Consuming a large number of messages without a consumer group (i.e., not specifying `--group`) can be inefficient for long-running processes, as offsets are not committed. This can lead to re-processing messages upon restart.
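Since `--data-format jsonl` (mentioned above) emits one JSON document per line, consumed output is easy to post-process in a script. The sketch below parses sample JSONL output; the exact field names (`key`, `value`, `offset`) are assumptions about the output shape, not confirmed from the tool's documentation:

```javascript
// Sketch: parse JSONL output from `consume --data-format jsonl`.
// The sample records and their field names are illustrative assumptions.
const sample = [
  '{"key":"user-1","value":{"name":"Alice"},"offset":"0"}',
  '{"key":"user-2","value":{"name":"Bob"},"offset":"1"}',
].join('\n');

const records = sample
  .split('\n')
  .filter((line) => line.trim().length > 0) // skip blank lines
  .map((line) => JSON.parse(line));          // one JSON document per line

for (const rec of records) {
  console.log(`offset ${rec.offset}: key=${rec.key}`);
}
```

The same pattern works on a live stream by splitting the child process's stdout on newlines as chunks arrive.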
Install
npm install kafka-console
yarn add kafka-console
pnpm add kafka-console

Quickstart
const { exec } = require('child_process');

// Runs a kafka-console subcommand; optional `input` is piped to the child's stdin.
function runKafkaCommand(command, input) {
  return new Promise((resolve, reject) => {
    // KAFKA_BROKERS environment variable can be used, or default to localhost
    const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
    const child = exec(`npx kafka-console ${command} -b ${brokers}`, (error, stdout, stderr) => {
      if (error) {
        console.error(`Command failed: npx kafka-console ${command} -b ${brokers}\nStderr: ${stderr}`);
        return reject(error);
      }
      if (stderr) {
        console.warn(`Command stderr for ${command}:\n${stderr}`);
      }
      console.log(`Stdout for ${command}:\n${stdout}`);
      resolve(stdout);
    });
    if (input !== undefined) {
      // Write the message to the child's stdin directly; `<<< '...'` is a
      // bashism that fails under /bin/sh, which exec() uses by default.
      child.stdin.write(input);
      child.stdin.end();
    }
  });
}

(async () => {
  const topic = 'my-console-topic'; // Ensure this topic exists on your Kafka cluster

  console.log('--- Listing topics ---');
  await runKafkaCommand('list');

  console.log('\n--- Producing a single message ---');
  const message = JSON.stringify({ key: 'user-1', value: { name: 'Alice', timestamp: Date.now() } });
  await runKafkaCommand(`produce ${topic}`, `${message}\n`);

  console.log('\n--- Consuming 1 message from the beginning ---');
  // Note: --from 0 re-reads from the start only if the group has no committed offset
  await runKafkaCommand(`consume ${topic} --from 0 --count 1 --group cli-quickstart`);

  console.log('\nQuickstart complete. Verify Kafka brokers are running and the topic exists for full functionality.');
})().catch(console.error);