DAG Factory
dag-factory is an open-source Python library that dynamically generates Apache Airflow DAGs from YAML configuration files. It enables users to define complex data pipelines using a declarative syntax, reducing the need for extensive Python knowledge and promoting consistency across many DAGs. The library is actively maintained by Astronomer, with the current stable version being 1.0.1, and receives regular updates and feature enhancements.
Warnings
- breaking Airflow providers (`apache-airflow-providers-http`, `apache-airflow-providers-cncf-kubernetes`) are no longer automatically installed. They are now optional dependencies.
- breaking The `DagFactory` class is now considered private (`_DagFactory`), and its direct import path (`from dagfactory import DagFactory`) has been removed.
- breaking The `schedule_interval` parameter in YAML DAG configurations is no longer supported; use `schedule` instead.
- breaking The `clean_dags()` method has been removed. DAG cleanup is now handled directly by Airflow's configuration (`AIRFLOW__DAG_PROCESSOR__REFRESH_INTERVAL`).
- breaking Support for older Airflow and Python versions has been dropped.
- breaking Several inconsistent YAML parameters (e.g., `dagrun_timeout_sec`, `retry_delay_sec`, `sla_secs`, `execution_delta_secs`, `execution_timeout_secs`) have been removed.
- gotcha The `sla_miss_callback` parameter is removed from `dag_kwargs` for Airflow versions >= 3.1.0.
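As a migration example, a pre-1.0 configuration using the removed `schedule_interval` key would be rewritten to use `schedule` (the DAG name here is illustrative):

```yaml
# Before (no longer supported in dag-factory 1.x):
# my_dag:
#   schedule_interval: '@daily'

# After:
my_dag:
  schedule: '@daily'
```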
Install
- pip install dag-factory
- pip install dag-factory[all]
- pip install dag-factory[kubernetes]
Imports
- load_yaml_dags
from dagfactory import load_yaml_dags
Quickstart
import os
from dagfactory import load_yaml_dags
# Define a simple YAML configuration for a DAG
# In a real Airflow setup, this would be in a .yml file in your DAGs folder
# e.g., dags/my_dag.yml
yaml_config_content = '''
my_example_dag:
  default_args:
    owner: 'airflow'
    start_date: '2023-01-01'
    retries: 1
  schedule: '@daily'
  description: 'A simple example DAG from YAML'
  tasks:
    start_task:
      operator: airflow.operators.bash.BashOperator
      bash_command: 'echo "Starting DAG!"'
    end_task:
      operator: airflow.operators.bash.BashOperator
      bash_command: 'echo "DAG finished."'
      dependencies: [start_task]
'''
# For demonstration, we'll write the YAML to a temporary file.
# In a real Airflow environment, this file would be picked up by the scheduler.
# The DAGs folder is conventionally $AIRFLOW_HOME/dags, not $AIRFLOW_HOME itself.
dags_folder = os.path.join(os.environ.get('AIRFLOW_HOME', '.'), 'dags')
os.makedirs(dags_folder, exist_ok=True)
config_filepath = os.path.join(dags_folder, 'my_dag.yml')
with open(config_filepath, 'w') as f:
    f.write(yaml_config_content)
# Load DAGs from the YAML file(s) into Airflow's DAG Bag.
# This Python file (e.g., dags/dag_generator.py) will be parsed by Airflow.
# All YAML files in the dags_folder (or specified path) will be processed.
load_yaml_dags(globals_dict=globals(), dags_folder=dags_folder)
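To see how the `dependencies` key in the YAML above implies task ordering (`start_task` upstream of `end_task`), the same structure can be resolved with a topological sort over plain dicts. This is an illustrative sketch in standard-library Python, not dag-factory's internal implementation:

```python
from graphlib import TopologicalSorter

# The `tasks` mapping mirrors the YAML quickstart above: each task may list
# its upstream tasks under a `dependencies` key.
tasks = {
    "start_task": {"operator": "airflow.operators.bash.BashOperator",
                   "bash_command": 'echo "Starting DAG!"'},
    "end_task": {"operator": "airflow.operators.bash.BashOperator",
                 "bash_command": 'echo "DAG finished."',
                 "dependencies": ["start_task"]},
}

# Build {task: set(upstream tasks)} and sort it dependency-first.
graph = {name: set(cfg.get("dependencies", [])) for name, cfg in tasks.items()}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['start_task', 'end_task']
```

Upstream tasks always appear before their dependents, which is the ordering dag-factory encodes into the generated DAG.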