Prefect dbt Integration

0.7.20 · active · verified Sat Apr 11

Prefect-dbt is an official Prefect integration for orchestrating dbt (data build tool) projects within Prefect workflows. It provides tasks and blocks for interacting with the dbt CLI, allowing users to run dbt commands (like `dbt run`, `dbt test`, `dbt build`) as part of their data pipelines. The current version is 0.7.20, and it follows Prefect's release cadence for integrations, with updates typically coinciding with or following major Prefect releases.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates a simple Prefect flow that uses `prefect-dbt` to execute the `dbt debug` command. This is a good starting point as it verifies the dbt installation and profile configuration without requiring a full dbt project or database connection. For `dbt run`, `dbt test`, etc., you would typically set up a `DbtCliProfile` block in the Prefect UI and load it in your flow.

import os
from prefect import flow, get_run_logger
from prefect_dbt.cli import DbtCli

@flow(log_prints=True)
def dbt_cli_debug_flow():
    """
    A simple Prefect flow that runs `dbt debug` using the prefect-dbt integration.
    This demonstrates basic interaction with the dbt CLI via Prefect.
    """
    logger = get_run_logger()
    
    # Initialize DbtCli. By default, it looks for dbt in the PATH
    # and profiles.yml in the standard dbt locations or specified via env vars.
    dbt_cli = DbtCli()
    
    logger.info("Attempting to run `dbt debug`...")
    
    # Run the `debug` command. This checks dbt installation and profiles.
    # It usually doesn't require a specific dbt project directory or database connection to run.
    try:
        debug_result = dbt_cli.debug()
        
        logger.info(f"dbt debug completed with return code: {debug_result.return_code}")
        logger.info(f"dbt debug stdout:\n{debug_result.stdout}")
        
        if debug_result.return_code != 0:
            logger.error(f"dbt debug stderr:\n{debug_result.stderr}")
            raise RuntimeError(f"`dbt debug` failed with return code {debug_result.return_code}")
            
    except Exception as e:
        logger.error(f"Failed to execute dbt debug: {e}")
        logger.warning(
            "Ensure 'dbt-core' is installed in your execution environment "
            f"(`pip install dbt-core` or specific adapter like `pip install dbt-postgres`)."
        )
        raise

if __name__ == "__main__":
    dbt_cli_debug_flow() # For local execution
    # To deploy: dbt_cli_debug_flow.to_deployment(name="dbt-debug-deployment").apply()

view raw JSON →