Apache Airflow
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows, particularly data pipelines. It defines workflows as Directed Acyclic Graphs (DAGs) in Python, enabling dynamic, scalable, and extensible orchestration. The current stable release is 3.1.8, with regular releases delivering new features, improvements, and bug fixes.
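A minimal sketch of a DAG defined in Python, here using the TaskFlow API (the dag_id, task names, and values are illustrative):
from datetime import datetime
from airflow.sdk import dag, task  # Airflow 3 Task SDK; on 2.x use: from airflow.decorators import dag, task

@dag(dag_id="hello_taskflow", start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def hello_taskflow():
    @task
    def extract() -> dict:
        return {"name": "Airflow"}

    @task
    def greet(payload: dict) -> None:
        print(f"Hello, {payload['name']}!")

    greet(extract())  # the task dependency is inferred from the data flow

hello_taskflow()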
Common errors
- ModuleNotFoundError: No module named 'airflow.providers.microsoft.mssql.operators'
  cause: The 'apache-airflow-providers-microsoft-mssql' package is not installed, so the modules required for Microsoft SQL Server operations are missing.
  fix: Install the missing provider package: pip install apache-airflow-providers-microsoft-mssql
- pymssql._mssql.MSSQLDatabaseException: (20009, b'DB-Lib error message 20009, severity 9:\nUnable to connect: Adaptive Server is unavailable or does not exist (servername)\n')
  cause: The specified SQL Server is unreachable, usually because of incorrect server details or network issues.
  fix: Verify the server name, ensure the server is running, and check network connectivity. If using Airflow's Connections UI, put the full server name, including the domain, in the host field.
- sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '1'. (102) (SQLExecDirectW)")
  cause: The submitted SQL is not valid for the target server, often a compatibility issue between the generated SQL and the SQL Server version.
  fix: Ensure the SQL statements are compatible with your SQL Server version and consider updating the ODBC driver to the latest release.
- pymssql.exceptions.OperationalError: (20002, b'DB-Lib error message 20002, severity 9:\nAdaptive Server connection failed')
  cause: A connection to the SQL Server could not be established, typically due to incorrect connection parameters or server unavailability.
  fix: Double-check the connection parameters (server address, port, username, password) and ensure the SQL Server is running and reachable.
- ModuleNotFoundError: No module named 'airflow.operators.mssql_operator'
  cause: 'MsSqlOperator' is being imported from an outdated module path; the operator moved when provider packages were split out of Airflow core.
  fix: Update the import to: from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator (see the sketch after this list).
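A minimal sketch of the corrected provider import in use; the dag_id, connection id 'mssql_default', and the query are illustrative, and the provider package must be installed first. Note that newer provider releases deprecate MsSqlOperator in favor of SQLExecuteQueryOperator from apache-airflow-providers-common-sql:
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator

with DAG(dag_id="mssql_example", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    count_rows = MsSqlOperator(
        task_id="count_rows",
        mssql_conn_id="mssql_default",  # the connection's host should be the full server name, e.g. sqlserver01.corp.example.com
        sql="SELECT COUNT(*) FROM dbo.orders;",
    )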
Warnings
- breaking Direct metadata database access from task code is restricted in Airflow 3. Tasks can no longer directly import and use Airflow database sessions or models. All runtime interactions (state transitions, heartbeats, XComs, resource fetching) must now use the dedicated Task Execution API or the official Python API Client.
- breaking SubDAGs have been removed in Airflow 3. They are replaced by TaskGroups, Assets, and Data Aware Scheduling (see the TaskGroup sketch after this list).
- breaking The Sequential Executor has been removed in Airflow 3. It is replaced by the LocalExecutor, which can still be used with SQLite for local development.
- deprecated SLAs (Service Level Agreements) have been removed in Airflow 3; forthcoming Deadline Alerts are planned as their replacement.
- gotcha Avoid using relative imports in DAG files (e.g., `from . import my_module`). The same DAG file might be parsed in different contexts (scheduler, workers, tests), leading to inconsistent behavior.
- gotcha Do not read Airflow Variables or Connections at the top level of DAG files (i.e., outside of task `execute()` methods or Jinja templates). The values are fetched every time the DAG file is parsed, which slows parsing and can cause unexpected behavior (see the Variable sketch after this list).
- breaking Apache Airflow 3.x requires Python 3.10 or newer. Attempting to install Airflow 3.x on Python 3.9 or older will result in a Python version incompatibility error during package resolution.
- breaking Installing Apache Airflow 3.x on Alpine-based Python images (e.g., `python:3.13-alpine`) fails due to missing C/C++ build tools required by dependencies like `grpcio`. Minimal Alpine images do not include these development packages by default.
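A minimal sketch of grouping tasks with TaskGroup instead of a SubDAG; the dag_id, group_id, and task names are illustrative:
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    start = BashOperator(task_id="start", bash_command="echo start")
    with TaskGroup(group_id="extract") as extract:
        BashOperator(task_id="pull_a", bash_command="echo a")
        BashOperator(task_id="pull_b", bash_command="echo b")
    done = BashOperator(task_id="done", bash_command="echo done")
    start >> extract >> done  # the whole group behaves as one node in the dependency chain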
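A sketch of the Variables gotcha: resolve values inside the task (or via a Jinja template), never at module level. The variable name 'api_key' is illustrative:
from airflow.sdk import Variable  # on Airflow 2.x: from airflow.models import Variable

# Bad: runs on every DAG-file parse, hitting the metadata DB / secrets backend repeatedly.
# API_KEY = Variable.get("api_key")

def use_api_key():
    # Good: resolved only when the task actually executes.
    api_key = Variable.get("api_key")
    print(f"key loaded, length={len(api_key)}")

# Also good: defer resolution to a Jinja-templated operator field:
#   bash_command='echo "{{ var.value.api_key }}"'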
Install
- Core plus common extras, pinned with the official constraints file for Python 3.10:
pip install "apache-airflow[celery,cncf.kubernetes,http,postgres,amazon]"==3.1.8 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.10.txt"
- Core only:
pip install apache-airflow==3.1.8 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.8/constraints-3.10.txt"
Imports
- DAG
from airflow.sdk import DAG
- BashOperator
from airflow.providers.standard.operators.bash import BashOperator
- PythonOperator
from airflow.providers.standard.operators.python import PythonOperator
- TaskGroup
from airflow.utils.task_group import TaskGroup
- Provider-specific operators (e.g., the Amazon provider's S3 operators; there is no generic 'S3Operator')
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
- XComArg
from airflow.models.xcom_arg import XComArg
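A minimal sketch of XComArg: an operator's `.output` attribute is an XComArg that both passes the value and wires the dependency (the dag_id and task names are illustrative):
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

def produce():
    return 42

def consume(value):
    print(f"received {value}")

with DAG(dag_id="xcomarg_example", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    producer = PythonOperator(task_id="produce", python_callable=produce)
    # producer.output is an XComArg; passing it creates produce >> consume automatically.
    PythonOperator(task_id="consume", python_callable=consume, op_args=[producer.output])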
Quickstart
import os
from datetime import datetime

from airflow.sdk import DAG  # Airflow 3 Task SDK; on 2.x use: from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator

# Set AIRFLOW_HOME if not already set (e.g., in a local dev setup)
# os.environ['AIRFLOW_HOME'] = os.environ.get('AIRFLOW_HOME', '~/airflow')

def _greet(name):
    print(f"Hello, {name} from a Python task!")

with DAG(
    dag_id='simple_airflow_quickstart',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',  # `schedule_interval` was removed in Airflow 3
    catchup=False,
    tags=['quickstart'],
) as dag:
    start_task = BashOperator(
        task_id='start_workflow',
        bash_command='echo "Starting the workflow!"',
    )

    greet_task = PythonOperator(
        task_id='greet_with_python',
        python_callable=_greet,
        op_kwargs={'name': 'Airflow User'},
    )

    end_task = BashOperator(
        task_id='end_workflow',
        bash_command='echo "Workflow finished!"',
    )

    start_task >> greet_task >> end_task
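To try it locally, save the file in your DAGs folder and run a single test execution without the scheduler (the command below uses this quickstart's dag_id):
airflow dags test simple_airflow_quickstart 2023-01-01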