{
  "id": 17732,
  "library": "kafka-console",
  "title": "Kafka Console CLI Tool",
  "description": "Kafka Console is a command-line interface (CLI) tool for interacting with Apache Kafka clusters. It simplifies common Kafka operations such as listing topics, consuming messages from specific topics and partitions, and producing messages to topics. The current stable version is 3.1.13. As a CLI tool, its release cadence is driven by feature additions, bug fixes, and compatibility with newer Kafka broker versions. Key differentiators include robust support for SASL authentication mechanisms (plain, scram-sha-256, scram-sha-512, oauthbearer) and TLS/SSL connections, making it suitable for secured Kafka environments. It offers flexible consumption options such as reading from specific offsets or timestamps, filtering, and output in JSONL or raw formats. For production, it supports static headers and reading messages from stdin or a file, making it versatile for scripting and automation. It directly addresses the need for a capable, easy-to-use, yet powerful terminal-based Kafka client.",
  "status": "active",
  "version": "3.1.13",
  "language": "javascript",
  "source_language": "en",
  "source_url": "https://github.com/3axap4eHko/kafka-console",
  "tags": ["javascript", "kafka", "cli", "commands", "sasl", "consumer", "producer"],
  "install": [
    {"cmd": "npm install kafka-console", "lang": "bash", "label": "npm"},
    {"cmd": "yarn add kafka-console", "lang": "bash", "label": "yarn"},
    {"cmd": "pnpm add kafka-console", "lang": "bash", "label": "pnpm"}
  ],
  "dependencies": [],
  "imports": [],
  "quickstart": {
    "code": "const { exec } = require('child_process');\n\nasync function runKafkaCommand(command) {\n  return new Promise((resolve, reject) => {\n    // Use the KAFKA_BROKERS environment variable if set, otherwise default to localhost\n    const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';\n    // shell: '/bin/bash' is required because the produce step uses the bash-only <<< herestring\n    exec(`npx kafka-console ${command} -b ${brokers}`, { shell: '/bin/bash' }, (error, stdout, stderr) => {\n      if (error) {\n        console.error(`Command failed: npx kafka-console ${command} -b ${brokers}\\nStderr: ${stderr}`);\n        return reject(error);\n      }\n      if (stderr) {\n        console.warn(`Command stderr for ${command}:\\n${stderr}`);\n      }\n      console.log(`Stdout for ${command}:\\n${stdout}`);\n      resolve(stdout);\n    });\n  });\n}\n\n(async () => {\n  const topic = 'my-console-topic'; // Ensure this topic exists on your Kafka cluster\n\n  console.log('--- Listing topics ---');\n  await runKafkaCommand('list');\n\n  console.log('\\n--- Producing a single message ---');\n  const message = JSON.stringify({ key: \"user-1\", value: { name: \"Alice\", timestamp: Date.now() } });\n  // Feed the message to stdin via a bash herestring (<<<)\n  await runKafkaCommand(`produce ${topic} <<< '${message}'`);\n\n  console.log('\\n--- Consuming 1 message from the beginning ---');\n  // Note: --from 0 reads from the start of the topic; an existing offset for this group may skip older messages\n  await runKafkaCommand(`consume ${topic} --from 0 --count 1 --group cli-quickstart`);\n\n  console.log('\\nQuickstart complete. Verify Kafka brokers are running and the topic exists for full functionality.');\n})().catch(console.error);",
    "lang": "javascript",
    "description": "This quickstart demonstrates how to programmatically execute `kafka-console` commands using Node.js's `child_process` module to list topics, produce a message, and consume messages."
  },
  "warnings": [
    {"fix": "Double-check broker security configurations and the corresponding `kafka-console` options. Consider environment variables such as `KAFKA_MECHANISM` and `KAFKA_USERNAME` for sensitive credentials.", "message": "Connecting to Kafka clusters secured with SASL or SSL can be complex. Ensure all `--ssl`, `--mechanism`, `--username`, `--password`, or `--oauth-bearer` options are correctly specified and match your broker's configuration. Incorrect settings often lead to connection timeouts or authentication failures.", "severity": "gotcha", "affected_versions": ">=1.0.0"},
    {"fix": "For non-JSON message values, always explicitly set `--data-format raw`. For custom formats, provide a path to a custom formatter script.", "message": "The `--data-format` option for the `consume` and `produce` commands defaults to `json`. If your messages are not valid JSON, consumption will fail to parse them, and production will serialize them as strings, leading to unexpected behavior. Using the `raw` format is crucial for non-JSON payloads.", "severity": "gotcha", "affected_versions": ">=1.0.0"},
    {"fix": "Use `--from 0` to ensure all historical messages are considered. For new messages only, rely on the default or specify a timestamp. Always consider the impact of `--group` on offset management.", "message": "When consuming, `--from 0` fetches messages from the beginning of the topic, while omitting `--from` defaults to `latest`. Be mindful of this difference, especially when debugging or setting up continuous consumers. Using `--from 0` with `--group` can still yield only new messages if the group has an existing committed offset.", "severity": "gotcha", "affected_versions": ">=1.0.0"},
    {"fix": "For continuous or large-scale consumption, always specify a unique `--group` name to leverage Kafka's offset management and avoid redundant processing across restarts or multiple consumers.", "message": "Consuming a large number of messages without a consumer group (i.e., not specifying `--group`) is inefficient for long-running processes because offsets are never committed, which can lead to re-processing messages after a restart.", "severity": "gotcha", "affected_versions": ">=1.0.0"}
  ],
  "env_vars": null,
  "last_verified": "2026-04-23T00:00:00.000Z",
  "next_check": "2026-07-22T00:00:00.000Z",
  "problems": [
    {"fix": "Verify the broker address (e.g., `localhost:9092`), ensure the Kafka broker process is active, and check any local or network firewall configurations. Confirm the port is correct.", "cause": "The Kafka broker address is incorrect, the broker is not running, or network firewall rules are preventing a connection.", "error": "Failed to connect to brokers: [broker:port] (Connect brokers timed out)"},
    {"fix": "Verify the topic name using `kafka-console list`. If the topic exists, check Kafka ACLs or security configurations to ensure the user has consume/produce permissions.", "cause": "The specified Kafka topic does not exist on the cluster, or the authenticated user lacks permission to access it.", "error": "Failed to consume messages: Topic 'my-topic' not found or authorized."},
    {"fix": "Double-check the `--username`, `--password`, `--mechanism`, or `--oauth-bearer` options against your Kafka broker's SASL setup. Ensure the mechanism is supported by both client and broker.", "cause": "The provided SASL credentials (username, password, mechanism, or OAuth token) are incorrect or do not match the Kafka broker's configuration.", "error": "Failed to produce messages: SASL authentication failed"},
    {"fix": "If messages are not JSON, use `--data-format raw` to consume them as raw strings or buffers. If they are intended to be JSON, inspect the message content for malformed JSON.", "cause": "The `--data-format json` option was used, but messages in the topic contain non-JSON data that cannot be parsed as valid JSON.", "error": "Failed to consume messages: Invalid JSON payload in message at offset X"}
  ],
  "ecosystem": "npm",
  "meta_description": null,
  "install_score": null,
  "install_tag": null,
  "quickstart_score": null,
  "quickstart_tag": null
}