{"id":3396,"library":"apache-airflow-providers-apache-kafka","title":"Apache Airflow Apache Kafka Provider","description":"This is a provider package for Apache Airflow that enables interaction with Apache Kafka clusters. It provides hooks, operators, and triggers to read from topics, write to topics, and await specific messages in Kafka topics, integrating these operations seamlessly into Airflow DAGs. The current version is 1.13.1, and provider packages typically follow a roughly 2-3 month minor release cadence, with patch releases issued as needed.","status":"active","version":"1.13.1","language":"en","source_language":"en","source_url":"https://github.com/apache/airflow/tree/main/airflow/providers/apache/kafka","tags":["airflow","kafka","provider","apache-airflow","data-streaming","etl","orchestration","messaging"],"install":[{"cmd":"pip install apache-airflow-providers-apache-kafka","lang":"bash","label":"Install the provider"}],"dependencies":[{"reason":"Core Airflow installation is required. Version 1.13.1 of the provider requires Apache Airflow >=2.11.0.","package":"apache-airflow","optional":false},{"reason":"The provider uses the confluent-kafka-python library for interacting with Kafka. 
Specific versions are required based on Python version (e.g., >=2.6.0 for Python < 3.14, >=2.13.2 for Python >= 3.14).","package":"confluent-kafka","optional":false},{"reason":"A common compatibility provider package required by Apache Airflow providers, version >=1.12.0.","package":"apache-airflow-providers-common-compat","optional":false},{"reason":"Provides sync/async adapter utilities used by Airflow; version requirements depend on Python version.","package":"asgiref","optional":false}],"imports":[{"note":"The official package uses `apache.kafka` in the path and `ProduceToTopicOperator` for the class name, not `kafka` or `KafkaProducerOperator`.","wrong":"from airflow.providers.kafka.operators.produce import KafkaProducerOperator","symbol":"ProduceToTopicOperator","correct":"from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator"},{"note":"The official package uses `apache.kafka` in the path and `ConsumeFromTopicOperator` for the class name.","wrong":"from airflow.providers.kafka.operators.consume import KafkaConsumerOperator","symbol":"ConsumeFromTopicOperator","correct":"from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator"},{"note":"The official provider replaces the legacy `AwaitKafkaMessageOperator` with the deferrable `AwaitMessageSensor`.","wrong":"from airflow.providers.apache.kafka.operators.await_message import AwaitKafkaMessageOperator","symbol":"AwaitMessageSensor","correct":"from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor"},{"note":"The hook modules are named `produce`, `consume`, and `client`, not `producer`, `consumer`, or `admin_client`.","wrong":"from airflow.providers.apache.kafka.hooks.producer import KafkaProducerHook","symbol":"KafkaProducerHook","correct":"from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook"},{"wrong":"from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook","symbol":"KafkaConsumerHook","correct":"from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook"},{"wrong":"from airflow.providers.apache.kafka.hooks.admin_client import KafkaAdminClientHook","symbol":"KafkaAdminClientHook","correct":"from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook"},{"symbol":"AwaitMessageTrigger","correct":"from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger"}],"quickstart":{"code":"from __future__ import annotations\n\nimport pendulum\nfrom airflow.models.dag import DAG\nfrom 
airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator\nfrom airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator\n\n\ndef produce_test_messages():\n    # The producer function must yield (key, value) pairs for the operator to publish.\n    for i in range(5):\n        yield (f\"key-{i}\", f\"value-{i}\")\n\n\ndef print_message(message):\n    # Called once per consumed message; `message` is a confluent_kafka.Message.\n    print(f\"Consumed: {message.key()}, {message.value()}\")\n\n\nwith DAG(\n    dag_id=\"kafka_producer_consumer_example\",\n    start_date=pendulum.datetime(2023, 1, 1, tz=\"UTC\"),\n    catchup=False,\n    schedule=None,\n    tags=[\"kafka\", \"example\"],\n) as dag:\n    # Requires an Airflow connection named 'kafka_default' (Admin -> Connections)\n    # of type 'Apache Kafka', with Extras such as:\n    # {\"bootstrap.servers\": \"broker:9092\", \"group.id\": \"airflow-consumer-group\",\n    #  \"auto.offset.reset\": \"earliest\"}\n\n    produce_messages = ProduceToTopicOperator(\n        task_id=\"produce_test_messages\",\n        kafka_config_id=\"kafka_default\",\n        topic=\"test_topic\",\n        producer_function=produce_test_messages,\n    )\n\n    consume_messages = ConsumeFromTopicOperator(\n        task_id=\"consume_test_messages\",\n        kafka_config_id=\"kafka_default\",\n        topics=[\"test_topic\"],\n        apply_function=print_message,\n        commit_cadence=\"end_of_batch\",  # commit offsets after each processed batch\n        max_messages=5,  # stop after consuming up to 5 messages\n        poll_timeout=10,  # seconds to wait for new messages per poll\n    )\n\n    produce_messages >> consume_messages\n","lang":"python","description":"This quickstart demonstrates a simple Airflow DAG that uses the Kafka provider to produce and then consume messages from a Kafka topic. Before running, configure a Kafka connection in the Airflow UI (Admin -> Connections) named `kafka_default`; both operators read client settings from this connection via `kafka_config_id`. The 'Extras' field should contain the client configuration as JSON, e.g. `{\"bootstrap.servers\": \"broker:9092\", \"group.id\": \"airflow-consumer-group\", \"auto.offset.reset\": \"earliest\"}`, adjusted to your broker address. 
The example assumes a local Kafka instance accessible via `broker:9092`."},"warnings":[{"fix":"Ensure your Apache Airflow core version meets or exceeds the minimum requirement for the installed provider version. Refer to the provider's changelog or documentation for the compatibility matrix.","message":"The minimum supported Apache Airflow version for the `apache-airflow-providers-apache-kafka` provider is `2.11.0` as of provider version `1.13.1`. Earlier provider versions (e.g., 1.7.0, 1.9.0, 1.11.0) required older minimum Airflow versions. Upgrading the provider might therefore require upgrading your Airflow core installation.","severity":"breaking","affected_versions":"All provider versions; 1.13.1 specifically requires Airflow >=2.11.0."},{"fix":"Verify `confluent-kafka` is installed correctly (`pip install confluent-kafka`). For complex scenarios like Kerberos, you might need to build `librdkafka` from source and install `confluent-kafka` with the `--no-binary` flag. Ensure all necessary system-level dependencies for `librdkafka` (e.g., `libsasl2-dev`, `krb5-user`) are present.","message":"The provider relies on the `confluent-kafka` Python package. Issues in the underlying `librdkafka` C library (e.g., when Kerberos authentication is involved) can cause installation or connection failures.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Carefully configure your Kafka connection in the Airflow UI. For the `Apache Kafka` connection type, specify `bootstrap.servers` in the 'Extras' JSON field. For consumers, ensure a `group.id` is provided and is unique across consumer groups. Consult the `librdkafka` documentation for the full list of connection parameters.","message":"Incorrect or incomplete Kafka connection configurations in the Airflow UI (referenced via `kafka_config_id` in operators) are a common source of errors, leading to connection failures, missing messages, or unexpected behavior. 
Key parameters like `bootstrap.servers` and `group.id` (for consumers) are critical.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Update your `requirements.txt` to use `apache-airflow-providers-apache-kafka` and adjust import paths to `airflow.providers.apache.kafka.*`.","message":"The original `astronomer/airflow-provider-kafka` GitHub repository and its associated PyPI package were discontinued in March 2023. Users should migrate to the official `apache-airflow-providers-apache-kafka` package.","severity":"deprecated","affected_versions":"Users of the legacy `airflow-provider-kafka` package"},{"fix":"Use Airflow to orchestrate tasks that *interact* with Kafka (e.g., trigger a data load after a Kafka topic reaches a certain state, or publish results to Kafka). Avoid using Airflow as a streaming engine; dedicated streaming platforms are better suited for low-latency, high-throughput stream processing.","message":"Apache Airflow is designed for workflow orchestration and batch processing, not for real-time streaming or low-latency operations. While the Kafka provider allows interaction with Kafka, using Airflow itself to manage continuous streaming processes is an anti-pattern and can lead to performance and operational issues.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}