Dagster Snowflake Pandas Integration

0.29.0 · active · verified Thu Apr 16

The `dagster-snowflake-pandas` library integrates Pandas DataFrames with Snowflake inside the Dagster data orchestration framework. Through Dagster's I/O manager system, asset outputs are written to Snowflake tables and asset inputs are read back as DataFrames. The package is currently at version 0.29.0 and typically follows the release cadence of the core Dagster libraries.

Install

pip install dagster-snowflake-pandas

Imports

import pandas as pd
from dagster import asset, Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager

Quickstart

This quickstart shows how to configure `SnowflakePandasIOManager` as the I/O manager for a Dagster project so that Pandas DataFrames are stored in and loaded from Snowflake. It defines two assets: one that creates a DataFrame, and a downstream asset that receives it after a round trip through a Snowflake table. Connection details are supplied through environment variables rather than hard-coded credentials.

import pandas as pd
import os
from dagster import asset, Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager

@asset
def my_pandas_table() -> pd.DataFrame:
    # Create a simple Pandas DataFrame; the I/O manager writes it to Snowflake
    data = {'col1': [1, 2], 'col2': ['A', 'B']}
    return pd.DataFrame(data)

@asset
def downstream_asset(my_pandas_table: pd.DataFrame) -> int:
    # The I/O manager loads the upstream table from Snowflake as a DataFrame
    print(f"Loaded DataFrame from Snowflake:\n{my_pandas_table}")
    return len(my_pandas_table)

defs = Definitions(
    assets=[my_pandas_table, downstream_asset],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            # EnvVar takes only the variable name; it does not accept a
            # default value, so these variables must be set at runtime
            schema=EnvVar("SNOWFLAKE_SCHEMA"),
            warehouse=EnvVar("SNOWFLAKE_WAREHOUSE"),
        )
    },
)

# To run this locally, set the following environment variables:
# os.environ["SNOWFLAKE_ACCOUNT"] = "your_account_identifier"
# os.environ["SNOWFLAKE_USER"] = "your_username"
# os.environ["SNOWFLAKE_PASSWORD"] = "your_password"
# os.environ["SNOWFLAKE_DATABASE"] = "your_database"
# os.environ["SNOWFLAKE_SCHEMA"] = "your_schema"
# os.environ["SNOWFLAKE_WAREHOUSE"] = "your_warehouse"

# To materialize these assets against Snowflake, set the environment
# variables above and launch the Dagster UI with:
#   dagster dev -f <this_file>.py
