Airflow dbt Integration (GoCardless)
airflow-dbt is a Python package that provides Apache Airflow operators for integrating with dbt (data build tool). It lets users orchestrate dbt commands such as `seed`, `snapshot`, `run`, and `test` within Airflow DAGs by wrapping the dbt CLI. The package, last released as version 0.4.0 in September 2021, offers a foundational way to embed dbt transformations into Airflow workflows.
Warnings
- gotcha This package relies on wrapping the dbt CLI. This means the dbt executable must be installed and available on the Airflow worker's PATH or explicitly set via the `dbt_bin` argument. This can be a common point of failure, especially in managed Airflow environments like AWS MWAA or GCP Cloud Composer, where managing CLI tools requires specific configurations.
- gotcha The operators may not expose the full range of arguments available in the dbt CLI commands. For example, `DbtRunOperator` might not have an explicit `fail_fast` attribute, limiting granular control over dbt execution parameters directly from Airflow.
- gotcha This package does not offer direct access to dbt artifacts (e.g., `manifest.json`, `run_results.json`) generated during execution. This limitation prevents more advanced use cases such as dynamic DAG generation based on dbt's lineage or pushing artifacts to Airflow XComs for downstream processing.
- deprecated The package has not received updates since September 2021 (v0.4.0), making it potentially incompatible with newer versions of Apache Airflow (e.g., Airflow 2.10+) or dbt-core (e.g., dbt-core 1.8+), which may introduce breaking changes or new features not supported by this older integration. For instance, `dbt-common`'s `isodate` constraint can conflict with Airflow 2.10.3+.
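If the worker's PATH is in doubt (first gotcha above), one mitigation is resolving the dbt executable at DAG-parse time and passing it explicitly via the operators' `dbt_bin` argument. A minimal sketch; the fallback path and the `DBT_BIN` environment variable are illustrative assumptions, not conventions of this package:

```python
import os
import shutil

# Locate dbt on the worker; fall back to an explicit path when dbt is not
# on PATH (common on MWAA / Cloud Composer). The fallback path below is
# an assumption for illustration only.
dbt_bin = shutil.which("dbt") or os.environ.get(
    "DBT_BIN", "/usr/local/airflow/.local/bin/dbt"
)

# Then pass it explicitly to each operator, e.g.:
# DbtRunOperator(task_id="dbt_run", dbt_bin=dbt_bin)
```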
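Where an operator argument is missing entirely (second gotcha, e.g. `--fail-fast`), a common fallback is Airflow's stock `BashOperator` with a hand-built dbt command. The helper below is a hypothetical sketch, not part of airflow-dbt; `--fail-fast` and `--project-dir` are standard dbt CLI flags:

```python
import shlex

def dbt_command(subcommand, project_dir, extra_flags=()):
    """Build a dbt CLI string for a BashOperator fallback.

    Exposes flags (like --fail-fast) that airflow-dbt operators do not
    surface. This helper is illustrative, not part of the package.
    """
    parts = ["dbt", subcommand, "--project-dir", project_dir, *extra_flags]
    return " ".join(shlex.quote(p) for p in parts)

# e.g. BashOperator(task_id="dbt_run",
#                   bash_command=dbt_command("run", "/opt/dbt", ["--fail-fast"]))
```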
Install
pip install airflow-dbt
Imports
- DbtSeedOperator
from airflow_dbt.operators.dbt_operator import DbtSeedOperator
- DbtSnapshotOperator
from airflow_dbt.operators.dbt_operator import DbtSnapshotOperator
- DbtRunOperator
from airflow_dbt.operators.dbt_operator import DbtRunOperator
- DbtTestOperator
from airflow_dbt.operators.dbt_operator import DbtTestOperator
- DbtDocsGenerateOperator
from airflow_dbt.operators.dbt_operator import DbtDocsGenerateOperator
Quickstart
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt.operators.dbt_operator import (
    DbtSeedOperator,
    DbtSnapshotOperator,
    DbtRunOperator,
    DbtTestOperator,
)
import os

default_args = {
    # 'dir' is forwarded to every dbt operator in the DAG: the dbt project root
    'dir': os.environ.get('DBT_PROJECT_DIR', '/path/to/your/dbt/project'),
    'start_date': days_ago(0)
}

with DAG(
    dag_id='dbt_example_dag',
    default_args=default_args,
    schedule_interval='@daily',
    tags=['dbt', 'example']
) as dag:
    dbt_seed = DbtSeedOperator(
        task_id='dbt_seed',
        profiles_dir=os.environ.get('DBT_PROFILES_DIR', '/path/to/your/.dbt')  # optional
    )
    dbt_snapshot = DbtSnapshotOperator(
        task_id='dbt_snapshot'
    )
    dbt_run = DbtRunOperator(
        task_id='dbt_run'
    )
    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0  # failing tests should fail the task, not retry
    )

    dbt_seed >> dbt_snapshot >> dbt_run >> dbt_test