Apache Airflow dbt Cloud Provider

4.8.0 verified Sat Apr 25 auth: no python

This provider package lets Apache Airflow interact with dbt Cloud: it can trigger dbt Cloud jobs, wait for their runs, and fetch run details. It includes operators, sensors, and hooks. The current version is 4.8.0. Provider packages are versioned and released separately from Airflow core, so new features and bug fixes can ship between Airflow releases.

pip install apache-airflow-providers-dbt-cloud
error DbtCloudRunJobOperator in Airflow Fails to Detect Successful Job Completion in DBT Cloud
cause Airflow marks the task as failed even though the run succeeded in dbt Cloud. This usually means the operator stopped polling before dbt Cloud reported a terminal status, for example because `timeout` or the task-level `execution_timeout` elapsed.
fix
Check the operator's waiting parameters: `wait_for_termination` (whether the task waits for the run at all), `check_interval` (seconds between status checks), `timeout` (maximum seconds to wait), and the task-level `execution_timeout`. Also confirm in the dbt Cloud UI that the run itself finished with a success status.
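To see how `check_interval` and `timeout` interact, here is a minimal sketch of a polling loop. It is not the provider's code: `wait_for_run`, the status strings, and the injectable `sleep` and `clock` arguments are hypothetical names chosen for illustration.

```python
import time
from typing import Callable

# Hypothetical status labels for this sketch only.
TERMINAL_OK = "success"
TERMINAL_BAD = {"error", "cancelled"}


def wait_for_run(
    get_status: Callable[[], str],
    check_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll get_status() until success, a failed terminal state, or timeout."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == TERMINAL_OK:
            return True
        if status in TERMINAL_BAD:
            return False
        # Give up if the next check would land after the deadline.
        if clock() + check_interval > deadline:
            raise TimeoutError(f"run still '{status}' after {timeout} seconds")
        sleep(check_interval)
```

With a short `timeout` and a long-running job, a loop like this gives up while the run is still in progress, which is one way a task can fail in Airflow while the job later succeeds in dbt Cloud.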
error ImportError: cannot import name 'DbtCloudRunJobOperator' from 'airflow.providers.dbt.cloud.operators.dbt'
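cause The class name is misspelled, or the installed apache-airflow-providers-dbt-cloud package is outdated or broken. `airflow.providers.dbt.cloud.operators.dbt` is the module that defines `DbtCloudRunJobOperator`, so the import path in the message is the expected one.
fix
Check the spelling of the class name, then upgrade or reinstall the provider with `pip install --upgrade apache-airflow-providers-dbt-cloud`. The import should read `from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator`.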
error AttributeError: module 'airflow.providers.dbt.cloud.operators' has no attribute 'DbtCloudRunJobOperator'
cause The operator is defined in the `dbt` submodule and is not re-exported from the `airflow.providers.dbt.cloud.operators` package itself.
fix
Import the operator from its submodule: `from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator`.
error ModuleNotFoundError: No module named 'airflow.providers.dbt.cloud'
cause The apache-airflow-providers-dbt-cloud package is not installed or not properly configured in the Airflow environment.
fix
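Install the package with `pip install apache-airflow-providers-dbt-cloud` in the same Python environment that runs the Airflow scheduler and workers, then restart those services so the provider is loaded. `airflow providers list` shows which providers Airflow has detected.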
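A quick way to confirm which interpreter can see the provider is to look the module up without importing it. The helper below is a small sketch using only the standard library; `provider_installed` is a hypothetical name, not part of Airflow.

```python
import importlib.util


def provider_installed(module_name: str = "airflow.providers.dbt.cloud") -> bool:
    """Return True if module_name can be located by the current interpreter."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example `airflow`) is itself missing.
        return False
```

Run it with the same Python executable that Airflow uses; a `False` result there points to an installation in a different virtual environment or container image.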
error TypeError: DbtCloudRunJobOperator() missing 1 required positional argument: 'job_id'
cause The 'job_id' parameter, which specifies the dbt Cloud job to run, is missing when initializing the DbtCloudRunJobOperator.
fix
Provide the `job_id` parameter when initializing the operator, e.g., `DbtCloudRunJobOperator(task_id="run_dbt_job", job_id=12345)`.
breaking Version 4.0.0 removed all deprecated parameters in operators and hooks. If you were using any parameters marked as deprecated in previous versions (e.g., `schema`, `project_id`, `environment_id`), your DAGs will break.
fix Review the latest documentation for the DbtCloud provider and update your DAGs to use the current parameter names and structures. For instance, ensure `account_id` and `job_id` are explicitly passed.
breaking In version 3.0.0, the `poll_interval` parameter was removed from `DbtCloudRunJobOperator` as it was only supported in `DbtCloudJobRunSensor`. Additionally, the `deferrable` parameter was added to `DbtCloudRunJobOperator` for async execution.
fix Remove `poll_interval` from `DbtCloudRunJobOperator` instances. To leverage deferrable mode, set `deferrable=True` on both operators and sensors. If you need a specific polling interval for the run, set it on `DbtCloudJobRunSensor` or use `check_interval` on `DbtCloudRunJobOperator`.
gotcha Incorrect or missing dbt Cloud Connection configuration. The provider requires an Airflow connection of type 'dbt Cloud' with a valid API token. Common issues include using the wrong connection ID, an expired token, or a token with insufficient permissions.
fix Verify that your Airflow connection (e.g., `dbt_cloud_default`) is correctly set up with the 'dbt Cloud' type and a valid, unexpired API token. Ensure the API token has the necessary read/write permissions for the specific dbt Cloud account and jobs you are interacting with.
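One way to define the connection outside the UI is an `AIRFLOW_CONN_<CONN_ID>` environment variable holding a JSON document. The sketch below builds that pair. It assumes the connection type string is `dbt_cloud`, that the API token goes in `password`, the default account ID in `login`, and an optional tenant domain in `host`; `dbt_cloud_conn_env` is a hypothetical helper, so check these field mappings against the provider's connection documentation.

```python
import json
from typing import Optional, Tuple


def dbt_cloud_conn_env(
    conn_id: str,
    api_token: str,
    account_id: Optional[int] = None,
    tenant: Optional[str] = None,
) -> Tuple[str, str]:
    """Build the (variable name, JSON value) pair for an Airflow connection."""
    # Assumed field mapping: token -> password, account -> login, tenant -> host.
    conn = {"conn_type": "dbt_cloud", "password": api_token}
    if account_id is not None:
        conn["login"] = str(account_id)  # Default account ID for this connection
    if tenant is not None:
        conn["host"] = tenant  # Tenant domain, if not the default one
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(conn)
```

Exporting the returned name and value in the scheduler and worker environments makes the connection available under `conn_id` without storing the token in the metadata database.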
gotcha Confusion between `account_id`, `job_id`, and other identifiers. Users often provide incorrect IDs for dbt Cloud resources, leading to 'resource not found' or 'permission denied' errors.
fix Double-check that you are passing the correct `account_id` and `job_id` for your dbt Cloud environment. These can be found in the dbt Cloud UI (e.g., in the URL when viewing a job or account settings).
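If you copy IDs from the browser, a small parser can reduce transcription mistakes. This sketch assumes a job URL of the form `https://<host>/deploy/<account_id>/projects/<project_id>/jobs/<job_id>`; that layout is an assumption and may differ between dbt Cloud versions or tenants, and `parse_job_url` is a hypothetical helper.

```python
import re
from typing import Dict

# Assumed URL layout; adjust the pattern if your dbt Cloud URLs differ.
_JOB_URL = re.compile(r"/deploy/(\d+)/projects/(\d+)/jobs/(\d+)")


def parse_job_url(url: str) -> Dict[str, int]:
    """Extract account, project, and job IDs from a dbt Cloud job URL."""
    match = _JOB_URL.search(url)
    if match is None:
        raise ValueError(f"unrecognized dbt Cloud job URL: {url}")
    account_id, project_id, job_id = (int(group) for group in match.groups())
    return {"account_id": account_id, "project_id": project_id, "job_id": job_id}
```

The function returns integers, which can then be passed as the `account_id` and `job_id` arguments.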
runtimes listed: 3.9-alpine, 3.9-slim, 3.10-alpine, 3.10-slim, 3.11-alpine, 3.11-slim, 3.12-alpine, 3.12-slim, 3.13-alpine, 3.13-slim (status, import time, mem, and disk columns are empty)

This example DAG triggers a dbt Cloud job with `DbtCloudRunJobOperator` and then waits for the run to finish with `DbtCloudJobRunSensor`. Deferrable mode is enabled so that waiting does not occupy a worker slot. Before running it, configure an Airflow connection of type 'dbt Cloud' named `dbt_cloud_default` (or your chosen `DBT_CLOUD_CONN_ID`) with your dbt Cloud API token, and provide your own `account_id` and `job_id`.

from __future__ import annotations

import os
import pendulum

from airflow.models.dag import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor


DBT_CLOUD_CONN_ID = os.environ.get('DBT_CLOUD_CONN_ID', 'dbt_cloud_default')
DBT_CLOUD_ACCOUNT_ID = int(os.environ.get('DBT_CLOUD_ACCOUNT_ID', '12345'))  # Your dbt Cloud Account ID (integer)
DBT_CLOUD_JOB_ID = int(os.environ.get('DBT_CLOUD_JOB_ID', '67890'))  # Your dbt Cloud Job ID (integer)

with DAG(
    dag_id="dbt_cloud_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dbt_cloud", "example"],
) as dag:
    # Trigger the job and return immediately; the run ID is pushed to XCom.
    trigger_dbt_cloud_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        job_id=DBT_CLOUD_JOB_ID,
        wait_for_termination=False,  # Let the sensor below do the waiting
    )

    # Wait for the triggered run (identified by run ID, not job ID) to finish.
    wait_for_dbt_cloud_job = DbtCloudJobRunSensor(
        task_id="wait_for_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        account_id=DBT_CLOUD_ACCOUNT_ID,
        run_id=trigger_dbt_cloud_job.output,  # Run ID returned by the operator
        poke_interval=10,  # Check run status every 10 seconds
        timeout=60 * 20,  # Fail after 20 minutes
        deferrable=True,  # Release the worker slot while waiting
    )

    trigger_dbt_cloud_job >> wait_for_dbt_cloud_job