Apache Airflow Apache Kafka Provider

1.13.1 · active · verified Sat Apr 11

This is a provider package for Apache Airflow that enables interaction with Apache Kafka clusters. It provides hooks, operators, and triggers to read from topics, write to topics, and await specific messages in Kafka topics, integrating these operations into Airflow DAGs. The current version is 1.13.1; provider packages are released on their own cadence, with minor releases roughly every 2-3 months and patch releases issued as needed.

Warnings

Install
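The provider is distributed on PyPI as `apache-airflow-providers-apache-kafka` (it pulls in `confluent-kafka` as a dependency). Assuming a standard pip-based Airflow installation, pin the version shown above for reproducible builds:

```shell
pip install apache-airflow-providers-apache-kafka==1.13.1
```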

Imports

Quickstart

This quickstart demonstrates a simple Airflow DAG that uses the Kafka provider to produce messages to a Kafka topic and then consume them. Before running, configure a Kafka connection named `kafka_default` in the Airflow UI (Admin -> Connections). Put the client configuration in the 'Extra' field, e.g. `{"bootstrap.servers": "broker:9092", "group.id": "airflow-consumer-group", "auto.offset.reset": "earliest"}`; note that `group.id` is required for the consumer. The example assumes a local Kafka instance reachable at `broker:9092`.
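If you prefer the CLI to the UI, the same connection can be created with `airflow connections add`. A sketch, assuming the provider's `kafka` connection type and the broker address and consumer settings used in this example:

```shell
airflow connections add 'kafka_default' \
    --conn-type 'kafka' \
    --conn-extra '{"bootstrap.servers": "broker:9092", "group.id": "airflow-consumer-group", "auto.offset.reset": "earliest"}'
```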

from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

with DAG(
    dag_id="kafka_producer_consumer_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["kafka", "example"],
) as dag:
    # Connection 'kafka_default' must exist in the Airflow UI, with the client
    # configuration in its 'Extra' field, e.g.
    # {"bootstrap.servers": "broker:9092", "group.id": "airflow-consumer-group", "auto.offset.reset": "earliest"}

    def producer_function():
        # ProduceToTopicOperator calls this and forwards each yielded
        # (key, value) pair to confluent-kafka's Producer.produce().
        for i in range(5):
            yield (f"key-{i}", f"value-{i}")

    def consumer_function(message):
        # Called once per consumed message; `message` is a confluent-kafka Message.
        print(f"Consumed: {message.key()}, {message.value()}")

    produce_messages = ProduceToTopicOperator(
        task_id="produce_test_messages",
        topic="test_topic",
        kafka_config_id="kafka_default",
        producer_function=producer_function,
    )

    consume_messages = ConsumeFromTopicOperator(
        task_id="consume_test_messages",
        topics=["test_topic"],
        kafka_config_id="kafka_default",
        apply_function=consumer_function,
        commit_cadence="end_of_batch",  # commit offsets after each poll batch
        max_messages=5,  # stop after consuming 5 messages
        poll_timeout=10,  # seconds to wait for messages in each poll
    )

    produce_messages >> consume_messages
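`ProduceToTopicOperator` expects its producer function to yield `(key, value)` pairs of strings or bytes, so structured payloads are typically JSON-encoded before being yielded. A standalone sketch of such a function (runs without Airflow; the function name and payload shape are illustrative):

```python
import json


def json_producer_function(n: int = 5):
    """Yield (key, value) pairs with JSON-encoded values.

    Each yielded pair is forwarded by ProduceToTopicOperator to
    confluent-kafka's Producer.produce(); keys and values may be
    str or bytes, so dicts are serialized to JSON strings first.
    """
    for i in range(n):
        yield (f"key-{i}", json.dumps({"index": i, "payload": f"value-{i}"}))
```

Pass it to the operator as `producer_function=json_producer_function`, or as a dot-notation import string (e.g. `"my_dag.json_producer_function"`) when the callable cannot be referenced directly.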
