Apache Airflow Apache Kafka Provider
This is a provider package for Apache Airflow that enables interaction with Apache Kafka clusters. It provides hooks, operators, and triggers to read from topics, write to topics, and await specific messages in Kafka topics, integrating these operations seamlessly into Airflow DAGs. The current version is 1.13.1, and provider packages typically follow a roughly 2-3 month minor release cadence, with patch releases issued as needed.
Warnings
- breaking The minimum supported Apache Airflow version for the `apache-airflow-providers-apache-kafka` provider is `2.11.0` for provider version `1.13.1`. Earlier provider versions (e.g., 1.7.0, 1.9.0, 1.11.0) required older minimum Airflow versions. Upgrading the provider might necessitate an upgrade of your Airflow core installation.
- gotcha The provider relies on the `confluent-kafka` Python package. Issues, especially related to the underlying `librdkafka` C library (e.g., when Kerberos authentication is involved), can cause connection failures.
- gotcha Incorrect or incomplete Kafka connection configurations in the Airflow UI (referenced via the `kafka_config_id` passed to operators) are a common source of errors, leading to connection failures, missing messages, or unexpected behavior. Key parameters like `bootstrap.servers` and `group.id` (for consumers) are critical.
- deprecated The original `astronomer/airflow-provider-kafka` GitHub repository and its associated PyPI package were discontinued in March 2023. Users should migrate to the official `apache-airflow-providers-apache-kafka` package.
- gotcha Apache Airflow is designed for workflow orchestration and batch processing, not for real-time streaming or low-latency operations. While the Kafka provider allows interaction with Kafka, using Airflow itself for managing continuous streaming processes is an anti-pattern and can lead to performance and operational issues.
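Given the connection gotchas above, one way to register a Kafka connection is via the Airflow CLI, placing the full client configuration in the connection's Extra JSON. The broker address and group id below are placeholders; adjust them to your cluster:

```shell
# Hypothetical broker address and consumer group; substitute your own.
airflow connections add kafka_default \
    --conn-type kafka \
    --conn-extra '{
        "bootstrap.servers": "broker:9092",
        "group.id": "airflow-consumer-group",
        "auto.offset.reset": "earliest"
    }'
```

Any key accepted by `librdkafka` (e.g., `security.protocol`, `sasl.mechanism`) can be added to the same Extra JSON.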
Install
pip install apache-airflow-providers-apache-kafka
Imports
- ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
- ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
- AwaitMessageSensor (replaces `AwaitKafkaMessageOperator` from the discontinued astronomer package)
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
- KafkaProducerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
- KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
- KafkaAdminClientHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
- AwaitMessageTrigger
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
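As a minimal sketch of what `ConsumeFromTopicOperator`'s `apply_function` receives: the operator invokes it once per consumed message with a `confluent-kafka` `Message` object, whose `key()` and `value()` methods return bytes (or `None`). The function name and decoding logic here are illustrative:

```python
def log_message(message):
    """Illustrative apply_function: decode and print one Kafka message.

    `message` follows the confluent-kafka Message interface, so key()
    and value() return raw bytes (or None for tombstones/keyless records).
    """
    key = message.key().decode("utf-8") if message.key() else None
    value = message.value().decode("utf-8") if message.value() else None
    print(f"Consumed key={key} value={value}")
    return value
```

`apply_function` may be passed as a callable or as a dotted-path string (e.g., `"my_module.log_message"`); the string form is required whenever the function must be importable outside the DAG file.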
Quickstart
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


def producer_function():
    # Yield (key, value) pairs; the operator serializes and sends each one.
    for i in range(5):
        yield (f"key-{i}", f"value-{i}")


def consumer_function(message):
    # Called once per consumed message (a confluent-kafka Message object).
    print(f"Consumed: {message.key()}, {message.value()}")


with DAG(
    dag_id="kafka_producer_consumer_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["kafka", "example"],
) as dag:
    # Define a Kafka connection named 'kafka_default' in the Airflow UI, with
    # the client configuration in the Extra field, e.g.:
    # {"bootstrap.servers": "broker:9092", "group.id": "airflow-consumer-group",
    #  "auto.offset.reset": "earliest"}
    produce_messages = ProduceToTopicOperator(
        task_id="produce_test_messages",
        kafka_config_id="kafka_default",
        topic="test_topic",
        producer_function=producer_function,
    )

    consume_messages = ConsumeFromTopicOperator(
        task_id="consume_test_messages",
        kafka_config_id="kafka_default",
        topics=["test_topic"],
        apply_function=consumer_function,
        commit_cadence="end_of_batch",  # commit offsets after each batch
        max_messages=5,  # consume up to 5 messages
        poll_timeout=10,  # max seconds to wait for new messages
    )

    produce_messages >> consume_messages
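The `AwaitMessageTrigger` (and the sensor built on it) likewise calls a user-supplied `apply_function` on each polled message, and the wait ends at the first message for which it returns a non-falsy value. Because the function is serialized to the triggerer process, it is normally referenced by dotted-path string rather than passed inline. A sketch, with illustrative matching logic:

```python
def await_target_value(message):
    """Illustrative filter for AwaitMessageSensor / AwaitMessageTrigger.

    Returns the decoded payload (truthy, ending the wait) when it contains
    the substring "target"; returns None (falsy, keep waiting) otherwise.
    """
    payload = message.value()  # raw bytes from confluent-kafka, or None
    if payload and b"target" in payload:
        return payload.decode("utf-8")
    return None
```

Reference it from the sensor as something like `apply_function="my_module.await_target_value"` (the module path is hypothetical); the returned value is what the sensor ultimately pushes onward.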