Apache Airflow Providers Common Messaging

2.0.3 verified Mon Apr 27 auth: no python

This package provides common messaging utilities for Apache Airflow, including components for building event-driven workflows and integrating with messaging systems such as Kafka. Version 2.0.3 requires Python 3.10 or later. Releases follow the Airflow provider release cycle.

pip install apache-airflow-providers-common-messaging
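After installing, it can be useful to confirm that the distribution is present and the module path resolves before running a DAG. The following is a minimal sketch using only the Python standard library; the helper name `provider_status` is hypothetical and not part of the provider.

```python
from importlib import metadata, util

DIST_NAME = "apache-airflow-providers-common-messaging"
MODULE_NAME = "airflow.providers.common.messaging"


def provider_status(dist_name=DIST_NAME, module_name=MODULE_NAME):
    """Return (installed_version_or_None, module_importable)."""
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        version = None
    try:
        # find_spec imports parent packages first, so a missing or broken
        # parent package surfaces here as an ImportError.
        importable = util.find_spec(module_name) is not None
    except ImportError:
        importable = False
    return version, importable


if __name__ == "__main__":
    version, importable = provider_status()
    print(f"distribution version: {version}, module importable: {importable}")
```

If the version is `None` or the module is not importable, the ModuleNotFoundError described in the entries on this page is the likely symptom at DAG parse time.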
error ModuleNotFoundError: No module named 'airflow.providers.common.messaging'
cause Either the package is not installed, or the code still uses the import path from before the package was renamed.
fix Install the correct package: pip install apache-airflow-providers-common-messaging. For older Airflow versions (<2.10.0), use apache-airflow-providers-messaging.
error AttributeError: module 'airflow.providers.common.messaging' has no attribute 'scheduler'
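cause `scheduler` was accessed as an attribute of the package without importing the submodule. Python does not import submodules automatically, so `airflow.providers.common.messaging.scheduler` must be imported explicitly.
fix Use from airflow.providers.common.messaging.scheduler import EventDrivenScheduler.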
breaking Package renamed from `apache-airflow-providers-messaging` (previously in Airflow 2.9.x) to `apache-airflow-providers-common-messaging` in Airflow 2.10.0.
fix Update your install command to `pip install apache-airflow-providers-common-messaging` and adjust imports to use `airflow.providers.common.messaging` instead of `airflow.providers.messaging`.
deprecated The `EventDrivenScheduler` class may be deprecated in future versions in favor of the new `DAGSchedule` with event-driven triggers.
fix Monitor the Airflow changelog. For now, use `EventDrivenScheduler` as documented.
gotcha The trigger type must exactly match a registered trigger class name; a common mistake is mixing up similar names such as 'kafka' and 'apache_kafka'.
fix Check the list of available triggers in the provider documentation and use the name exactly as registered there, or provide the full module path to the trigger class.
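The AttributeError entry above comes down to how Python binds submodules: a submodule becomes an attribute of its parent package only after it has been imported. The sketch below illustrates this with a throwaway package created in a temporary directory; the package name `demo_pkg_messaging` and the function `demo_submodule_access` are made up for the demonstration and have nothing to do with the provider's own layout.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def demo_submodule_access():
    """Return whether `pkg.scheduler` exists before and after importing it."""
    with tempfile.TemporaryDirectory() as tmp:
        # Build a package with an empty __init__.py and one submodule.
        pkg_dir = Path(tmp) / "demo_pkg_messaging"
        pkg_dir.mkdir()
        (pkg_dir / "__init__.py").write_text("")
        (pkg_dir / "scheduler.py").write_text("VALUE = 42\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            pkg = importlib.import_module("demo_pkg_messaging")
            # Importing the package alone does not load the submodule.
            before = hasattr(pkg, "scheduler")
            importlib.import_module("demo_pkg_messaging.scheduler")
            # The explicit import binds the submodule on the parent package.
            after = hasattr(pkg, "scheduler")
        finally:
            # Clean up so the demo leaves no trace in the interpreter.
            sys.path.remove(tmp)
            sys.modules.pop("demo_pkg_messaging.scheduler", None)
            sys.modules.pop("demo_pkg_messaging", None)
    return before, after
```

Calling `demo_submodule_access()` shows the attribute missing before the explicit import and present afterwards, which is why importing from the full submodule path avoids the error.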

Creates an event-driven DAG that listens to a Kafka topic and processes events.

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.messaging.scheduler import EventDrivenScheduler


def process_event(event):
    # Receives the listener's output as its only positional argument.
    print(f"Received event: {event}")


# Define a simple event-driven DAG
with DAG(
    dag_id='event_driven_dag',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # no time-based schedule; `schedule` replaces the deprecated `schedule_interval`
    catchup=False,
) as dag:
    # Listen on the Kafka topic; the broker address comes from the environment.
    scheduler = EventDrivenScheduler(
        task_id='listen_for_event',
        trigger='kafka',
        broker=os.environ.get('KAFKA_BROKER', 'localhost:9092'),
        topic='example_topic',
    )

    # Pass the listener's output to the processing task.
    process = PythonOperator(
        task_id='process_event',
        python_callable=process_event,
        op_args=[scheduler.output],
    )

    scheduler >> process