Apache Airflow
Apache Airflow is an open-source platform used to programmatically author, schedule, and monitor workflows, particularly for data pipelines. It defines workflows as Directed Acyclic Graphs (DAGs) in Python, enabling dynamic, scalable, and extensible orchestration. The current stable version is 3.1.8, with releases occurring regularly to introduce new features, improvements, and bug fixes.
Warnings
- breaking Direct metadata database access from task code is restricted in Airflow 3. Tasks can no longer directly import and use Airflow database sessions or models. All runtime interactions (state transitions, heartbeats, XComs, resource fetching) must now use the dedicated Task Execution API or the official Python API Client.
- breaking SubDAGs have been removed in Airflow 3. They are replaced by TaskGroups, Assets, and Data Aware Scheduling.
- breaking The Sequential Executor has been removed in Airflow 3. It is replaced by the LocalExecutor, which can still be used with SQLite for local development.
- deprecated SLAs (Service Level Agreements) were deprecated and have been removed in Airflow 3. Deadline Alerts are planned as their replacement.
- gotcha Avoid using relative imports in DAG files (e.g., `from . import my_module`). The same DAG file might be parsed in different contexts (scheduler, workers, tests), leading to inconsistent behavior.
- gotcha Do not use Airflow Variables or Connections at the top level of DAG files (i.e., outside of task `execute()` methods or Jinja templates). This can cause slow DAG parsing and unexpected behavior, as the values are fetched every time the DAG file is parsed.
Install
- With common extras:
pip install "apache-airflow[celery,cncf.kubernetes,http,postgres,amazon]==3.1.8" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.10.txt"
- Core only:
pip install "apache-airflow==3.1.8" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.10.txt"
Imports
- DAG
from airflow.sdk import DAG  # Airflow 3 authoring interface; Airflow 2 used airflow.models.dag
- BashOperator
from airflow.providers.standard.operators.bash import BashOperator
- PythonOperator
from airflow.providers.standard.operators.python import PythonOperator
- TaskGroup
from airflow.utils.task_group import TaskGroup
- Provider-specific operators (e.g., S3 operators from the Amazon provider)
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
- XComArg
from airflow.models.xcom_arg import XComArg
Quickstart
import os
from datetime import datetime

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator

# Set AIRFLOW_HOME if not already set (e.g., in a local dev setup)
# os.environ['AIRFLOW_HOME'] = os.environ.get('AIRFLOW_HOME', '~/airflow')

def _greet(name):
    print(f"Hello, {name} from a Python task!")

with DAG(
    dag_id='simple_airflow_quickstart',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',  # 'schedule_interval' was removed in Airflow 3
    catchup=False,
    tags=['quickstart'],
) as dag:
    start_task = BashOperator(
        task_id='start_workflow',
        bash_command='echo "Starting the workflow!"',
    )
    greet_task = PythonOperator(
        task_id='greet_with_python',
        python_callable=_greet,
        op_kwargs={'name': 'Airflow User'},
    )
    end_task = BashOperator(
        task_id='end_workflow',
        bash_command='echo "Workflow finished!"',
    )

    # Run the three tasks sequentially
    start_task >> greet_task >> end_task