Monte Carlo Data Airflow Provider
Monte Carlo's Apache Airflow Provider integrates Monte Carlo with Airflow to enable data observability features like incident alerts, lineage visibility, and pipeline control via Circuit Breakers. It is compatible with Apache Airflow 1.10.14 or greater and requires Python 3.7 or greater. The library is actively maintained with regular updates.
Common errors
- DAG runs remain in a “Running” state indefinitely in Monte Carlo, or no alerts are triggered when a DAG fails.
  cause: DAG-level callbacks are missing or misconfigured, even if task-level callbacks are present.
  fix: Include `mcd_callbacks.dag_callbacks` (or explicit DAG-level success/failure callbacks) in your DAG definition, in addition to any task-level callbacks.
- Airflow connection tests fail, or the 'Monte Carlo Data Gateway' connection type is not available, in Amazon Managed Workflows for Apache Airflow (MWAA).
  cause: In some MWAA environments, PyPI packages are not installed on the web server, so custom connection types are unavailable there.
  fix: As a workaround, use the generic HTTP connection type. Set `https://integrations.getmontecarlo.com` as the 'Host', your Monte Carlo API ID as the 'Login', and your API token as the 'Password'. Note that HTTP connections cannot be tested within the Airflow UI.
- Connection to Monte Carlo fails after upgrading Apache Airflow to 2.9.0 or higher.
  cause: Versions of `airflow-mcd` prior to 0.3.3 have known compatibility issues with Airflow 2.9.0+.
  fix: Upgrade the `airflow-mcd` package to version 0.3.3 or later: `pip install -U airflow-mcd`.
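The generic HTTP connection from the MWAA workaround can also be defined outside the UI. The following is a minimal sketch, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` environment variables). The helper name `build_http_connection` and the connection id `mcd_gateway_default_session` are illustrative assumptions, not values confirmed by this page; whether environment variables are the right mechanism depends on your deployment.

```python
import json
from typing import Tuple

# Host value taken from the workaround described above.
MONTE_CARLO_HOST = "https://integrations.getmontecarlo.com"


def build_http_connection(
    api_id: str,
    api_token: str,
    conn_id: str = "mcd_gateway_default_session",  # assumed connection id
) -> Tuple[str, str]:
    """Return (env var name, JSON value) for a generic HTTP connection."""
    payload = {
        "conn_type": "http",
        "host": MONTE_CARLO_HOST,
        "login": api_id,        # Monte Carlo API ID
        "password": api_token,  # Monte Carlo API token
    }
    # Airflow looks up connections in AIRFLOW_CONN_<CONN_ID upper-cased>.
    return "AIRFLOW_CONN_" + conn_id.upper(), json.dumps(payload)


if __name__ == "__main__":
    name, value = build_http_connection("<API_ID>", "<API_TOKEN>")
    print(f"export {name}='{value}'")
```

Keep real credentials out of source control; the placeholders above are meant to be replaced at deploy time.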
Warnings
- breaking When upgrading to Airflow 2.9.0 or later, ensure `airflow-mcd` is updated to version 0.3.3 or higher. Older versions of `airflow-mcd` are not compatible with Airflow 2.9.0+ and may cause connection failures.
- gotcha If task-level callbacks are configured, DAG-level callbacks (e.g., `mcd_callbacks.dag_callbacks`) must also be defined. Without DAG callbacks, Monte Carlo cannot update the DAG run status, leading to DAGs appearing perpetually 'Running' and preventing failure alerts.
- gotcha The `Monte Carlo Data Gateway` connection type in Airflow does not support Circuit Breaker operations. To use Circuit Breakers, configure a `Monte Carlo Data` connection instead, with the API Key ID as the login and the API Secret as the password.
- breaking For Airflow 3.0 compatibility, `airflow-mcd` package version 0.3.10 and later is required. Earlier versions of `airflow-mcd` may not function correctly with Airflow 3.0 onwards.
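The two version warnings above can be sketched as a small pre-upgrade check. This is an illustrative helper, not part of `airflow-mcd`; the function names are hypothetical, and the thresholds are only the ones stated in this section (0.3.3 for Airflow 2.9.0+, 0.3.10 for Airflow 3.0+).

```python
from typing import Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn '2.9.1' into (2, 9, 1); stops at non-numeric suffixes like 'rc1'."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def minimum_airflow_mcd(airflow_version: str) -> Optional[str]:
    """Minimum airflow-mcd version named in the warnings, or None if none applies."""
    version = parse_version(airflow_version)
    if version >= (3, 0):
        return "0.3.10"
    if version >= (2, 9):
        return "0.3.3"
    return None


def is_compatible(airflow_version: str, mcd_version: str) -> bool:
    """True if mcd_version meets the minimum for airflow_version."""
    required = minimum_airflow_mcd(airflow_version)
    if required is None:
        return True
    return parse_version(mcd_version) >= parse_version(required)
```

Versions are compared as integer tuples rather than strings because a string comparison would rank "0.3.9" above "0.3.10".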
Install
- pip install airflow-mcd
- pip install "apache-airflow>=1.10.14" airflow-mcd
Imports
- mcd_callbacks
from airflow_mcd.callbacks import mcd_callbacks
- DbtRunOperator
from airflow_mcd.operators.dbt import DbtRunOperator
- SimpleCircuitBreakerOperator
from airflow_mcd.operators.mcd import SimpleCircuitBreakerOperator
Quickstart
from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow_mcd.callbacks import mcd_callbacks

with DAG(
    dag_id="monte_carlo_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,  # `schedule` requires Airflow 2.4+; use `schedule_interval` on older versions
    tags=["monte_carlo", "example"],
    **mcd_callbacks.dag_callbacks,  # Register the Monte Carlo DAG-level callbacks
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Starting DAG'",
        **mcd_callbacks.task_callbacks,  # Register the Monte Carlo task-level callbacks
    )

    process_data = BashOperator(
        task_id="process_data",
        bash_command="echo 'Processing data...'; sleep 5",
        # To set callbacks individually, pass them explicitly, e.g.
        # on_failure_callback=mcd_callbacks.mcd_callback_task_failure,
        # and drop the ** expansion below: passing the same keyword both ways
        # raises a TypeError.
        **mcd_callbacks.task_callbacks,
    )

    end_task = BashOperator(
        task_id="end_task",
        bash_command="echo 'DAG finished'",
        **mcd_callbacks.task_callbacks,
    )

    start_task >> process_data >> end_task
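Because the quickstart passes `mcd_callbacks.task_callbacks` with `**`, it behaves like a mapping from callback parameter names to functions. The sketch below shows one way to add your own failure handler without dropping the Monte Carlo one. It runs without Airflow: `task_callbacks`, `make_operator`, and `chain_callbacks` are hypothetical stand-ins written for illustration, not the library's objects.

```python
calls = []  # records which callbacks ran, for demonstration only


def mc_task_success(context):
    calls.append("mc_success")


def mc_task_failure(context):
    calls.append("mc_failure")


# Stand-in for mcd_callbacks.task_callbacks (assumed to be a dict of callbacks).
task_callbacks = {
    "on_success_callback": mc_task_success,
    "on_failure_callback": mc_task_failure,
}


def make_operator(task_id, **kwargs):
    """Stand-in for an operator constructor; just returns its arguments."""
    return {"task_id": task_id, **kwargs}


def chain_callbacks(*callbacks):
    """Build one callback that invokes each given callback in order."""
    def chained(context):
        for callback in callbacks:
            callback(context)
    return chained


def notify_team(context):
    calls.append("notify_team")


# Later keys win when merging dicts, so the chained handler replaces the
# original failure entry while the success entry is kept unchanged.
merged = {
    **task_callbacks,
    "on_failure_callback": chain_callbacks(
        task_callbacks["on_failure_callback"], notify_team
    ),
}
operator = make_operator("process_data", **merged)
operator["on_failure_callback"]({"task_id": "process_data"})
```

Merging into a single dict before unpacking avoids passing `on_failure_callback` twice, which Python rejects with a `TypeError`.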