{"id":5171,"library":"dagster-snowflake","title":"Dagster Snowflake","description":"The `dagster-snowflake` library provides an integration between Dagster and Snowflake, enabling data engineers to orchestrate, observe, and manage data pipelines that interact with the Snowflake AI Data Cloud. It allows connecting to Snowflake as a resource, building assets backed by Snowflake tables and views, and utilizing Snowflake as a data warehouse within Dagster pipelines. The library's current version is 0.29.0, released in conjunction with Dagster core, which typically has frequent updates.","status":"active","version":"0.29.0","language":"en","source_language":"en","source_url":"https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-snowflake","tags":["data orchestration","etl","snowflake","data warehouse","dagster","sql"],"install":[{"cmd":"pip install dagster-snowflake","lang":"bash","label":"Base installation"},{"cmd":"pip install dagster-snowflake-pandas","lang":"bash","label":"With Pandas support (for DataFrame I/O)"},{"cmd":"pip install dagster-snowflake-pyspark","lang":"bash","label":"With PySpark support (for PySpark DataFrame I/O)"},{"cmd":"pip install dagster-snowflake-polars","lang":"bash","label":"With Polars support (beta)"}],"dependencies":[{"reason":"Core Dagster framework, required for all integrations.","package":"dagster"},{"reason":"Underlying Python driver for Snowflake connectivity.","package":"snowflake-connector-python","optional":false},{"reason":"Required for `dagster-snowflake-pandas` to handle Pandas DataFrames.","package":"pandas","optional":true},{"reason":"Required for `dagster-snowflake-pyspark` to handle PySpark DataFrames.","package":"pyspark","optional":true},{"reason":"Required for `dagster-snowflake-polars` to handle Polars DataFrames.","package":"polars","optional":true},{"reason":"Requires Python version <3.15 and 
>=3.10.","package":"python","optional":false}],"imports":[{"symbol":"SnowflakeResource","correct":"from dagster_snowflake import SnowflakeResource"},{"symbol":"SnowflakePandasIOManager","correct":"from dagster_snowflake_pandas import SnowflakePandasIOManager"},{"symbol":"SnowflakePySparkIOManager","correct":"from dagster_snowflake_pyspark import SnowflakePySparkIOManager"},{"note":"This is a legacy API for constructing IO managers; newer patterns often use `SnowflakePandasIOManager` or `SnowflakePySparkIOManager` directly.","wrong":"from dagster_snowflake_pandas import build_snowflake_io_manager","symbol":"build_snowflake_io_manager","correct":"from dagster_snowflake import build_snowflake_io_manager"}],"quickstart":{"code":"import os\nfrom dagster import asset, Definitions, EnvVar\nfrom dagster_snowflake import SnowflakeResource\n\n@asset\ndef my_snowflake_asset(snowflake: SnowflakeResource):\n    \"\"\"An example Dagster asset that interacts with Snowflake.\"\"\"\n    with snowflake.get_connection() as conn:\n        cursor = conn.cursor()\n        # Example: Create a table if it doesn't exist and insert data\n        cursor.execute(\n            \"\"\"\n            CREATE TABLE IF NOT EXISTS my_database.my_schema.my_table (\n                id INT,\n                name VARCHAR\n            );\"\n        )\n        cursor.execute(\"INSERT INTO my_database.my_schema.my_table (id, name) VALUES (1, 'Alice');\")\n        cursor.execute(\"INSERT INTO my_database.my_schema.my_table (id, name) VALUES (2, 'Bob');\")\n        result = cursor.execute(\"SELECT COUNT(*) FROM my_database.my_table\").fetchone()\n        print(f\"Rows in my_table: {result[0]}\")\n    return {'rows_processed': result[0] if result else 0}\n\ndefs = Definitions(\n    assets=[my_snowflake_asset],\n    resources={\n        \"snowflake\": SnowflakeResource(\n            account=EnvVar(\"SNOWFLAKE_ACCOUNT\"),\n            user=EnvVar(\"SNOWFLAKE_USER\"),\n            
password=EnvVar(\"SNOWFLAKE_PASSWORD\"),\n            database=EnvVar(\"SNOWFLAKE_DATABASE\"),\n            schema=EnvVar(\"SNOWFLAKE_SCHEMA\"),\n            warehouse=os.environ.get(\"SNOWFLAKE_WAREHOUSE\"),  # Optional; None if unset\n            role=os.environ.get(\"SNOWFLAKE_ROLE\")  # Optional; None if unset\n        )\n    },\n)\n\n# To run this locally, set the following environment variables:\n# export SNOWFLAKE_ACCOUNT=\"your-account-identifier\"\n# export SNOWFLAKE_USER=\"your-username\"\n# export SNOWFLAKE_PASSWORD=\"your-password\"\n# export SNOWFLAKE_DATABASE=\"your-database\"\n# export SNOWFLAKE_SCHEMA=\"your-schema\"\n# (Optional) export SNOWFLAKE_WAREHOUSE=\"your-warehouse\"\n# (Optional) export SNOWFLAKE_ROLE=\"your-role\"\n# Then run `dagster dev -f your_file_name.py`","lang":"python","description":"This quickstart defines a Dagster asset that connects to Snowflake using `SnowflakeResource`. It reads credentials from environment variables for secure credential management and performs a simple SQL operation (table creation and data insertion) inside the asset's compute function."},"warnings":[{"fix":"Upgrade to a supported Python version and ensure all Dagster and `dagster-snowflake` packages are compatible with it; the library currently requires Python >=3.10, <3.15.","message":"Python 3.6 and 3.8 are no longer supported. 
`dagster-snowflake` dropped Python 3.6 support due to its underlying `snowflake-connector-python` dependency, and Dagster core no longer supports Python 3.8 (EOL 2024-10-07).","severity":"breaking","affected_versions":"<=0.22.x for Py3.6, <=1.12.x for Py3.8"},{"fix":"Ensure all timestamp data in Pandas DataFrames is timezone-aware (e.g., using `.dt.tz_localize('UTC')` or `.dt.tz_convert('UTC')`) before writing to Snowflake via `dagster-snowflake-pandas`.","message":"When loading Pandas DataFrames with timestamp columns to Snowflake, `snowflake-connector-python` (v3.5.0+) may cause data corruption if timestamps are not timezone-aware. The `SnowflakePandasIOManager` attempts to mitigate this by assigning UTC if no timezone is present.","severity":"gotcha","affected_versions":"snowflake-connector-python>=3.5.0"},{"fix":"Use Snowflake's query profiler to optimize SQL queries within your Dagster assets. Implement efficient warehouse usage, consider connection pooling, and use separate schemas for different data stages (raw, staging, production). Dagster provides observability to help identify bottlenecks.","message":"Snowflake costs and query performance can become significant issues if not managed correctly. Common anti-patterns include full table scans, inefficient joins, and using Snowflake for high-concurrency, low-latency applications.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Carefully configure schema-level future grants (e.g., `GRANT SELECT ON FUTURE TABLES IN SCHEMA ... TO ROLE ...`) in Snowflake to ensure consistent access when tables are re-materialized or recreated.","message":"Schema-level permissions with Snowflake future grants can be complex, especially when dbt/Dagster recreates tables. 
Database-level future grants may not be sufficient, leading to lost SELECT access for reporting roles.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Use `EnvVar` (e.g., `EnvVar(\"SNOWFLAKE_PASSWORD\")`) within Dagster resource configurations to retrieve credentials from environment variables. For more advanced setups, integrate with a dedicated secret management system.","message":"Sensitive credentials (account, user, password, private key) for Snowflake should always be managed securely using environment variables or a secrets manager, not hardcoded directly in code.","severity":"gotcha","affected_versions":"All versions"},{"fix":"For new projects or refactoring, prefer using explicit I/O manager classes like `SnowflakePandasIOManager`, `SnowflakePySparkIOManager`, or `SnowflakePolarsIOManager` directly within your `Definitions` resources.","message":"The `build_snowflake_io_manager` function is considered a legacy API for constructing I/O managers.","severity":"deprecated","affected_versions":"All versions, but more pronounced with newer Dagster releases."}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}