Dagster dbt Integration
dagster-dbt integrates dbt with Dagster. It represents dbt models, seeds, and snapshots as first-class Dagster assets (and dbt tests as asset checks), which gives them rich metadata and lineage tracking and lets them be orchestrated alongside other data tools. The library is actively developed and usually releases in step with Dagster core; the current version is 0.29.0, corresponding to Dagster 1.13.0.
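The lineage mentioned above comes from dbt's `manifest.json`, where each node lists its upstream nodes under `depends_on`. The following is a simplified sketch, not dagster-dbt's actual loader. It uses a hand-written, heavily trimmed manifest, keys each asset by its bare node name (dagster-dbt's real key scheme is governed by `DagsterDbtTranslator`), and skips test nodes.

```python
import json

# Hypothetical, heavily trimmed manifest.json content. Real manifests carry
# many more fields; only "nodes", "resource_type", "name" and
# "depends_on" -> "nodes" are used here.
MANIFEST = json.loads("""
{
  "nodes": {
    "seed.shop.raw_orders": {"resource_type": "seed", "name": "raw_orders",
                             "depends_on": {"nodes": []}},
    "model.shop.stg_orders": {"resource_type": "model", "name": "stg_orders",
                              "depends_on": {"nodes": ["seed.shop.raw_orders"]}},
    "model.shop.orders": {"resource_type": "model", "name": "orders",
                          "depends_on": {"nodes": ["model.shop.stg_orders"]}},
    "test.shop.not_null_orders_id": {"resource_type": "test", "name": "not_null_orders_id",
                                     "depends_on": {"nodes": ["model.shop.orders"]}}
  }
}
""")

# Node types treated as assets in this sketch; tests are skipped.
ASSET_TYPES = {"model", "seed", "snapshot"}


def build_lineage(manifest: dict) -> dict[str, set[str]]:
    """Map each asset-like node name to the names of its upstream asset nodes."""
    nodes = manifest["nodes"]
    lineage: dict[str, set[str]] = {}
    for node in nodes.values():
        if node["resource_type"] not in ASSET_TYPES:
            continue
        lineage[node["name"]] = {
            nodes[dep]["name"]
            for dep in node["depends_on"]["nodes"]
            if dep in nodes and nodes[dep]["resource_type"] in ASSET_TYPES
        }
    return lineage


for name, deps in sorted(build_lineage(MANIFEST).items()):
    print(name, "<-", sorted(deps))
```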
Warnings
- breaking Version compatibility with `dbt-core` is crucial. `dagster-dbt` supports specific ranges of `dbt-core` versions (e.g., 1.7 through 1.11 for `dagster-dbt 0.29.0`). Incompatibilities can lead to module loading errors or unexpected behavior.
- gotcha When `@dbt_assets` is used with a time window partitions definition and no explicit backfill policy, the default policy changed from `BackfillPolicy.multi_run()` to `BackfillPolicy.single_run()` in Dagster 1.12.0. As a result, a backfill of partitioned dbt assets may now launch one run covering the whole partition range instead of several smaller runs.
- deprecated The `dbt_cloud_resource` and `load_assets_from_dbt_cloud_job` APIs are considered superseded by `dagster_dbt.cloud_v2` resources (`DbtCloudClientResource`, `DbtCloudCredentials`, `DbtCloudWorkspace`) and `load_dbt_cloud_asset_specs` for improved observability and orchestration capabilities.
- gotcha State-backed components, including `DbtProject` (formerly `DbtProjectComponent`), automatically refresh their state during development (under `dagster dev` or `dg` CLI commands). This is convenient, but if the external source of that state (for example, a dbt manifest) is unavailable or malformed, the entire code location can fail to load.
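To see why the backfill default matters, the hypothetical `plan_runs` helper below models how a range of daily partitions would be grouped into runs. `None` stands for a single-run policy (one run spanning the whole range, whose dbt invocation must then handle the full time window), and an integer stands for a multi-run policy with at most that many partitions per run. It is a plain-Python illustration, not Dagster's backfill scheduler.

```python
from datetime import date, timedelta
from typing import List, Optional


def daily_partitions(start: date, end: date) -> List[str]:
    """Daily partition keys from start to end (inclusive), formatted YYYY-MM-DD."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


def plan_runs(partition_keys: List[str], max_partitions_per_run: Optional[int]) -> List[List[str]]:
    """Group partition keys into runs.

    None models a single-run policy: every partition goes into one run.
    An integer models a multi-run policy: runs of at most that many partitions.
    """
    if max_partitions_per_run is None:
        return [partition_keys] if partition_keys else []
    if max_partitions_per_run < 1:
        raise ValueError("max_partitions_per_run must be >= 1")
    return [
        partition_keys[i : i + max_partitions_per_run]
        for i in range(0, len(partition_keys), max_partitions_per_run)
    ]


keys = daily_partitions(date(2024, 1, 1), date(2024, 1, 5))
print(len(plan_runs(keys, None)))  # 1: single run over all five days
print(len(plan_runs(keys, 1)))     # 5: one run per partition
print(len(plan_runs(keys, 2)))     # 3: runs of at most two partitions
```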
Install
pip install dagster-dbt dbt-core dbt-snowflake # or dbt-bigquery, dbt-redshift, etc.
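Because a mismatched `dbt-core` version is a common cause of load failures, a quick pre-flight check after installing can help. This sketch reads the installed `dbt-core` version with the standard library's `importlib.metadata` and compares its major/minor pair with the 1.7 through 1.11 range quoted in the Warnings for `dagster-dbt 0.29.0`. The bounds are assumptions; update them from the release notes of the version you install.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

# Assumed supported range (inclusive), taken from the compatibility note above.
SUPPORTED_MIN = (1, 7)
SUPPORTED_MAX = (1, 11)


def minor_version(v: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '1.9.4' or '1.10.0b1'."""
    major, minor = v.split(".")[:2]
    return int(major), int(minor)


def is_supported(v: str) -> bool:
    """True if the version's major/minor pair lies within the assumed range."""
    return SUPPORTED_MIN <= minor_version(v) <= SUPPORTED_MAX


def installed_dbt_core() -> Optional[str]:
    """Installed dbt-core version, or None if the package is absent."""
    try:
        return version("dbt-core")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    installed = installed_dbt_core()
    if installed is None:
        print("dbt-core is not installed")
    else:
        print(installed, "supported" if is_supported(installed) else "outside supported range")
```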
Imports
- DbtCliResource
from dagster_dbt import DbtCliResource
- DbtProject
from dagster_dbt import DbtProject
- dbt_assets
from dagster_dbt import dbt_assets
- DagsterDbtTranslator
from dagster_dbt import DagsterDbtTranslator
- DbtCloudClientResource
from dagster_dbt.cloud_v2.resources import DbtCloudClientResource
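`DagsterDbtTranslator` is the hook for customizing how dbt nodes are represented in Dagster; subclasses override methods such as `get_asset_key(dbt_resource_props)`, and an instance is passed to `@dbt_assets` through its `dagster_dbt_translator` argument. The sketch below does not import Dagster: `KeyTranslator` is a hypothetical stand-in that mirrors the override pattern but returns plain tuples instead of `AssetKey` objects, so the key-mapping logic can be checked in isolation.

```python
from typing import Any, Mapping, Tuple


class KeyTranslator:
    """Hypothetical stand-in for a translator; the default key is the node name."""

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> Tuple[str, ...]:
        return (dbt_resource_props["name"],)


class SchemaPrefixedTranslator(KeyTranslator):
    """Prefix every key with the node's schema, e.g. ('analytics', 'orders')."""

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> Tuple[str, ...]:
        base = super().get_asset_key(dbt_resource_props)
        return (dbt_resource_props["schema"], *base)


# Node properties trimmed to the fields this sketch reads.
node = {"name": "orders", "schema": "analytics", "resource_type": "model"}
print(KeyTranslator().get_asset_key(node))             # ('orders',)
print(SchemaPrefixedTranslator().get_asset_key(node))  # ('analytics', 'orders')
```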
Quickstart
from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
# Assuming your dbt project is in a subdirectory named 'my_dbt_project'
dbt_project_dir = Path(__file__).parent / "my_dbt_project"
# DbtProject describes the dbt project and where its manifest lives
# During development (under `dagster dev`), prepare_if_dev() (re)generates the manifest; elsewhere it does nothing
dbt_project = DbtProject(project_dir=dbt_project_dir)
dbt_project.prepare_if_dev()
# Define dbt assets using the @dbt_assets decorator
# The manifest path is required to infer assets and their dependencies
@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Execute dbt build command and stream events to Dagster
    yield from dbt.cli(["build"], context=context).stream()
# Combine assets and resources into a Dagster Definitions object
defs = Definitions(
    assets=[my_dbt_models],
    resources={
        "dbt": DbtCliResource(project_dir=dbt_project),
    },
)
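`@dbt_assets` reads the manifest when the module is imported, so a missing or malformed `manifest.json` makes the whole code location fail to load, the same failure mode described under Warnings. The hypothetical `check_manifest` helper below (not part of dagster-dbt) surfaces a clearer error first. It assumes the manifest sits at `target/manifest.json` inside the project directory, dbt's default target path.

```python
import json
from pathlib import Path


def check_manifest(project_dir: Path) -> Path:
    """Return the manifest path if it exists and looks like a dbt manifest.

    Raises FileNotFoundError or ValueError with a descriptive message otherwise.
    Assumes dbt's default target path ("target").
    """
    manifest_path = project_dir / "target" / "manifest.json"
    if not manifest_path.is_file():
        raise FileNotFoundError(f"{manifest_path} not found; run `dbt parse` in {project_dir} first")
    try:
        manifest = json.loads(manifest_path.read_text())
    except json.JSONDecodeError as exc:
        raise ValueError(f"{manifest_path} is not valid JSON: {exc}") from exc
    # A real manifest always has a top-level "nodes" mapping.
    if not isinstance(manifest, dict) or "nodes" not in manifest:
        raise ValueError(f"{manifest_path} has no 'nodes' section; is it a dbt manifest?")
    return manifest_path
```

In the quickstart, calling `check_manifest(dbt_project_dir)` after `prepare_if_dev()` and before the `@dbt_assets` decorator would turn a missing manifest into an explicit message instead of a generic load failure.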