Kafka Console CLI Tool

Version 3.1.13 (verified Thu Apr 23)

Kafka Console is a command-line interface (CLI) tool for interacting with Apache Kafka clusters. It simplifies common Kafka operations such as listing topics, consuming messages from specific topics and partitions, and producing messages to topics. The current stable version is 3.1.13; as a CLI tool, its release cadence is generally driven by feature additions, bug fixes, and compatibility with newer Kafka broker versions.

Key differentiators include robust support for SASL authentication mechanisms (plain, scram-sha-256, scram-sha-512, oauthbearer) and TLS/SSL connections, making it suitable for secured Kafka environments. Consumption is flexible: messages can be read from specific offsets or timestamps, filtered, and output in JSONL or raw formats. For production, it supports static headers and reading messages from stdin or a file, which makes it well suited to scripting and automation. In short, it addresses the need for an easy-to-use yet powerful terminal-based Kafka client.

error Failed to connect to brokers: [broker:port] (Connect brokers timed out)
cause The Kafka broker address is incorrect, the broker is not running, or network firewall rules are preventing a connection.
fix
Verify the broker address (e.g., `localhost:9092`), ensure the Kafka broker process is running, and check any local or network firewall configurations. Confirm the port is correct.
error Failed to consume messages: Topic 'my-topic' not found or authorized.
cause The specified Kafka topic does not exist on the cluster, or the authenticated user lacks permissions to access it.
fix
Verify the topic name using `kafka-console list`. If the topic exists, check Kafka ACLs or security configurations to ensure the user has consume/produce permissions.
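The check above can be scripted: run `kafka-console list` and look for the topic in its output. A minimal sketch of the membership check, assuming `list` prints one topic name per line (the exact output format may vary by version, so adjust the parsing if needed):

```javascript
// Hedged sketch: check whether a topic name appears in `kafka-console list`
// output. Assumes one topic name per line, which may differ across versions.
function topicExists(listOutput, topic) {
  return listOutput
    .split('\n')
    .map((line) => line.trim())
    .includes(topic);
}

console.log(topicExists('orders\npayments\naudit-log', 'payments')); // true
console.log(topicExists('orders\npayments', 'users'));               // false
```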
error Failed to produce messages: SASL authentication failed
cause The provided SASL credentials (username, password, mechanism, or OAuth token) are incorrect or do not match the Kafka broker's configuration.
fix
Double-check the `--username`, `--password`, `--mechanism`, or `--oauth-bearer` options against your Kafka broker's SASL setup. Ensure the chosen mechanism is supported by both client and broker.
error Failed to consume messages: Invalid JSON payload in message at offset X
cause The `--data-format json` option was used, but messages in the topic contain non-JSON data that cannot be parsed as valid JSON.
fix
If messages are not JSON, use `--data-format raw` to consume them as raw strings or buffers. If they are intended to be JSON, inspect the message content for malformed JSON.
gotcha Connecting to Kafka clusters secured with SASL or SSL can be complex. Ensure all `--ssl`, `--mechanism`, `--username`, `--password`, or `--oauth-bearer` options are correctly specified and match your broker's configuration. Incorrect settings often lead to connection timeouts or authentication failures.
fix Double-check broker security configurations and corresponding `kafka-console` options. Consider environment variables like `KAFKA_MECHANISM`, `KAFKA_USERNAME`, etc., for sensitive credentials.
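One way to follow that advice is to assemble the security flags from environment variables, so credentials never land in scripts or shell history. A minimal sketch, assuming a `KAFKA_SSL` variable alongside the `KAFKA_MECHANISM`/`KAFKA_USERNAME`/`KAFKA_PASSWORD` variables mentioned above (the flag names are the documented ones; the credential values below are placeholders):

```javascript
// Hedged sketch: build kafka-console security flags from environment
// variables. KAFKA_SSL is an assumed convention here; the flag names
// (--ssl, --mechanism, --username, --password) are the documented options.
function securityArgs(env) {
  const args = [];
  if (env.KAFKA_SSL === 'true') args.push('--ssl');
  if (env.KAFKA_MECHANISM) args.push('--mechanism', env.KAFKA_MECHANISM);
  if (env.KAFKA_USERNAME) args.push('--username', env.KAFKA_USERNAME);
  if (env.KAFKA_PASSWORD) args.push('--password', env.KAFKA_PASSWORD);
  return args;
}

const args = securityArgs({
  KAFKA_SSL: 'true',
  KAFKA_MECHANISM: 'scram-sha-256',
  KAFKA_USERNAME: 'app-user',   // placeholder credentials
  KAFKA_PASSWORD: 's3cret',
});
console.log(args.join(' '));
// --ssl --mechanism scram-sha-256 --username app-user --password s3cret
```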
gotcha The `--data-format` option for `consume` and `produce` commands defaults to `json`. If your messages are not valid JSON, consumption will fail to parse or production will serialize them as a string, leading to unexpected behavior. Using `raw` format is crucial for non-JSON payloads.
fix For non-JSON message values, always explicitly set `--data-format raw`. For custom formats, provide a path to a custom formatter script.
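When the payload format is unknown, one option is to probe a sample message before picking a format. A small sketch of that decision, assuming only the two documented formats (`json` and `raw`) are in play:

```javascript
// Hedged sketch: choose a --data-format value by probing a sample payload.
// If JSON.parse succeeds, `json` is safe; otherwise fall back to `raw`.
function pickDataFormat(samplePayload) {
  try {
    JSON.parse(samplePayload);
    return 'json';
  } catch {
    return 'raw';
  }
}

console.log(pickDataFormat('{"id": 1}'));        // json
console.log(pickDataFormat('plain-text-event')); // raw
```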
gotcha When consuming, `--from 0` fetches messages from the beginning of the topic, while omitting `--from` defaults to `latest`. Be mindful of this difference, especially when debugging or setting up continuous consumers. Using `--from 0` with `--group` can still result in only new messages if the group has an existing offset.
fix Use `--from 0` to ensure all historical messages are considered. For new messages only, rely on the default or specify a timestamp. Always consider the impact of `--group` on offset management.
gotcha Consuming a large number of messages without a consumer group (i.e., not specifying `--group`) can be inefficient for long-running processes, as offsets are not committed. This can lead to re-processing messages upon restart.
fix For continuous or large-scale consumption, always specify a unique `--group` name to leverage Kafka's offset management and avoid redundant processing across restarts or multiple consumers.
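The last two gotchas combine naturally: `--from 0` replays history on a group's first run, while a stable `--group` name lets later runs resume from committed offsets instead of re-reading. A hedged sketch that just assembles such an invocation (the topic and group names are placeholders):

```javascript
// Hedged sketch: build a consume command that replays from the beginning
// on first run but commits offsets under a stable group, so restarts
// resume rather than re-process. Only documented flags are used.
function consumeCommand(topic, group) {
  return ['npx', 'kafka-console', 'consume', topic, '--from', '0', '--group', group].join(' ');
}

console.log(consumeCommand('orders', 'orders-etl-v1'));
// npx kafka-console consume orders --from 0 --group orders-etl-v1
```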
npm install kafka-console
yarn add kafka-console
pnpm add kafka-console

This quickstart demonstrates how to programmatically execute `kafka-console` commands using Node.js's `child_process` module to list topics, produce, and consume messages.

const { exec } = require('child_process');

async function runKafkaCommand(command) {
  return new Promise((resolve, reject) => {
    // KAFKA_BROKERS environment variable can be used, or default to localhost
    const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
    // Run through bash explicitly: the produce step below uses the bash-only
    // here-string operator (<<<), which /bin/sh (dash) does not support.
    exec(`npx kafka-console ${command} -b ${brokers}`, { shell: '/bin/bash' }, (error, stdout, stderr) => {
      if (error) {
        console.error(`Command failed: npx kafka-console ${command} -b ${brokers}\nStderr: ${stderr}`);
        return reject(error);
      }
      if (stderr) {
        console.warn(`Command stderr for ${command}:\n${stderr}`);
      }
      console.log(`Stdout for ${command}:\n${stdout}`);
      resolve(stdout);
    });
  });
}

(async () => {
  const topic = 'my-console-topic'; // Ensure this topic exists on your Kafka cluster

  console.log('--- Listing topics ---');
  await runKafkaCommand('list');

  console.log('\n--- Producing a single message ---');
  const message = JSON.stringify({ key: "user-1", value: { name: "Alice", timestamp: Date.now() } });
  // Feed the message to produce via stdin using a here-string (<<<, bash-only)
  await runKafkaCommand(`produce ${topic} <<< '${message}'`);

  console.log('\n--- Consuming 1 message from the beginning ---');
  // Note: --from 0 might re-read messages if group not used, or if no messages since start
  await runKafkaCommand(`consume ${topic} --from 0 --count 1 --group cli-quickstart`);

  console.log('\nQuickstart complete. Verify Kafka brokers are running and topic exists for full functionality.');
})().catch(console.error);