{"id":8921,"library":"dagster-dlt","title":"Dagster dlt Integration","description":"dagster-dlt is a Python library that provides a native integration for using dlt (data load tool) within Dagster. It enables users to define dlt sources and pipelines as software-defined assets in Dagster, leveraging dlt's capabilities for data extraction, schema inference, and loading into various destinations. The current version is 0.29.0, and it follows Dagster's release cadence, typically releasing alongside core Dagster updates.","status":"active","version":"0.29.0","language":"en","source_language":"en","source_url":"https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-dlt","tags":["ETL","ELT","orchestration","data pipelines","dlt","dagster","data integration"],"install":[{"cmd":"pip install dagster-dlt dagster dagster-webserver","lang":"bash","label":"Install library and core Dagster components"}],"dependencies":[{"reason":"Core library for ETL/ELT functionality, implicitly installed as a dependency.","package":"dlt","optional":false},{"reason":"Main orchestration framework; dagster-dlt is an integration for it.","package":"dagster","optional":false},{"reason":"Provides the Dagster UI for monitoring and managing assets and runs.","package":"dagster-webserver","optional":false}],"imports":[{"symbol":"dlt_assets","correct":"from dagster_dlt import dlt_assets"},{"symbol":"DagsterDltResource","correct":"from dagster_dlt import DagsterDltResource"},{"symbol":"pipeline","correct":"from dlt import pipeline"},{"symbol":"source","correct":"import dlt"},{"note":"The `DltLoadCollectionComponent` was moved from `dagster-embedded-elt` to `dagster-dlt`.","wrong":"from dagster_embedded_elt.dlt import DltLoadCollectionComponent","symbol":"DltLoadCollectionComponent","correct":"from dagster_dlt import DltLoadCollectionComponent"}],"quickstart":{"code":"import os\nfrom dagster import Definitions, AssetExecutionContext\nfrom dagster_dlt import DagsterDltResource, 
dlt_assets\nimport dlt\n\n# Assuming you have a dlt source defined, e.g., in `my_dlt_source.py`.\n# For this example, we create a minimal in-memory source; in a real\n# project you would import your actual dlt source module.\n@dlt.source\ndef my_in_memory_source(item_count: int = 3):\n    @dlt.resource\n    def my_items():\n        for i in range(item_count):\n            yield {'id': i, 'value': f'item_{i}'}\n    return my_items\n\n# Configure a dlt pipeline that loads to a local DuckDB file\nmy_pipeline = dlt.pipeline(\n    pipeline_name=\"my_dagster_dlt_pipeline\",\n    destination=dlt.destinations.duckdb(\"./my_dagster_dlt_data.duckdb\"),\n    dataset_name=\"my_data\",\n    progress=\"log\",\n)\n\n# Define Dagster assets using the @dlt_assets decorator.\n# Note: the resource parameter name below (`dlt`) must match the resource\n# key registered in Definitions.\n@dlt_assets(\n    dlt_source=my_in_memory_source(item_count=5),\n    dlt_pipeline=my_pipeline,\n    name=\"my_dlt_assets\",\n    group_name=\"dlt_ingestion\",\n)\ndef my_dagster_dlt_assets(context: AssetExecutionContext, dlt: DagsterDltResource):\n    # @dlt_assets generates one Dagster asset per dlt resource. Calling the\n    # resource's `run` method executes the source and pipeline bound by the\n    # decorator and yields Dagster materialization events.\n    yield from dlt.run(context=context)\n\n# Combine assets and resources into Dagster Definitions\ndefs = Definitions(\n    assets=[my_dagster_dlt_assets],\n    resources={\n        # Pass any sensitive dlt credentials via environment variables or\n        # Dagster secrets; for DuckDB, a local file path is sufficient.\n        \"dlt\": DagsterDltResource(),\n    },\n)\n\n# To run this:\n# 1. Save as `__init__.py` in a Dagster project (e.g., `my_project/my_dagster_dlt_example/__init__.py`)\n# 2. Run `dagster dev` in the parent directory of `my_project`\n# 3. Open the Dagster UI, locate 'my_dlt_assets', and materialize it.\n# 4. Check `my_dagster_dlt_data.duckdb` for the loaded data.","lang":"python","description":"This quickstart demonstrates how to define Dagster assets from a dlt source and pipeline using the `@dlt_assets` decorator. It sets up a minimal in-memory dlt source and a dlt pipeline that loads data into a local DuckDB file, then orchestrates the load with Dagster. The `DagsterDltResource` executes the dlt pipeline within the Dagster asset context; its parameter name in the asset function must match the resource key in `Definitions`. Replace `my_in_memory_source` with your actual dlt source definition."},"warnings":[{"fix":"Update imports from `dagster_embedded_elt.dlt` to `dagster_dlt`. Specifically, `DagsterDltResource` and `dlt_assets` should now be imported from `dagster_dlt`.","message":"The `dagster-dlt` library was introduced as a standalone package, replacing the `dlt` module previously found within `dagster-embedded-elt`.","severity":"breaking","affected_versions":"Dagster versions 1.12.0 and later."},{"fix":"Use the `dagster_dlt_translator` parameter instead of `dlt_dagster_translator` when defining custom translation logic for dlt resources to Dagster assets.","message":"The `dlt_dagster_translator` parameter in the `@dlt_assets` decorator was deprecated and renamed.","severity":"deprecated","affected_versions":"Dagster versions 1.8.0 and later."},{"fix":"Ensure all required dlt environment variables (e.g., API keys, database credentials) are set in the environment where your Dagster process runs, or configure them via Dagster's resource configuration and pass them to your dlt pipeline.","message":"dlt relies on environment variables for managing connections and secrets to sources and destinations.
Failure to configure these will lead to pipeline failures.","severity":"gotcha","affected_versions":"All versions"},{"fix":"To avoid concurrent access issues with DuckDB, either use a database that supports multi-process transactions (e.g., PostgreSQL, MySQL), write data to Parquet files and have DuckDB read from them, or ensure that only one dlt asset targeting the same DuckDB file is materialized at a time (e.g., by limiting Dagster's concurrency).","message":"When using DuckDB as a destination for multiple dlt assets, concurrent writes can lead to file locking errors (`IO Error: Cannot open file...`).","severity":"gotcha","affected_versions":"All versions where DuckDB is used as a destination."},{"fix":"This issue often stems from concurrent access to dlt's `state.json` files. Ensure each dlt asset uses a unique pipeline name and dataset, and consider limiting Dagster's concurrency (`max_concurrent_runs: 1`) if the problem persists.","message":"When materializing multiple dlt assets, intermittent `PermissionError: [WinError 5] Access is denied` errors on dlt's `state.json` file can occur, especially on Windows.","severity":"gotcha","affected_versions":"All versions, particularly on Windows."}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Ensure each `dlt.pipeline` instance used within Dagster assets has a unique `pipeline_name` and `dataset_name`. If using a shared destination like DuckDB, manage concurrency at the Dagster level (e.g., `max_concurrent_runs: 1`).","cause":"Concurrent write access to dlt's state files by multiple dlt pipelines or assets, especially problematic on Windows due to file locking.","error":"PermissionError: [WinError 5] Access is denied: 'C:\\Users\\...\\.dlt\\pipelines\\...\\state.json'"},{"fix":"Verify that your dlt pipeline definition correctly specifies the `credentials` for DuckDB (e.g., the database file path).
If multiple assets write to the same DuckDB file, see the concurrency issues described in the warnings.","cause":"Incorrect or missing credentials for the DuckDB destination, or a file locking issue when multiple processes access the same DuckDB file simultaneously.","error":"dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package ... with exception: <class 'dlt.destinations.exceptions.DestinationConnectionError'> Connection with DuckDbSqlClient to dataset name ... failed. Please check if you configured the credentials at all and provided the right credentials values."},{"fix":"Check your dlt source logic to confirm it is actually yielding data. If the destination already contains tables for the dataset, set `write_disposition='append'` or `'merge'` on the resource or when calling `pipeline.run(...)` (`write_disposition` is not a `dlt.pipeline()` argument), and check for `dataset_name` conflicts across pipelines.","cause":"The dlt source yielded no data during extraction, so the normalize step had nothing to process. This can also surface when the destination already contains tables for the same dataset and `write_disposition` is not set appropriately (e.g., `append` or `merge`).","error":"No data found to normalize"}]}