Airflow dbt Python
airflow-dbt-python is a Python library providing Airflow operators, hooks, and utilities to execute dbt commands. Unlike solutions wrapping the dbt CLI, it directly interfaces with dbt-core, enabling features like using Airflow connections as dbt targets and pushing dbt artifacts to XCom. The library is currently at version 3.5.0 and actively maintained, with a focus on supporting recent versions of Airflow and dbt.
Warnings
- breaking With dbt-core v1.0.0 and later, the way dbt is installed changed significantly. Instead of `pip install dbt`, you now install `dbt-core` and then specific database adapters (e.g., `dbt-redshift`, `dbt-snowflake`). This also impacted how `DbtTestOperator` handled test types.
- gotcha In multi-machine or cloud Airflow installations (e.g., AWS MWAA, GCP Cloud Composer), workers may not share a local filesystem, so storing dbt project files directly on one worker is unreliable. `airflow-dbt-python` requires the dbt project files to be accessible from whichever worker runs the task; prefer a `project_dir` URL backed by remote storage over a worker-local path.
- breaking New versions of Apache Airflow and dbt-core may introduce breaking changes. The `airflow-dbt-python` library aims to keep up with the latest releases, but compatibility issues can arise.
- gotcha When omitting `profiles_dir` in operators, `airflow-dbt-python` will first check if the `project_dir` URL includes a `profiles.yml`. If not found, it will attempt to find an Airflow Connection using the `target` argument.
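The fallback described above can be sketched as follows. This is a minimal, hypothetical task definition (the connection id `my_warehouse` is an assumption for illustration): with no `profiles_dir` set and no `profiles.yml` inside `project_dir`, the operator resolves `target` as an Airflow connection id.

```python
from airflow_dbt_python.operators.dbt import DbtRunOperator

# No profiles_dir given: the operator first looks for a profiles.yml under
# project_dir, and if none exists, resolves `target` as an Airflow
# connection id. "my_warehouse" is a hypothetical connection id.
dbt_run = DbtRunOperator(
    task_id="dbt_run_from_connection",
    project_dir="/path/to/my/dbt/project/",
    target="my_warehouse",
)
```

This keeps warehouse credentials in Airflow's connection store instead of a `profiles.yml` on disk.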
Install
- pip install airflow-dbt-python
- pip install airflow-dbt-python[redshift]
- pip install airflow-dbt-python[snowflake]
Imports
- DbtRunOperator
from airflow_dbt_python.operators.dbt import DbtRunOperator
- DbtSeedOperator
from airflow_dbt_python.operators.dbt import DbtSeedOperator
- DbtTestOperator
from airflow_dbt_python.operators.dbt import DbtTestOperator
- DbtDocsGenerateOperator
from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator
Quickstart
import datetime as dt

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

default_args = {
    "owner": "airflow",
    "start_date": days_ago(1),
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

with DAG(
    dag_id="example_dbt_workflow",
    schedule_interval="0 0 * * *",
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
    default_args=default_args,
    tags=["dbt", "example"],
) as dag:
    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        full_refresh=False,
    )

    dbt_test = DbtTestOperator(
        task_id="dbt_test_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        singular=True,  # For dbt-core v1.0.0+ tests
    )

    dbt_seed >> dbt_run >> dbt_test
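The intro mentions pushing dbt artifacts to XCom; a minimal sketch using the imported-but-unused `DbtDocsGenerateOperator` is shown below. The `do_xcom_push_artifacts` argument and the artifact file names are taken from the library's documented behavior, but treat the exact task wiring as illustrative rather than canonical.

```python
from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator

# Sketch: generate dbt docs and push selected artifacts to XCom so
# downstream tasks can consume them (e.g., via ti.xcom_pull on this task).
# manifest.json and run_results.json are standard dbt artifact names.
dbt_docs = DbtDocsGenerateOperator(
    task_id="dbt_docs_task",
    project_dir="/path/to/my/dbt/project/",
    profiles_dir="~/.dbt/",
    do_xcom_push_artifacts=["manifest.json", "run_results.json"],
)
```

Downstream tasks can then pull the pushed artifacts from XCom instead of re-reading the dbt `target/` directory from disk.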