Dagster dlt Integration
dagster-dlt is a Python library that provides a native integration for using dlt (data load tool) within Dagster. It enables users to define dlt sources and pipelines as software-defined assets in Dagster, leveraging dlt's capabilities for data extraction, schema inference, and loading into various destinations. The current version is 0.29.0, and it follows Dagster's release cadence, typically releasing alongside core Dagster updates.
Common errors
- PermissionError: [WinError 5] Access is denied: 'C:\Users\...\.dlt\pipelines\...\state.json'
  cause: Concurrent write access to dlt's state files by multiple dlt pipelines or assets, especially problematic on Windows due to file locking.
  fix: Ensure each `dlt.pipeline` instance used within Dagster assets has a unique `pipeline_name` and `dataset_name`. If using a shared destination like DuckDB, manage concurrency at the Dagster level (e.g., `max_concurrent_runs: 1`).
- dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package ... with exception: <class 'dlt.destinations.exceptions.DestinationConnectionError'> Connection with DuckDbSqlClient to dataset name ... failed. Please check if you configured the credentials at all and provided the right credentials values.
  cause: Incorrect or missing credentials for the DuckDB destination, or a file locking issue if multiple processes try to access the same DuckDB file simultaneously.
  fix: Verify that your dlt pipeline definition correctly specifies the `credentials` for DuckDB (e.g., the database file path). If multiple assets write to the same DuckDB file, see the concurrency notes under Warnings.
- No data found to normalize
  cause: The dlt source did not yield any data, or the destination already contains tables under the same schema/dataset name and `write_disposition` is not set appropriately (e.g., `append` or `merge`), preventing dlt from loading new data.
  fix: Check your dlt source logic to ensure it is extracting data as expected. If the destination has existing tables, set `write_disposition='append'` or `'merge'` on the resource or pass it to `pipeline.run()` (it is not an argument to `dlt.pipeline()`), and check for `dataset_name` conflicts across pipelines.
Warnings
- breaking The `dagster-dlt` library was introduced as a standalone package, replacing the `dlt` module previously found within `dagster-embedded-elt`.
- deprecated The `dlt_dagster_translator` parameter in the `@dlt_assets` decorator was deprecated and renamed.
- gotcha dlt relies on environment variables for managing connections and secrets to sources and destinations. Failure to configure these will lead to pipeline failures.
- gotcha When using DuckDB as a destination for multiple dlt assets, concurrent writes can lead to file locking errors (`IO Error: Cannot open file...`).
- gotcha When materializing multiple dlt assets, intermittent `PermissionError: [WinError 5] Access is denied` errors for `state.json` can occur, especially on Windows.
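For the DuckDB and `state.json` concurrency gotchas above, one blunt mitigation is serializing runs at the Dagster instance level. A sketch of the `dagster.yaml` run-queue setting (assumes the queued run coordinator is in use):

```yaml
# dagster.yaml (instance config)
run_queue:
  # Serialize runs so dlt state files and the DuckDB file are never
  # written by two runs at once.
  max_concurrent_runs: 1
```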
Install
- pip install dagster-dlt dagster dagster-webserver
Imports
- dlt_assets
from dagster_dlt import dlt_assets
- DagsterDltResource
from dagster_dlt import DagsterDltResource
- pipeline
from dlt import pipeline
- source
import dlt
- DltLoadCollectionComponent
from dagster_dlt import DltLoadCollectionComponent
Quickstart
from dagster import Definitions, AssetExecutionContext
from dagster_dlt import DagsterDltResource, dlt_assets
import dlt
# Assuming you have a dlt source defined, e.g., in `my_dlt_source.py`
# For this example, we'll create a minimal in-memory source.
# In a real scenario, this would import from your dlt source module.
@dlt.source
def my_in_memory_source(item_count: int = 3):
    @dlt.resource
    def my_items():
        for i in range(item_count):
            yield {'id': i, 'value': f'item_{i}'}

    return my_items
# Configure a dlt pipeline to load to a local DuckDB file
my_pipeline = dlt.pipeline(
    pipeline_name="my_dagster_dlt_pipeline",
    destination="duckdb",
    dataset_name="my_data",
    progress="log",
    # For DuckDB, the credentials can simply be the database file path.
    credentials="./my_dagster_dlt_data.duckdb",
)
# Define Dagster assets using the @dlt_assets decorator
@dlt_assets(
    dlt_source=my_in_memory_source(item_count=5),
    dlt_pipeline=my_pipeline,
    name="my_dlt_assets",
    group_name="dlt_ingestion",
)
def my_dagster_dlt_assets(context: AssetExecutionContext, dlt_resource: DagsterDltResource):
    # @dlt_assets generates one Dagster asset per dlt resource in the source.
    # The function body triggers the dlt pipeline run; the resource reuses the
    # source and pipeline from the decorator, and the yielded results are
    # converted to Dagster materializations.
    yield from dlt_resource.run(context=context)
# Combine assets and resources into Dagster Definitions
defs = Definitions(
    assets=[my_dagster_dlt_assets],
    resources={
        # The resource key must match the asset function's parameter name.
        # Pass sensitive dlt credentials via environment variables or Dagster
        # secrets rather than hardcoding them; a local DuckDB file path is
        # fine to state directly.
        "dlt_resource": DagsterDltResource(),
    },
)
# To run this:
# 1. Save as `__init__.py` in a Dagster project (e.g., `my_project/my_dagster_dlt_example/__init__.py`)
# 2. Run `dagster dev` in the parent directory of `my_project`
# 3. Open the Dagster UI (Dagit), locate 'my_dlt_assets' and materialize it.
# 4. Check `my_dagster_dlt_data.duckdb` for the loaded data.