Dagster DuckDB Integration
The `dagster-duckdb` library provides ops, resources, and IO managers for integrating DuckDB databases with Dagster data pipelines. It lets users read from and write to DuckDB, manage database connections, and persist assets as tables. Each release is paired with a specific release of the core `dagster` package, so the two should be installed at matching versions. Current version is 0.29.0.
Common errors
-
ModuleNotFoundError: No module named 'dagster_duckdb'
cause The `dagster-duckdb` package has not been installed in the active Python environment.
fix Run `pip install dagster-duckdb` to install the library.
-
dagster._core.errors.DagsterInvalidConfigError: Missing required config field 'database'
cause The `DuckDBResource` or `duckdb_io_manager` was configured without specifying the `database` path.
fix Provide a `database` path string (e.g., `DuckDBResource(database='path/to/my_db.duckdb')`), or `:memory:` for an in-memory database, in its configuration.
-
dagster._core.errors.DagsterInvalidDefinitionError: Asset 'my_asset' requires resource 'duckdb', but it was not provided to the job.
cause An asset or op declared a dependency on the `duckdb` resource (e.g., `@asset(required_resource_keys={'duckdb'})` or a typed parameter such as `duckdb: DuckDBResource`), but the resource was not included in the `Definitions` object or job definition.
fix Ensure the `duckdb` resource is defined in your `Definitions` object alongside the asset, for example: `Definitions(assets=[my_asset], resources={'duckdb': DuckDBResource(...)})`.
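The first error in the list can be caught before Dagster loads any code. The following is a minimal sketch using only the standard library; the helper name `is_installed` is hypothetical and not part of `dagster-duckdb`.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be found in the active environment."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # The import name uses an underscore; the pip package name uses a hyphen.
    if not is_installed("dagster_duckdb"):
        print("dagster_duckdb not found; run `pip install dagster-duckdb`")
```

Running this with the same interpreter that launches Dagster helps confirm that the package was installed into the right environment.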
Warnings
- breaking Dagster library versions (like `dagster-duckdb`) are tightly coupled with the core `dagster` package version. For example, `dagster-duckdb==0.29.0` is designed to work with `dagster==1.13.0`. Using mismatched versions can lead to unexpected behavior or runtime errors.
- gotcha The `database` configuration for both `DuckDBResource` and `duckdb_io_manager` is crucial. Incorrectly specifying the path (e.g., a non-existent directory or insufficient permissions) will cause runtime errors when Dagster tries to connect or write to the database.
- gotcha The `duckdb_io_manager` expects assets to return data structures it knows how to serialize into a DuckDB table (e.g., Pandas DataFrames, Polars DataFrames, PyArrow Tables). Returning arbitrary Python objects will result in an error or unexpected serialization.
- gotcha The `requires_python` range for `dagster-duckdb==0.29.0` is `>=3.10, <3.15`. Using Python versions outside this range (e.g., Python 3.9 or 3.15+) may lead to installation failures or runtime incompatibilities.
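The warnings above can be turned into small pre-flight checks. This is a sketch under stated assumptions, not part of the library: all three helper names are hypothetical, the Python range is copied from the warning above, and the version offset (library minor = core minor + 16) is an assumption that matches the `0.29.0` / `1.13.0` pair quoted above and should be verified against the release notes.

```python
import os
import sys

# Supported range copied from the warning above: >=3.10, <3.15.
MIN_PYTHON = (3, 10)
MAX_PYTHON_EXCLUSIVE = (3, 15)


def python_supported(version=None) -> bool:
    """Check a (major, minor) tuple against the documented range."""
    current = tuple(sys.version_info[:2]) if version is None else tuple(version[:2])
    return MIN_PYTHON <= current < MAX_PYTHON_EXCLUSIVE


def expected_library_version(core_version: str) -> str:
    """Map a core `dagster` 1.x.y version to the paired library version.

    Assumption: library version is 0.(x + 16).y, consistent with 1.13.0 -> 0.29.0.
    """
    major, minor, patch = (int(part) for part in core_version.split("."))
    if major != 1:
        raise ValueError("this sketch only covers dagster 1.x releases")
    return f"0.{minor + 16}.{patch}"


def prepare_database_path(path: str) -> str:
    """Create the parent directory of a DuckDB file and check it is writable."""
    if path == ":memory:":
        return path  # in-memory databases need no directory
    parent = os.path.dirname(os.path.abspath(path))
    os.makedirs(parent, exist_ok=True)
    if not os.access(parent, os.W_OK):
        raise PermissionError(f"cannot write to {parent}")
    return path
```

A call such as `prepare_database_path(db_file_path)` before constructing the resource surfaces path problems early, rather than at the first connection attempt.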
Install
-
pip install dagster-duckdb
Imports
- DuckDBResource
wrong: `from dagster.experimental import DuckDBResource`
correct: `from dagster_duckdb import DuckDBResource`
- duckdb_io_manager
from dagster_duckdb import build_duckdb_io_manager  # factory that builds a `duckdb_io_manager` from type handlers
Quickstart
import os
import tempfile

import pandas as pd
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
from dagster_duckdb import DuckDBResource, build_duckdb_io_manager
# Storing Pandas DataFrames requires `pip install dagster-duckdb-pandas`.
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

# Use a temporary directory for the DuckDB database to make the example runnable.
# In a production environment, this would typically be a persistent path.
db_temp_dir = tempfile.mkdtemp()
db_file_path = os.path.join(db_temp_dir, "my_dagster_db.duckdb")

# Build an IO manager that knows how to store Pandas DataFrames as DuckDB tables.
duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])


@asset
def my_duckdb_asset(duckdb: DuckDBResource) -> None:
    """
    An asset that uses DuckDBResource to execute SQL directly,
    creating and populating a table.
    """
    # The `-> None` annotation keeps the IO manager from handling this asset's output.
    with duckdb.get_connection() as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS my_data (id INTEGER, name TEXT)")
        conn.execute("INSERT INTO my_data VALUES (1, 'Alice'), (2, 'Bob')")
    print(f"Table 'my_data' created and populated in {db_file_path}")


@asset(key="io_manager_output_table")
def another_asset_for_io_manager() -> pd.DataFrame:
    """
    An asset whose output (a Pandas DataFrame) is materialized by the
    `duckdb_io_manager` into a DuckDB table named 'io_manager_output_table'.
    """
    return pd.DataFrame({"col_a": [10, 20], "col_b": ["x", "y"]})


# Assets have no `to_job` method; asset jobs are created with `define_asset_job`.
my_duckdb_job = define_asset_job(name="my_duckdb_job", selection=["my_duckdb_asset"])

defs = Definitions(
    assets=[
        my_duckdb_asset,
        another_asset_for_io_manager,
    ],
    resources={
        "duckdb": DuckDBResource(database=db_file_path),
        "io_manager": duckdb_io_manager.configured({"database": db_file_path}),
    },
    jobs=[my_duckdb_job],
    schedules=[
        ScheduleDefinition(
            job=my_duckdb_job,
            cron_schedule="0 0 * * *",  # daily at midnight
        )
    ],
)

# To run this example:
# 1. Save this code to a file (e.g., `my_repo.py`).
# 2. Run `dagster dev -f my_repo.py` in your terminal.
# 3. Navigate to the Dagster UI (typically http://localhost:3000).
# 4. Launch a run for `my_duckdb_job`, or materialize the `io_manager_output_table` asset.
# 5. After running, you can inspect the DuckDB file at `db_file_path`.