Dagster Snowflake Pandas Integration
The `dagster-snowflake-pandas` library integrates Pandas DataFrames with Snowflake inside the Dagster data orchestration framework. Through Dagster's I/O manager system, it reads and writes Pandas DataFrames directly to and from Snowflake tables. The package is currently at version 0.29.0 and aligns its release cadence with the main Dagster core library.
Common errors
- DagsterInvariantViolationError: Snowflake I/O manager configured to convert time data in DataFrame column 'my_timestamp_column' to strings, but the corresponding MY_TIMESTAMP_COLUMN column in table 'MY_TABLE' is not of type VARCHAR, it is of type TIMESTAMP.
  cause: The `SnowflakePandasIOManager` was configured with `store_timestamps_as_strings=True`, but the target column in Snowflake already exists and is defined as a `TIMESTAMP` type.
  fix: Either drop and recreate the Snowflake table with the timestamp column as `VARCHAR`, or set `store_timestamps_as_strings=False` in your `SnowflakePandasIOManager` configuration so Dagster writes Pandas timestamps directly to Snowflake `TIMESTAMP` columns. Give Pandas timestamp columns an explicit timezone when storing as `TIMESTAMP` to avoid potential data corruption.
- snowflake.connector.errors.ProgrammingError: SQL compilation error: invalid identifier '5_STARS'
  cause: The Pandas DataFrame contains column names that are not valid Snowflake identifiers (e.g., they start with a number or contain spaces or special characters that would require quoting). Recent versions of `SnowflakePandasIOManager` explicitly set `quote_identifiers=False` when writing data, so invalid names cause SQL errors.
  fix: Rename the DataFrame columns to valid Snowflake identifiers before returning the DataFrame from your asset function. Valid identifiers typically start with a letter or underscore and contain only alphanumeric characters and underscores (e.g., change `5_stars` to `_5_stars` or `five_stars`).
- ModuleNotFoundError: No module named 'dagster_snowflake_pandas'
  cause: The `dagster-snowflake-pandas` library is not installed in the active Python environment.
  fix: Install it with `pip install dagster-snowflake-pandas`. If you use a virtual environment, ensure it is activated. If you use a dependency manager such as `pip-tools` or Poetry, add the library to your project's dependencies and reinstall.
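One way to avoid the invalid-identifier error above is to normalize column names before returning the DataFrame from an asset. A minimal sketch; the `sanitize_identifier` helper is illustrative, not part of the library:

```python
import re

import pandas as pd

def sanitize_identifier(name: str) -> str:
    """Rewrite a string so it is a valid unquoted Snowflake identifier."""
    # Replace any character that is not alphanumeric or underscore
    cleaned = re.sub(r"\W", "_", name)
    # Prefix names that start with a digit so they need no quoting
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned

df = pd.DataFrame({"5_stars": [10, 20], "review text": ["ok", "great"]})
df = df.rename(columns=sanitize_identifier)
print(list(df.columns))  # ['_5_stars', 'review_text']
```

Applying this at the end of each asset that returns a DataFrame keeps writes compatible with `quote_identifiers=False`.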
Warnings
- gotcha Handling of Pandas timestamp data in Snowflake can be problematic. The underlying `snowflake-connector-python` may corrupt timestamp data without timezones or convert non-UTC timestamps to UTC. `dagster-snowflake-pandas` attempts to mitigate this by assigning UTC by default or converting to strings if `store_timestamps_as_strings=True` is configured, but this can lead to unexpected type changes or data loss if not carefully managed.
- breaking Beginning with Dagster core versions around 1.6.x (and corresponding `dagster-snowflake-pandas` versions), the `SnowflakePandasIOManager` changed its behavior regarding column identifiers. It now explicitly sets `quote_identifiers=False` when writing to Snowflake. This can cause `SQL compilation error: invalid identifier` if your Pandas DataFrame column names are not valid Snowflake identifiers (e.g., contain spaces, start with numbers).
- gotcha `dagster-snowflake-pandas` releases are tightly coupled with `dagster` core releases. For example, `dagster-snowflake-pandas==0.29.0` is released alongside `dagster==1.13.0`. Mismatched versions between the core framework and libraries can lead to unexpected behavior or runtime errors.
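The timestamp gotcha above can be sidestepped by making timestamp columns timezone-aware before they leave the asset. A minimal sketch using plain pandas (no Snowflake connection needed to illustrate the idea):

```python
import pandas as pd

df = pd.DataFrame(
    {"event_time": pd.to_datetime(["2024-01-01 12:00", "2024-01-02 08:30"])}
)

# Naive timestamps may be corrupted or silently converted by the connector;
# localizing them to UTC explicitly makes the stored values unambiguous.
if df["event_time"].dt.tz is None:
    df["event_time"] = df["event_time"].dt.tz_localize("UTC")

print(df["event_time"].dt.tz)  # UTC
```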
Install
-
pip install dagster-snowflake-pandas
Imports
- SnowflakePandasIOManager
from dagster_snowflake_pandas import SnowflakePandasIOManager
Quickstart
import os

import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster_snowflake_pandas import SnowflakePandasIOManager

@asset
def my_pandas_table() -> pd.DataFrame:
    # Build a simple Pandas DataFrame; the I/O manager writes it to Snowflake
    return pd.DataFrame({"col1": [1, 2], "col2": ["A", "B"]})

@asset
def downstream_asset(my_pandas_table: pd.DataFrame) -> int:
    # The I/O manager loads the upstream Snowflake table back as a DataFrame
    print(f"Loaded DataFrame from Snowflake:\n{my_pandas_table}")
    return len(my_pandas_table)

defs = Definitions(
    assets=[my_pandas_table, downstream_asset],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            # EnvVar takes no default value, so use os.getenv for optional settings
            schema=os.getenv("SNOWFLAKE_SCHEMA", "public"),
            warehouse=os.getenv("SNOWFLAKE_WAREHOUSE", "compute_wh"),
        )
    },
)
# To run this locally, set the following environment variables:
# os.environ["SNOWFLAKE_ACCOUNT"] = "your_account_identifier"
# os.environ["SNOWFLAKE_USER"] = "your_username"
# os.environ["SNOWFLAKE_PASSWORD"] = "your_password"
# os.environ["SNOWFLAKE_DATABASE"] = "your_database"
# os.environ["SNOWFLAKE_SCHEMA"] = "your_schema" # Optional, defaults to 'public'
# os.environ["SNOWFLAKE_WAREHOUSE"] = "your_warehouse" # Optional, defaults to 'compute_wh'
# Example of materializing these assets in-process (a quick local test;
# normally you would run them via `dagster dev`):
# from dagster import materialize
# if __name__ == "__main__":
#     result = materialize([my_pandas_table, downstream_asset], resources=defs.resources)
#     assert result.success