Dagster dlt Integration

0.29.0 · active · verified Thu Apr 16

dagster-dlt is a Python library that provides a native integration for using dlt (data load tool) within Dagster. It enables users to define dlt sources and pipelines as software-defined assets in Dagster, leveraging dlt's capabilities for data extraction, schema inference, and loading into various destinations. The current version is 0.29.0, and it follows Dagster's release cadence, typically releasing alongside core Dagster updates.

Common errors

Warnings

Install
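
The package is published on PyPI and depends on both dagster and dlt, which pip resolves automatically (assuming an environment with a compatible Python version):

```shell
pip install dagster-dlt
```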

Imports

import dlt
from dagster import AssetExecutionContext, Definitions
from dagster_dlt import DagsterDltResource, dlt_assets

Quickstart

This quickstart demonstrates how to define Dagster assets from a dlt source and pipeline using the `@dlt_assets` decorator. It sets up a simple in-memory dlt source and a dlt pipeline that loads data to a local DuckDB file, then orchestrates this with Dagster. The `DagsterDltResource` is used to execute the dlt pipeline within the Dagster asset context. Remember to replace `my_in_memory_source` with your actual dlt source definition.

import dlt
from dagster import AssetExecutionContext, Definitions
from dagster_dlt import DagsterDltResource, dlt_assets

# Assuming you have a dlt source defined, e.g., in `my_dlt_source.py`
# For this example, we'll create a minimal in-memory source.
# In a real scenario, this would import from your dlt source module.
@dlt.source
def my_in_memory_source(item_count: int = 3):
    @dlt.resource
    def my_items():
        for i in range(item_count):
            yield {'id': i, 'value': f'item_{i}'}
    return my_items

# Configure a dlt pipeline that loads into a local DuckDB file
my_pipeline = dlt.pipeline(
    pipeline_name="my_dagster_dlt_pipeline",
    destination="duckdb",
    dataset_name="my_data",
    progress="log",
    # For DuckDB, credentials can simply be a path to the database file
    credentials="./my_dagster_dlt_data.duckdb",
)

# Define Dagster assets using the @dlt_assets decorator
@dlt_assets(
    dlt_source=my_in_memory_source(item_count=5),
    dlt_pipeline=my_pipeline,
    name="my_dlt_assets",
    group_name="dlt_ingestion",
)
def my_dagster_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    # The decorator generates one Dagster asset per dlt resource in the source.
    # The parameter name ("dlt") must match the resource key in Definitions below;
    # inside this function it shadows the dlt module, which is not needed here.
    # Calling run() with no source/pipeline arguments reuses the ones passed to
    # the decorator; the yielded events become Dagster materializations.
    yield from dlt.run(context=context)

# Combine assets and resources into Dagster Definitions
defs = Definitions(
    assets=[my_dagster_dlt_assets],
    resources={
        # Dagster matches resource keys to asset-function parameter names.
        # Sensitive destination credentials are better supplied via environment
        # variables or Dagster-managed secrets; a local DuckDB file needs none.
        "dlt": DagsterDltResource(),
    },
)

# To run this:
# 1. Save this file, e.g., as `my_dagster_dlt_example.py`
# 2. Run `dagster dev -f my_dagster_dlt_example.py`
# 3. In the Dagster UI, locate the assets in the 'dlt_ingestion' group and materialize them.
# 4. Check `my_dagster_dlt_data.duckdb` for the loaded data.
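
Stripped of the dlt decorators, the source in the quickstart is an ordinary Python generator; this standalone sketch (no dlt or Dagster required) shows the rows dlt would extract and infer a schema from:

```python
def my_items(item_count: int = 3):
    # Mirrors the body of the dlt resource above: one dict per row.
    for i in range(item_count):
        yield {"id": i, "value": f"item_{i}"}

rows = list(my_items(item_count=5))
print(len(rows))  # 5 rows, matching item_count=5 in the quickstart
print(rows[0])    # {'id': 0, 'value': 'item_0'}
```

Each yielded dict becomes a row in the `my_items` table of the `my_data` dataset, with columns `id` and `value` inferred from the dict keys and value types.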
