Monte Carlo Data Airflow Provider

Version 0.3.11 | verified Thu Apr 16 | auth: no | python

Monte Carlo's Apache Airflow Provider integrates Monte Carlo with Airflow to enable data observability features like incident alerts, lineage visibility, and pipeline control via Circuit Breakers. It is compatible with Apache Airflow 1.10.14 or greater and requires Python 3.7 or greater. The library is actively maintained with regular updates.

pip install airflow-mcd
error In Monte Carlo, DAG runs remain in a “running” state indefinitely, or no alert is triggered when a DAG fails.
cause DAG-level callbacks are missing or misconfigured, even though task-level callbacks are present.
fix Include `**mcd_callbacks.dag_callbacks` (or explicit DAG-level success and failure callbacks) in your DAG definition, in addition to any task-level callbacks.
error Airflow connection tests fail, or the 'Monte Carlo Data Gateway' connection type is not available, in Amazon Managed Workflows for Apache Airflow (MWAA).
cause In some MWAA environments, PyPI packages are not installed on the web server, so custom connection types are unavailable.
fix As a workaround, use the generic HTTP connection type: set 'Host' to https://integrations.getmontecarlo.com, 'Login' to your Monte Carlo API ID, and 'Password' to your API token. HTTP connections cannot be tested from the Airflow UI.
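A minimal sketch of the HTTP-connection workaround, assuming Airflow 2.3 or later, which accepts JSON-formatted connections in `AIRFLOW_CONN_<CONN_ID>` environment variables. It only builds the variable name and value with the field mapping above; it does not contact Monte Carlo. The connection ID `mcd_http_example` and the helper name are hypothetical placeholders: use whatever connection ID your callbacks or operators are configured to read.

```python
import json
from typing import Tuple


def mcd_http_connection_env(conn_id: str, api_id: str, api_token: str) -> Tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_* environment variable.

    Hypothetical helper. Field mapping follows the MWAA workaround:
    host = Monte Carlo integrations endpoint, login = API ID, password = API token.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "http",  # generic HTTP type instead of 'Monte Carlo Data Gateway'
            "host": "https://integrations.getmontecarlo.com",
            "login": api_id,  # Monte Carlo API ID
            "password": api_token,  # Monte Carlo API token
        }
    )
    return name, value


if __name__ == "__main__":
    # "mcd_http_example" is a hypothetical connection ID; substitute your own.
    name, value = mcd_http_connection_env("mcd_http_example", "<API_ID>", "<API_TOKEN>")
    print(f"export {name}='{value}'")
```

On MWAA you would normally store the same fields through the Airflow UI or a secrets backend rather than an environment variable; the sketch is mainly a record of which value goes in which field.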
error Connection to Monte Carlo fails after upgrading Apache Airflow to 2.9.0 or later.
cause Versions of `airflow-mcd` earlier than 0.3.3 have known compatibility issues with Airflow 2.9.0+.
fix Upgrade `airflow-mcd` to 0.3.3 or later: `pip install -U airflow-mcd`.
breaking When upgrading to Airflow 2.9.0 or later, ensure `airflow-mcd` is updated to version 0.3.3 or higher. Older versions of `airflow-mcd` are not compatible with Airflow 2.9.0+ and may cause connection failures.
fix Upgrade `airflow-mcd` to 0.3.3 or later: `pip install -U airflow-mcd`.
gotcha If task-level callbacks are configured, DAG-level callbacks (e.g., `mcd_callbacks.dag_callbacks`) must also be defined. Without DAG callbacks, Monte Carlo cannot update the DAG run status, leading to DAGs appearing perpetually 'Running' and preventing failure alerts.
fix Always include `**mcd_callbacks.dag_callbacks` (or explicit DAG callbacks) in your DAG definition if you are using any task-level callbacks.
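A hedged sketch of a pre-deploy check for this gotcha. It inspects plain dicts of keyword arguments (as you would pass to `DAG(...)` and to operators) and reports which DAG-level status callbacks are absent when any task sets a callback. The function name is hypothetical, and the check assumes `mcd_callbacks.dag_callbacks` supplies Airflow's `on_success_callback` and `on_failure_callback` keys, which this page does not confirm.

```python
from typing import Any, Dict, Iterable, List

# DAG-level status callbacks Airflow accepts on DAG(...).
# Assumption: these are the keys mcd_callbacks.dag_callbacks provides.
STATUS_CALLBACKS = ("on_success_callback", "on_failure_callback")


def missing_dag_callbacks(
    dag_kwargs: Dict[str, Any], task_kwargs_list: Iterable[Dict[str, Any]]
) -> List[str]:
    """Return DAG-level callback names that are unset while some task sets a callback.

    An empty list means the DAG does not hit the "perpetually running" gotcha.
    """
    tasks_use_callbacks = any(
        key.endswith("_callback") and kwargs.get(key) is not None
        for kwargs in task_kwargs_list
        for key in kwargs
    )
    if not tasks_use_callbacks:
        return []
    return [name for name in STATUS_CALLBACKS if dag_kwargs.get(name) is None]


if __name__ == "__main__":
    def notify(context):  # stand-in for a task-level callback
        pass

    task_kwargs = [{"task_id": "t1", "on_failure_callback": notify}]
    print(missing_dag_callbacks({"dag_id": "d"}, task_kwargs))
    # → ['on_success_callback', 'on_failure_callback']
```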
gotcha The `Monte Carlo Data Gateway` connection type in Airflow does not support Circuit Breakers operations. For Circuit Breaker functionality, you must use the `Monte Carlo Data` connection type configured with an API Key ID (login) and API Secret (password).
fix Create an Airflow connection of type `Monte Carlo Data` for Circuit Breakers, ensuring the API Key ID is in the 'Login' field and the API Secret in the 'Password' field.
breaking Airflow 3.0 and later require `airflow-mcd` 0.3.10 or later. Earlier versions of `airflow-mcd` may not work correctly with Airflow 3.x.
fix Upgrade `airflow-mcd` to 0.3.10 or later: `pip install -U airflow-mcd`.
pip install "apache-airflow>=1.10.14" airflow-mcd

This quickstart shows how to add Monte Carlo callbacks to an Apache Airflow DAG. `mcd_callbacks.dag_callbacks` and `mcd_callbacks.task_callbacks` automatically send webhooks to Monte Carlo on DAG and task events (success, failure, etc.), which is what provides observability. For full functionality, configure your Monte Carlo API credentials as an Airflow connection.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator  # Airflow 3: airflow.providers.standard.operators.bash

from airflow_mcd.callbacks import mcd_callbacks

with DAG(
    dag_id="monte_carlo_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,  # `schedule` requires Airflow 2.4+; older releases use schedule_interval
    tags=["monte_carlo", "example"],
    **mcd_callbacks.dag_callbacks,  # DAG-level callbacks: report DAG run status to Monte Carlo
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Starting DAG'",
        **mcd_callbacks.task_callbacks,  # Task-level callbacks: report task events to Monte Carlo
    )

    process_data = BashOperator(
        task_id="process_data",
        bash_command="echo 'Processing data...'; sleep 5",
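        # To override one callback, merge into a new dict instead of passing the
        # keyword twice (a duplicate keyword raises TypeError). Placeholder callback:
        # **{**mcd_callbacks.task_callbacks, "on_failure_callback": my_failure_callback},
        **mcd_callbacks.task_callbacks,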
    )

    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo 'DAG finished'",
        **mcd_callbacks.task_callbacks,
    )

    start_task >> process_data >> end_task
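Passing an explicit `on_failure_callback=` alongside `**mcd_callbacks.task_callbacks` fails with `TypeError` if that dict already contains the key. A minimal stdlib sketch of a safer pattern: merge into a new dict and, to keep Monte Carlo's reporting as well as your own alert, chain the two callbacks. Everything here is a hypothetical stand-in: `task_callbacks` mimics `mcd_callbacks.task_callbacks` (that it contains an `on_failure_callback` key is an assumption), `make_task` mimics an operator constructor, and `my_alert` is your own callback.

```python
from typing import Any, Callable, Dict

Callback = Callable[[Dict[str, Any]], None]


def chain_callbacks(*callbacks: Callback) -> Callback:
    """Return one callback that calls each given callback with the same context, in order."""
    def _chained(context: Dict[str, Any]) -> None:
        for callback in callbacks:
            callback(context)
    return _chained


calls = []

# Hypothetical stand-in for mcd_callbacks.task_callbacks.
task_callbacks = {
    "on_success_callback": lambda ctx: calls.append("mcd-success"),
    "on_failure_callback": lambda ctx: calls.append("mcd-failure"),
}


def my_alert(context: Dict[str, Any]) -> None:
    calls.append("my-alert")


def make_task(**kwargs: Any) -> Dict[str, Any]:
    """Stand-in for an operator constructor such as BashOperator(...)."""
    return kwargs


# make_task(**task_callbacks, on_failure_callback=my_alert) would raise TypeError
# (multiple values for keyword argument 'on_failure_callback').
overrides = {
    **task_callbacks,
    "on_failure_callback": chain_callbacks(task_callbacks["on_failure_callback"], my_alert),
}
task = make_task(task_id="process_data", **overrides)
task["on_failure_callback"]({})  # Airflow would pass the task context dict here
print(calls)  # ['mcd-failure', 'my-alert']
```

Chaining rather than replacing keeps the Monte Carlo failure webhook in place, so adding a local alert does not silently remove the task from Monte Carlo's view.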