Apache Airflow Apache Kafka Provider
1.13.1 verified Sat Apr 25 auth: no python
This provider package lets Apache Airflow interact with Apache Kafka clusters. It provides hooks, operators, and triggers to read from topics, write to topics, and wait for specific messages in Kafka topics, so these operations can run as tasks in Airflow DAGs. The current version is 1.13.1. Provider packages typically see a minor release roughly every 2-3 months, with patch releases issued as needed.
pip install apache-airflow-providers-apache-kafka
Common errors
error ModuleNotFoundError: No module named 'airflow.providers.apache.kafka'
cause The 'apache-airflow-providers-apache-kafka' package is not installed.
fix
Install the package using pip: 'pip install apache-airflow-providers-apache-kafka'.
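error ImportError: cannot import name 'KafkaOperator' from 'airflow.providers.apache.kafka.operators'
cause The provider has no class named 'KafkaOperator', and its operator classes live in submodules of 'airflow.providers.apache.kafka.operators' rather than in the package itself.
fix
Import an operator that exists from its submodule, for example 'from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator' or 'from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator'.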
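error AttributeError: module 'airflow.providers.apache.kafka.hooks' has no attribute 'KafkaHook'
cause The provider has no 'KafkaHook' class. Its hooks are 'KafkaProducerHook', 'KafkaConsumerHook', and 'KafkaAdminClientHook', each defined in a submodule of 'airflow.providers.apache.kafka.hooks'.
fix
Import the specific hook you need from its submodule, for example 'from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook', and confirm that the 'apache-airflow-providers-apache-kafka' package is installed and up to date.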
error ModuleNotFoundError: No module named 'confluent_kafka'
cause The 'confluent-kafka' package, a dependency of 'apache-airflow-providers-apache-kafka', is not installed.
fix
Install the 'confluent-kafka' package using pip: 'pip install confluent-kafka'.
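error TypeError: 'NoneType' object is not iterable
cause Something returned 'None' where an iterable was expected. One likely case with this provider is a producer callable passed to ProduceToTopicOperator that returns nothing instead of yielding (key, value) pairs; a misconfigured Kafka connection is another possibility.
fix
Read the traceback to find which call returned 'None'. If it is your producer callable, make it return or yield (key, value) pairs. Otherwise, check the Kafka connection configuration in Airflow and ensure all required parameters are set.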
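The two ModuleNotFoundError entries above can be told apart before a DAG is ever parsed. The following is a minimal sketch using only the Python standard library; the helper names `is_importable` and `installed_version` are illustrative and are not part of Airflow or the provider.

```python
from __future__ import annotations

import importlib.util
from importlib import metadata


def is_importable(module_name: str) -> bool:
    """Return True if the module can be located (parent packages get imported)."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ImportError:
        # Raised when a parent package (e.g. 'airflow') is missing or broken.
        return False


def installed_version(dist_name: str) -> str | None:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Module names as they appear in the error messages.
    for module in ("airflow.providers.apache.kafka", "confluent_kafka"):
        print(f"{module}: importable={is_importable(module)}")
    # Distribution names as they are installed with pip.
    for dist in ("apache-airflow", "apache-airflow-providers-apache-kafka", "confluent-kafka"):
        print(f"{dist}: version={installed_version(dist)}")
```

Running this with the same interpreter that the Airflow scheduler and workers use matters, because a package installed into a different environment produces the same errors.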
Warnings
breaking The minimum supported Apache Airflow version for the `apache-airflow-providers-apache-kafka` provider is `2.11.0` for provider version `1.13.1`. Earlier provider versions (e.g., 1.7.0, 1.9.0, 1.11.0) required older minimum Airflow versions. Upgrading the provider might necessitate an upgrade of your Airflow core installation.
fix Ensure your Apache Airflow core version meets or exceeds the minimum requirement for the installed provider version. Refer to the provider's changelog or documentation for specific compatibility matrices.
gotcha The provider relies on the `confluent-kafka` Python package. Issues, especially related to the underlying `librdkafka` C library (e.g., when Kerberos authentication is involved), can cause connection failures.
fix Verify `confluent-kafka` is installed correctly (`pip install confluent-kafka`). For complex scenarios like Kerberos, you might need to build `librdkafka` from source and then install `confluent-kafka` from source rather than from a prebuilt wheel (`pip install --no-binary confluent-kafka confluent-kafka`), since the wheels bundle their own `librdkafka`. Ensure all necessary system-level dependencies for `librdkafka` (e.g., `libsasl2-dev`, `krb5-user`) are present.
gotcha Incorrect or incomplete Kafka connection configuration (the Airflow connection that operators and hooks reference through `kafka_config_id`) is a common source of connection failures, missing messages, or unexpected behavior. `bootstrap.servers` is always required, and consumers also need `group.id`.
fix Configure the `Apache Kafka` connection in the Airflow UI and put the client settings, including `bootstrap.servers`, in the 'Extras' JSON field. For consumers, set a `group.id`; consumers that share a `group.id` split a topic's partitions between them, so give each independent consumer its own value. Consult the `librdkafka` documentation for the full list of configuration properties.
deprecated The original `astronomer/airflow-provider-kafka` GitHub repository and its associated PyPI package have been discontinued since March 2023. Users should migrate to the official `apache-airflow-providers-apache-kafka` package.
fix Update your `requirements.txt` to use `apache-airflow-providers-apache-kafka` and adjust import paths if necessary to `airflow.providers.apache.kafka.*`.
gotcha Apache Airflow is designed for workflow orchestration and batch processing, not for real-time streaming or low-latency operations. While the Kafka provider allows interaction with Kafka, using Airflow itself for managing continuous streaming processes is an anti-pattern and can lead to performance and operational issues.
fix Use Airflow to orchestrate tasks that *interact* with Kafka (e.g., trigger a data load after a Kafka topic reaches a certain state, or publish results to Kafka). Avoid using Airflow as a streaming engine; dedicated streaming platforms are better suited for low-latency, high-throughput stream processing.
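For the connection gotcha above, the 'Extras' field is plain JSON whose keys are `librdkafka` configuration properties. The sketch below builds such a payload and flags missing keys before it is pasted into the Airflow UI. The helper names (`build_kafka_extra`, `missing_keys`), the broker address, and the group name are hypothetical; only `bootstrap.servers` and `group.id` are taken from the text above.

```python
from __future__ import annotations

import json

REQUIRED_ALWAYS = ("bootstrap.servers",)
REQUIRED_FOR_CONSUMERS = ("group.id",)


def build_kafka_extra(
    bootstrap_servers: str,
    group_id: str | None = None,
    overrides: dict[str, str] | None = None,
) -> dict[str, str]:
    """Assemble the client configuration that goes into the Extras JSON field."""
    config = {"bootstrap.servers": bootstrap_servers}
    if group_id is not None:
        config["group.id"] = group_id
    # Any further librdkafka properties, e.g. {"auto.offset.reset": "earliest"}.
    config.update(overrides or {})
    return config


def missing_keys(config: dict[str, str], for_consumer: bool) -> list[str]:
    """List required properties that are absent or empty in the configuration."""
    required = REQUIRED_ALWAYS + (REQUIRED_FOR_CONSUMERS if for_consumer else ())
    return [key for key in required if not config.get(key)]


# Placeholder values; replace them with your own broker and group.
extra = build_kafka_extra(
    "broker:9092",
    group_id="airflow-consumer-group",
    overrides={"auto.offset.reset": "earliest"},
)
print(json.dumps(extra, indent=2))
print("missing for consumer:", missing_keys(extra, for_consumer=True))
```

Keeping the producer and consumer settings in separate Airflow connections is one way to avoid a `group.id` leaking into a producer configuration, though a single shared connection also works for simple setups.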
Install compatibility
last tested: 2026-04-25
runtime      import time  mem      disk
3.10-alpine  —            —        —
3.10-slim    3.66s        65.0MB   254M
3.11-alpine  —            —        —
3.11-slim    5.80s        70.3MB   274M
3.12-alpine  —            —        —
3.12-slim    6.31s        69.1MB   265M
3.13-alpine  —            —        —
3.13-slim    6.41s        69.7MB   267M
3.9-alpine   —            —        —
3.9-slim     7.08s        76.3MB   218M
Imports
- ProduceToTopicOperator
  wrong:   from airflow.providers.kafka.operators.produce import KafkaProducerOperator
  correct: from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
- ConsumeFromTopicOperator
  wrong:   from airflow.providers.kafka.operators.consume import KafkaConsumerOperator
  correct: from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
- AwaitMessageSensor
  wrong:   from airflow.providers.apache.kafka.operators.await_message import AwaitKafkaMessageOperator
  correct: from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
- KafkaProducerHook
  wrong:   from airflow.providers.apache.kafka.hooks.producer import KafkaProducerHook
  correct: from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
- KafkaConsumerHook
  wrong:   from airflow.providers.apache.kafka.hooks.consumer import KafkaConsumerHook
  correct: from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
- KafkaAdminClientHook
  wrong:   from airflow.providers.apache.kafka.hooks.admin_client import KafkaAdminClientHook
  correct: from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
- AwaitMessageTrigger
  from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
Quickstart
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def produce_test_messages():
    # ProduceToTopicOperator iterates over the (key, value) pairs yielded here.
    for i in range(5):
        yield f"key-{i}", f"value-{i}"


def print_message(message):
    # confluent-kafka messages expose key() and value() as methods returning bytes.
    print(f"Consumed: {message.key()}, {message.value()}")


with DAG(
    dag_id="kafka_producer_consumer_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["kafka", "example"],
) as dag:
    # Define an Apache Kafka connection named 'kafka_default' in the Airflow UI.
    # The client configuration is read from the connection's Extras JSON, e.g.:
    # {"bootstrap.servers": "broker:9092", "group.id": "airflow-consumer-group",
    #  "auto.offset.reset": "earliest"}
    produce_messages = ProduceToTopicOperator(
        task_id="produce_test_messages",
        topic="test_topic",
        kafka_config_id="kafka_default",
        producer_function=produce_test_messages,
    )
    consume_messages = ConsumeFromTopicOperator(
        task_id="consume_test_messages",
        topics=["test_topic"],
        kafka_config_id="kafka_default",
        apply_function=print_message,  # Called once per consumed message
        commit_cadence="end_of_batch",  # Commit offsets after every batch
        max_batch_size=1,  # One message per batch, so each message is committed
        max_messages=5,  # Consume up to 5 messages
        poll_timeout=10,  # Seconds to wait for messages on each poll
    )
    produce_messages >> consume_messages
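Callables handed to the operators can be exercised without a broker or an Airflow installation. The sketch below assumes, as in the confluent-kafka client, that a consumed message exposes `key()` and `value()` as methods returning bytes, and that the producer callable yields (key, value) pairs. `FakeMessage`, `produce_pairs`, and `describe_message` are hypothetical names used only for this illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass
class FakeMessage:
    """Stand-in for a consumed message; mimics the key()/value() accessors."""

    raw_key: bytes
    raw_value: bytes

    def key(self) -> bytes:
        return self.raw_key

    def value(self) -> bytes:
        return self.raw_value


def produce_pairs(count: int = 5) -> Iterator[tuple[str, str]]:
    """Yield (key, value) pairs in the shape a producer callable would."""
    for i in range(count):
        yield f"key-{i}", f"value-{i}"


def describe_message(message) -> str:
    """Decode a message's key and value and format them for logging."""
    key = message.key().decode("utf-8")
    value = message.value().decode("utf-8")
    return f"Consumed: {key}, {value}"


# Round-trip the produced pairs through the fake message type.
for k, v in produce_pairs(2):
    print(describe_message(FakeMessage(k.encode("utf-8"), v.encode("utf-8"))))
```

Returning a string from the consumer callable instead of printing inside it keeps the logic easy to assert on in a unit test; the task itself can still log the result.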