Apache Airflow dbt Cloud Provider

4.8.0 · active · verified Sat Apr 11

This provider package lets Apache Airflow interact with dbt Cloud: it can trigger dbt Cloud jobs and fetch job run details. It includes operators, sensors, and hooks for these tasks. The current version is 4.8.0. Airflow provider packages are versioned and released separately from Airflow core, as new features and bug fixes are introduced.

Warnings

Install

Imports

Quickstart

This example DAG triggers a dbt Cloud job with `DbtCloudRunJobOperator` and waits for the run to finish with `DbtCloudJobRunSensor`. Deferrable mode releases the worker slot while a task waits, and it requires a running triggerer. Before running the DAG, create an Airflow connection of type 'dbt Cloud' named `dbt_cloud_default` (or set `DBT_CLOUD_CONN_ID` to your own connection ID) that holds your dbt Cloud API token, and supply your `account_id` and `job_id`.

from __future__ import annotations

import os
import pendulum

from airflow.models.dag import DAG
from airflow.providers.dbt_cloud.operators.dbt_cloud import DbtCloudRunJobOperator
from airflow.providers.dbt_cloud.sensors.dbt_cloud import DbtCloudJobRunSensor


DBT_CLOUD_CONN_ID = os.environ.get("DBT_CLOUD_CONN_ID", "dbt_cloud_default")
# Environment variables are strings; the provider's ID parameters are typed as integers.
DBT_CLOUD_ACCOUNT_ID = int(os.environ.get("DBT_CLOUD_ACCOUNT_ID", "12345"))  # Your dbt Cloud account ID
DBT_CLOUD_JOB_ID = int(os.environ.get("DBT_CLOUD_JOB_ID", "67890"))  # Your dbt Cloud job ID

with DAG(
    dag_id="dbt_cloud_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dbt_cloud", "example"],
) as dag:
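    # Trigger the job and return immediately; the operator's return value is the run ID.
    trigger_dbt_cloud_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        job_id=DBT_CLOUD_JOB_ID,
        wait_for_termination=False,  # Leave the waiting to the sensor below
    )

    # The sensor tracks a run, not a job, so it takes run_id rather than job_id.
    wait_for_dbt_cloud_job = DbtCloudJobRunSensor(
        task_id="wait_for_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        run_id=trigger_dbt_cloud_job.output,  # Run ID pulled from the trigger task's XCom
        poke_interval=10,  # Check run status every 10 seconds
        timeout=60 * 20,  # Fail after 20 minutes
        deferrable=True,  # Free the worker slot while waiting (requires a triggerer)
    )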

    trigger_dbt_cloud_job >> wait_for_dbt_cloud_job
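
To make the polling interval and timeout in the DAG above more concrete, here is a minimal, standalone sketch of a poll-until-terminal loop. It is not the provider's implementation: `wait_for_run`, `get_status`, and the fake status source are hypothetical names used only for illustration, and the code does not call dbt Cloud. The numeric status codes are the ones dbt Cloud reports for job runs.

```python
from __future__ import annotations

import time
from typing import Callable

# dbt Cloud job run status codes.
QUEUED, STARTING, RUNNING = 1, 2, 3
SUCCESS, ERROR, CANCELLED = 10, 20, 30
TERMINAL_STATUSES = {SUCCESS, ERROR, CANCELLED}


def wait_for_run(
    get_status: Callable[[], int],
    check_interval: float = 10,
    timeout: float = 60 * 20,
) -> int:
    """Poll get_status until the run is terminal; raise TimeoutError otherwise.

    Hypothetical helper for illustration only, not part of the provider.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in status {status} after {timeout} seconds")
        time.sleep(check_interval)


# Fake status source standing in for a real dbt Cloud API call (assumption).
statuses = iter([QUEUED, STARTING, RUNNING, RUNNING, SUCCESS])
final_status = wait_for_run(lambda: next(statuses), check_interval=0, timeout=5)
print(final_status == SUCCESS)
```

A loop like this occupies its process for the whole wait. Deferrable mode avoids that by handing the wait to the triggerer, which is why the quickstart enables `deferrable=True`.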
