Dagster Snowflake

0.29.0 · active · verified Mon Apr 13

The `dagster-snowflake` library integrates Dagster with Snowflake so that data engineers can orchestrate, observe, and manage data pipelines that interact with the Snowflake AI Data Cloud. It lets you connect to Snowflake as a resource, build assets backed by Snowflake tables and views, and use Snowflake as the data warehouse within Dagster pipelines. The current version is 0.29.0; library releases are published alongside Dagster core, which is updated frequently.

Quickstart

This quickstart defines a Dagster asset that connects to Snowflake through `SnowflakeResource`. Credentials are read from environment variables rather than hard-coded, and the asset's compute function runs a few simple SQL statements: it creates a table if it does not exist, inserts two rows, and counts the rows in the table.

import os
from dagster import asset, Definitions, EnvVar
from dagster_snowflake import SnowflakeResource

@asset
def my_snowflake_asset(snowflake: SnowflakeResource):
    """An example Dagster asset that interacts with Snowflake."""
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        # Example: Create a table if it doesn't exist and insert data
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS my_database.my_schema.my_table (
                id INT,
                name VARCHAR
            );
            """
        )
        cursor.execute("INSERT INTO my_database.my_schema.my_table (id, name) VALUES (1, 'Alice');")
        cursor.execute("INSERT INTO my_database.my_schema.my_table (id, name) VALUES (2, 'Bob');")
        result = cursor.execute("SELECT COUNT(*) FROM my_database.my_schema.my_table").fetchone()
        print(f"Rows in my_table: {result[0]}")
    return {'rows_processed': result[0] if result else 0}

defs = Definitions(
    assets=[my_snowflake_asset],
    resources={
        "snowflake": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            schema=EnvVar("SNOWFLAKE_SCHEMA"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),  # Optional; None if unset
            role=os.getenv("SNOWFLAKE_ROLE"),  # Optional; None if unset
        )
    },
)

# To run this locally, set the following environment variables:
# export SNOWFLAKE_ACCOUNT="your-account-identifier"
# export SNOWFLAKE_USER="your-username"
# export SNOWFLAKE_PASSWORD="your-password"
# export SNOWFLAKE_DATABASE="your-database"
# export SNOWFLAKE_SCHEMA="your-schema"
# (Optional) export SNOWFLAKE_WAREHOUSE="your-warehouse"
# (Optional) export SNOWFLAKE_ROLE="your-role"
# Then run `dagster dev -f your_file_name.py`
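
Because the quickstart depends on several environment variables, it can help to check them before starting `dagster dev`. The following is a minimal sketch using only the Python standard library; the helper name `missing_snowflake_vars` is hypothetical and not part of `dagster-snowflake`, and the split into required and optional variables simply mirrors the comments above.

```python
import os
from typing import List, Mapping

# Variable names taken from the quickstart; treating these five as required
# is an assumption based on its comments, not a rule of dagster-snowflake.
REQUIRED_VARS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)
OPTIONAL_VARS = ("SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_ROLE")


def missing_snowflake_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with a partial, made-up environment (placeholder values only).
example_env = {
    "SNOWFLAKE_ACCOUNT": "your-account-identifier",
    "SNOWFLAKE_USER": "your-username",
}
print(missing_snowflake_vars(example_env))
# prints ['SNOWFLAKE_PASSWORD', 'SNOWFLAKE_DATABASE', 'SNOWFLAKE_SCHEMA']
```

Calling `missing_snowflake_vars()` with no argument inspects the real process environment; an empty list suggests the required variables for the quickstart are in place.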
