Airflow dbt Python

3.5.0 · active · verified Tue Apr 14

airflow-dbt-python is a Python library providing Airflow operators, hooks, and utilities to execute dbt commands. Unlike solutions wrapping the dbt CLI, it directly interfaces with dbt-core, enabling features like using Airflow connections as dbt targets and pushing dbt artifacts to XCom. The library is currently at version 3.5.0 and actively maintained, with a focus on supporting recent versions of Airflow and dbt.
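Once a dbt artifact such as `run_results.json` has been pushed to XCom, a downstream task can inspect it as a plain dictionary. The sketch below is illustrative rather than part of the library: `summarize_run_results` is a hypothetical helper, and the sample data is hand-written to resemble the `results` list (with `unique_id` and `status` fields) found in dbt's `run_results.json`. How the artifact gets pushed and which XCom key it uses depend on the operator configuration, so check the library's documentation for that step.

```python
from collections import Counter
from typing import Any


def summarize_run_results(run_results: dict[str, Any]) -> dict[str, Any]:
    """Count node statuses and list nodes that errored or failed.

    Hypothetical helper: it only assumes the artifact has a top-level
    "results" list whose items carry "unique_id" and "status".
    """
    results = run_results.get("results", [])
    counts = Counter(r.get("status", "unknown") for r in results)
    failed = [
        r.get("unique_id", "<unknown>")
        for r in results
        if r.get("status") in {"error", "fail"}
    ]
    return {"counts": dict(counts), "failed": failed}


# Hand-written sample shaped like a trimmed run_results.json (illustrative only).
sample = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "error"},
        {"unique_id": "test.my_project.not_null_orders_id", "status": "fail"},
    ]
}

summary = summarize_run_results(sample)
print(summary["counts"])
print(summary["failed"])
```

In a DAG, a function like this would typically run in a downstream Python task that pulls the artifact from XCom and raises or alerts when `failed` is non-empty.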

Quickstart

This example DAG demonstrates a basic dbt workflow using airflow-dbt-python operators. It includes seeding data, running dbt models with specific tags, and executing tests. Replace `/path/to/my/dbt/project/` and `~/.dbt/` with your actual dbt project and profiles directories, or configure remote storage as needed for multi-machine/cloud environments. Ensure your Airflow connections for dbt targets are configured if not using `profiles.yml`.

import datetime as dt

from airflow import DAG
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

with DAG(
    dag_id="example_dbt_workflow",
    # Run daily at midnight; the `schedule` argument requires Airflow 2.4+.
    schedule="0 0 * * *",
    # Placeholder: any fixed date in the past works with catchup=False.
    start_date=dt.datetime(2024, 1, 1),
    catchup=False,
    dagrun_timeout=dt.timedelta(minutes=60),
    default_args=default_args,
    tags=["dbt", "example"],
) as dag:
    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        select=["+tag:daily"],
        exclude=["tag:deprecated"],
        full_refresh=False,
    )

    dbt_test = DbtTestOperator(
        task_id="dbt_test_task",
        project_dir="/path/to/my/dbt/project/",
        profiles_dir="~/.dbt/",
        target="production",
        profile="my-project",
        singular=True,  # Run only singular tests (dbt-core 1.0+ naming)
    )

    dbt_seed >> dbt_run >> dbt_test
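
The three tasks above repeat the same `project_dir`, `profiles_dir`, `target`, and `profile` values. One way to keep them in sync is to build the keyword arguments in a single place. This is a minimal sketch in plain Python, not something the library provides: `DBT_COMMON_ARGS` and `dbt_task_args` are hypothetical names, and the values mirror the placeholders in the example.

```python
from typing import Any

# Settings shared by every dbt task; values mirror the placeholders above.
DBT_COMMON_ARGS: dict[str, Any] = {
    "project_dir": "/path/to/my/dbt/project/",
    "profiles_dir": "~/.dbt/",
    "target": "production",
    "profile": "my-project",
}


def dbt_task_args(task_id: str, **overrides: Any) -> dict[str, Any]:
    """Merge shared settings with per-task arguments; overrides win."""
    return {"task_id": task_id, **DBT_COMMON_ARGS, **overrides}


run_args = dbt_task_args(
    "dbt_run_task",
    select=["+tag:daily"],
    exclude=["tag:deprecated"],
)
print(run_args)

# Inside the DAG, the dictionary would be unpacked into the operator, e.g.:
#     dbt_run = DbtRunOperator(**dbt_task_args("dbt_run_task", select=["+tag:daily"]))
```

Because per-task overrides are merged last, a single task can still point at a different target, for example `dbt_task_args("dbt_seed_dev", target="dev")`.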
