Dagster DuckDB Integration

0.29.0 · active · verified Thu Apr 16

The `dagster-duckdb` library integrates DuckDB databases with Dagster data pipelines. It provides `DuckDBResource` for opening connections and running SQL from assets and ops, plus the building blocks for IO managers that persist asset outputs as DuckDB tables. Type-specific IO managers ship in companion packages such as `dagster-duckdb-pandas`, `dagster-duckdb-polars`, and `dagster-duckdb-pyspark`. The library is released in lockstep with Dagster core, so install versions that belong to the same Dagster release. The current version is 0.29.0.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart defines a Dagster `Definitions` object with DuckDB integration. One asset uses `DuckDBResource` to execute SQL directly; a second asset returns a Pandas DataFrame, which a DuckDB IO manager stores as a table. The Pandas-aware IO manager, `DuckDBPandasIOManager`, ships in the companion package `dagster-duckdb-pandas`, so install that package alongside `dagster-duckdb`. The database file lives in the system temporary directory so the example runs without setup; delete the file when you are done. To execute, save the code, run `dagster dev -f your_file.py`, and trigger a run from the Dagster UI.

# Requires: pip install dagster dagster-webserver dagster-duckdb dagster-duckdb-pandas pandas
import os
import tempfile

import pandas as pd
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
from dagster_duckdb import DuckDBResource
from dagster_duckdb_pandas import DuckDBPandasIOManager

# Keep the database in the system temp directory so the example needs no setup.
# The path is fixed rather than randomly generated because Dagster imports this
# module in more than one process, and every process must open the same file.
# In a production environment, this would typically be a persistent path.
db_file_path = os.path.join(tempfile.gettempdir(), "my_dagster_db.duckdb")


@asset
def my_duckdb_asset(duckdb: DuckDBResource) -> None:
    """
    An asset that uses DuckDBResource to execute SQL directly,
    creating and populating a table.
    """
    # The `-> None` annotation tells Dagster there is no output to hand to the IO manager.
    with duckdb.get_connection() as conn:
        # CREATE OR REPLACE keeps reruns idempotent: the table is rebuilt, not appended to.
        conn.execute("CREATE OR REPLACE TABLE my_data (id INTEGER, name TEXT)")
        conn.execute("INSERT INTO my_data VALUES (1, 'Alice'), (2, 'Bob')")
    print(f"Table 'my_data' created and populated in {db_file_path}")


@asset(key="io_manager_output_table")
def another_asset_for_io_manager() -> pd.DataFrame:
    """
    An asset whose output (a Pandas DataFrame) is stored by the DuckDB IO manager
    in a table named 'io_manager_output_table'.
    """
    return pd.DataFrame({"col_a": [10, 20], "col_b": ["x", "y"]})


# Assets have no `to_job` method; build the job from an asset selection instead.
my_duckdb_job = define_asset_job(name="my_duckdb_job", selection=[my_duckdb_asset])

defs = Definitions(
    assets=[
        my_duckdb_asset,
        another_asset_for_io_manager,
    ],
    resources={
        "duckdb": DuckDBResource(database=db_file_path),
        "io_manager": DuckDBPandasIOManager(database=db_file_path),
    },
    schedules=[
        ScheduleDefinition(
            job=my_duckdb_job,
            cron_schedule="0 0 * * *",  # daily at midnight
        )
    ],
)

# To run this example:
# 1. Save this code to a file (e.g., `my_repo.py`).
# 2. Run `dagster dev -f my_repo.py` in your terminal.
# 3. Navigate to the Dagster UI (typically http://localhost:3000).
# 4. Launch a run of `my_duckdb_job`, or materialize the `io_manager_output_table` asset.
# 5. After running, you can inspect the DuckDB file at `db_file_path`.
