Dagster Snowflake
The `dagster-snowflake` library provides an integration between Dagster and Snowflake, enabling data engineers to orchestrate, observe, and manage data pipelines that interact with the Snowflake AI Data Cloud. It lets you connect to Snowflake as a resource, build assets backed by Snowflake tables and views, and use Snowflake as a data warehouse within Dagster pipelines. The library's current version is 0.29.0, released in lockstep with Dagster core, which ships frequent updates.
Warnings
- breaking Python 3.6 and 3.8 are no longer supported. `dagster-snowflake` dropped Python 3.6 support due to its underlying `snowflake-connector-python` dependency, and Dagster core no longer supports Python 3.8 (EOL 2024-10-07).
- gotcha When loading Pandas DataFrames with timestamp columns to Snowflake, `snowflake-connector-python` (v3.5.0+) may cause data corruption if timestamps are not timezone-aware. The `SnowflakePandasIOManager` attempts to mitigate this by assigning UTC if no timezone is present.
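As a hedge against the timestamp gotcha above, you can make timestamp columns timezone-aware yourself before handing a DataFrame to the I/O manager, rather than relying on the automatic UTC assignment. A minimal sketch using plain pandas; the `created_at` column name and sample data are illustrative:

```python
import pandas as pd

# A DataFrame with a timezone-naive timestamp column (illustrative data).
df = pd.DataFrame({
    "id": [1, 2],
    "created_at": pd.to_datetime(["2024-01-01 12:00:00", "2024-01-02 08:30:00"]),
})

# Localize naive timestamps to UTC before loading to Snowflake, mirroring
# what SnowflakePandasIOManager does when no timezone is present.
if df["created_at"].dt.tz is None:
    df["created_at"] = df["created_at"].dt.tz_localize("UTC")

print(df["created_at"].dt.tz)  # UTC
```

Doing this explicitly in your own code makes the intended timezone visible at the call site instead of depending on I/O-manager behavior.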
- gotcha Snowflake costs and query performance can be significant issues if not managed correctly. Common anti-patterns include full table scans, inefficient joins, or using Snowflake for high-concurrency, low-latency applications.
- gotcha Schema-level permissions with Snowflake future grants can be complex, especially when dbt/Dagster recreates tables. Database-level future grants may not be sufficient, leading to lost SELECT access for reporting roles.
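One way to avoid losing SELECT access when tables are recreated is to issue grants at the schema level, covering both existing and future tables. The sketch below only builds the SQL strings (the database, schema, and role names are illustrative assumptions); you would execute them through a Snowflake connection with a sufficiently privileged role:

```python
# Sketch: schema-level grants that survive dbt/Dagster table recreation.
# The database, schema, and role names below are illustrative assumptions.
def future_grant_statements(database: str, schema: str, role: str) -> list[str]:
    """Build GRANT statements scoped to the schema, not just the database."""
    fq_schema = f"{database}.{schema}"
    return [
        # Existing tables keep SELECT access for the role.
        f"GRANT SELECT ON ALL TABLES IN SCHEMA {fq_schema} TO ROLE {role};",
        # Tables recreated later (e.g. by dbt) get SELECT automatically.
        f"GRANT SELECT ON FUTURE TABLES IN SCHEMA {fq_schema} TO ROLE {role};",
    ]

for stmt in future_grant_statements("my_database", "my_schema", "reporting"):
    print(stmt)
```

Note that in Snowflake, schema-level future grants take precedence over database-level ones, so defining both can still drop access if the schema-level grant is missing the reporting role.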
- gotcha Sensitive credentials (account, user, password, private key) for Snowflake should always be managed securely using environment variables or a secrets manager, not hardcoded directly in code.
- deprecated The `build_snowflake_io_manager` function is considered a legacy API for constructing I/O managers.
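If you are migrating off the legacy builder, the Pythonic I/O manager classes are the current pattern: configure the class directly and register it under the `io_manager` resource key. A minimal configuration sketch, assuming `dagster-snowflake-pandas` is installed and the same environment variables as the quickstart below:

```python
from dagster import Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager

# Pythonic replacement for build_snowflake_io_manager: instantiate the
# class directly instead of calling the legacy builder function.
defs = Definitions(
    assets=[],  # your DataFrame-returning assets go here
    resources={
        "io_manager": SnowflakePandasIOManager(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            schema=EnvVar("SNOWFLAKE_SCHEMA"),
        )
    },
)
```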
Install
- pip install dagster-snowflake
- pip install dagster-snowflake-pandas
- pip install dagster-snowflake-pyspark
- pip install dagster-snowflake-polars
Imports
- SnowflakeResource
from dagster_snowflake import SnowflakeResource
- SnowflakePandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
- SnowflakePySparkIOManager
from dagster_snowflake_pyspark import SnowflakePySparkIOManager
- build_snowflake_io_manager
from dagster_snowflake import build_snowflake_io_manager
Quickstart
import os

from dagster import Definitions, EnvVar, asset
from dagster_snowflake import SnowflakeResource


@asset
def my_snowflake_asset(snowflake: SnowflakeResource):
    """An example Dagster asset that interacts with Snowflake."""
    with snowflake.get_connection() as conn:
        cursor = conn.cursor()
        # Create a table if it doesn't exist and insert data
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS my_database.my_schema.my_table (
                id INT,
                name VARCHAR
            );
            """
        )
        cursor.execute("INSERT INTO my_database.my_schema.my_table (id, name) VALUES (1, 'Alice');")
        cursor.execute("INSERT INTO my_database.my_schema.my_table (id, name) VALUES (2, 'Bob');")
        result = cursor.execute("SELECT COUNT(*) FROM my_database.my_schema.my_table").fetchone()
        print(f"Rows in my_table: {result[0]}")
        return {"rows_processed": result[0] if result else 0}


defs = Definitions(
    assets=[my_snowflake_asset],
    resources={
        "snowflake": SnowflakeResource(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            schema=EnvVar("SNOWFLAKE_SCHEMA"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),  # Optional
            role=os.getenv("SNOWFLAKE_ROLE"),  # Optional
        )
    },
)
# To run this locally, set the following environment variables:
# export SNOWFLAKE_ACCOUNT="your-account-identifier"
# export SNOWFLAKE_USER="your-username"
# export SNOWFLAKE_PASSWORD="your-password"
# export SNOWFLAKE_DATABASE="your-database"
# export SNOWFLAKE_SCHEMA="your-schema"
# (Optional) export SNOWFLAKE_WAREHOUSE="your-warehouse"
# (Optional) export SNOWFLAKE_ROLE="your-role"
# Then run `dagster dev -f your_file_name.py`