Apache Airflow dbt Cloud Provider
This provider package lets Apache Airflow orchestrate dbt Cloud jobs and fetch job run details. It includes operators, sensors, and hooks for the dbt Cloud API. The current version is 4.8.0; provider packages are released on their own regular cadence as new features and bug fixes are introduced.
Warnings
- breaking Version 4.0.0 removed all deprecated parameters in operators and hooks. If you were using any parameters marked as deprecated in previous versions (e.g., `schema`, `project_id`, `environment_id`), your DAGs will break.
- breaking Version 3.0.0 removed the `poll_interval` parameter from `DbtCloudRunJobOperator` (it is only supported by `DbtCloudJobRunSensor`) and added a `deferrable` parameter to `DbtCloudRunJobOperator` for async execution.
- gotcha Incorrect or missing dbt Cloud Connection configuration. The provider requires an Airflow connection of type 'dbt Cloud' with a valid API token. Common issues include using the wrong connection ID, an expired token, or a token with insufficient permissions.
- gotcha Confusion between `account_id`, `job_id`, and other identifiers. Users often provide incorrect IDs for dbt Cloud resources, leading to 'resource not found' or 'permission denied' errors.
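For the connection gotcha above, one common way to define the connection is an environment variable. The field mapping sketched here (login = account ID, password = API token, host = tenant domain) follows Airflow's generic connection-URI convention and should be verified against the provider's connection docs; all values below are placeholders:

```shell
# Define the default dbt Cloud connection as an environment variable.
# Field mapping is an assumption to verify against the provider docs:
#   login = account ID, password = API token, host = tenant domain.
export AIRFLOW_CONN_DBT_CLOUD_DEFAULT='dbt-cloud://12345:YOUR_API_TOKEN@cloud.getdbt.com'
```

Connections defined this way never appear in the Airflow UI; they are resolved at runtime from the environment.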
Install
```shell
pip install apache-airflow-providers-dbt-cloud
```
Imports
- DbtCloudHook
from airflow.providers.dbt_cloud.hooks.dbt_cloud import DbtCloudHook
- DbtCloudRunJobOperator
from airflow.providers.dbt_cloud.operators.dbt_cloud import DbtCloudRunJobOperator
- DbtCloudJobRunSensor
from airflow.providers.dbt_cloud.sensors.dbt_cloud import DbtCloudJobRunSensor
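These classes all wrap dbt Cloud's REST API. As a rough illustration of what a hook call does under the hood, the sketch below builds the trigger endpoint assumed from dbt Cloud's API v2; `trigger_job` is a hypothetical helper for illustration, not part of the provider:

```python
import requests

# Assumption: the default single-tenant API host; multi-tenant domains differ.
API_BASE = "https://cloud.getdbt.com/api/v2"

def run_endpoint(account_id: int, job_id: int) -> str:
    # Wrong account_id/job_id values in this path are what produce the
    # "resource not found" errors mentioned in the gotchas above.
    return f"{API_BASE}/accounts/{account_id}/jobs/{job_id}/run/"

def trigger_job(token: str, account_id: int, job_id: int,
                cause: str = "Triggered via API") -> int:
    """Hypothetical helper: trigger a job run and return its run ID."""
    resp = requests.post(
        run_endpoint(account_id, job_id),
        headers={"Authorization": f"Token {token}"},
        json={"cause": cause},
    )
    resp.raise_for_status()
    return resp.json()["data"]["id"]
```

In practice the provider's hook and operators handle all of this for you; the sketch only shows where the `account_id` and `job_id` identifiers end up.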
Quickstart
```python
from __future__ import annotations

import os

import pendulum

from airflow.models.dag import DAG
from airflow.providers.dbt_cloud.operators.dbt_cloud import DbtCloudRunJobOperator
from airflow.providers.dbt_cloud.sensors.dbt_cloud import DbtCloudJobRunSensor

DBT_CLOUD_CONN_ID = os.environ.get("DBT_CLOUD_CONN_ID", "dbt_cloud_default")
DBT_CLOUD_ACCOUNT_ID = int(os.environ.get("DBT_CLOUD_ACCOUNT_ID", "12345"))  # Your dbt Cloud account ID
DBT_CLOUD_JOB_ID = int(os.environ.get("DBT_CLOUD_JOB_ID", "67890"))  # Your dbt Cloud job ID

with DAG(
    dag_id="dbt_cloud_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dbt_cloud", "example"],
) as dag:
    # Trigger the job but do not wait for it here; the sensor below does the waiting.
    trigger_dbt_cloud_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        job_id=DBT_CLOUD_JOB_ID,
        wait_for_termination=False,
    )

    # The sensor monitors a *run*, not a job: it takes the run ID the operator
    # pushed to XCom, not the job ID.
    wait_for_dbt_cloud_job = DbtCloudJobRunSensor(
        task_id="wait_for_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        run_id=trigger_dbt_cloud_job.output,
        timeout=60 * 20,  # Fail after 20 minutes
        deferrable=True,  # Defer while waiting to free up a worker slot
    )

    trigger_dbt_cloud_job >> wait_for_dbt_cloud_job
```
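Conceptually, the waiting that the sensor (or the operator's `check_interval`/`timeout` parameters) performs is a simple poll loop over the run's status until a terminal state. A library-free sketch for intuition, where the terminal status codes 10/20/30 (success/error/cancelled) come from the dbt Cloud API and `get_status` is a stand-in for the hook's status call:

```python
import time

# Terminal run statuses in the dbt Cloud API.
SUCCESS, ERROR, CANCELLED = 10, 20, 30

def wait_for_run(get_status, check_interval: float = 10, timeout: float = 60 * 20) -> int:
    """Poll get_status() until a terminal status or until timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in (SUCCESS, ERROR, CANCELLED):
            return status
        time.sleep(check_interval)
    raise TimeoutError("dbt Cloud job run did not finish in time")
```

Deferrable mode replaces this blocking loop with a trigger that runs asynchronously on the triggerer, so no worker slot is held while waiting.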