Apache Airflow Snowflake Provider
The `apache-airflow-providers-snowflake` library is the official Apache Airflow provider package for interacting with the Snowflake Data Cloud from Airflow DAGs. It includes hooks, operators, and transfer operators for executing SQL queries, managing data, and leveraging Snowflake-specific features. The current version is 6.12.0; the package follows the Airflow provider release cadence, with frequent releases introducing new features and bug fixes.
Warnings
- breaking The minimum supported Apache Airflow version for `apache-airflow-providers-snowflake` 6.12.0 is 2.11.0. Older provider versions had different Airflow minimums (e.g., 2.1.0+, 2.2.0+, 2.3.0+). Ensure your Airflow installation meets this requirement to avoid compatibility issues.
- breaking In provider versions 4.x and above, `SnowflakeHook.run` conforms to `DBApiHook` semantics, returning a sequence of sequences (DB-API-compatible rows) instead of a dictionary of `{'column': 'value'}` pairs per row. Code that reads results by column name must be updated to use positional access.
- breaking As of provider version 6.3.0, the `private_key_content` field in Snowflake connections using key-pair authentication is expected to be a base64 encoded string. Existing connections with unencoded private key content will break.
- gotcha When upgrading, ensure `snowflake-connector-python` and `snowflake-sqlalchemy` versions are compatible with your `apache-airflow-providers-snowflake` and `apache-airflow` versions. Conflicts can lead to `ModuleNotFoundError` or unexpected behavior (e.g., `sqlalchemy.sql.roles` issues with `snowflake-sqlalchemy==1.2.5`).
- gotcha If you encounter `ModuleNotFoundError: No module named 'snowflake'` errors, especially when a `snowflake.py` file is present in your DAGs folder or another location on the import path, the likely cause is a Python import path conflict where your local file shadows the installed connector module.
- deprecated In versions 6.x and later, all deprecated classes, parameters, and features have been removed from the Snowflake provider package. This includes removal of the `apply_default` decorator.
- gotcha The `SnowflakeOperator`'s `autocommit` parameter defaults to `True`. If you rely on explicit transaction control, or are migrating from very old versions, be aware that changes in the `common.sql` provider may affect `autocommit` behavior in some contexts. For complex SQL, `SQLExecuteQueryOperator` is recommended for better control over transactions.
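The 4.x `run` change above can be illustrated without Airflow. This is a plain-Python sketch of the two result shapes; the row data is invented for the illustration:

```python
# Illustration only (no Airflow needed): provider 4.x+ run() results follow
# DB-API semantics -- a sequence of sequences -- where pre-4.x returned a
# dict per row. The sample data below is invented for the illustration.
rows = [(1, 'Airflow'), (2, 'Snowflake')]            # 4.x+ shape
legacy_rows = [{'ID': 1, 'NAME': 'Airflow'},
               {'ID': 2, 'NAME': 'Snowflake'}]       # pre-4.x shape

# Code written against the old dict shape must switch to positional access:
old_names = [row['NAME'] for row in legacy_rows]
new_names = [row[1] for row in rows]
assert old_names == new_names == ['Airflow', 'Snowflake']
```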
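For the 6.3.0 key-pair change, a minimal sketch of producing the expected base64 value with the standard library (the PEM body below is a placeholder, not a real key):

```python
import base64

# Provider >= 6.3.0 expects the connection's private_key_content field to
# hold the base64 encoding of the key bytes. Placeholder PEM content:
pem_bytes = b"-----BEGIN PRIVATE KEY-----\nMIIB...\n-----END PRIVATE KEY-----\n"
encoded = base64.b64encode(pem_bytes).decode('ascii')
# Store `encoded` in the connection's private_key_content field.

# Round trip: decoding restores the original bytes.
assert base64.b64decode(encoded) == pem_bytes
```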
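A quick diagnostic for the shadowing gotcha, using only the standard library: resolve where Python actually finds the `snowflake` package.

```python
import importlib.util

# If the resolved origin path points into your DAGs folder rather than
# site-packages, a local snowflake.py is shadowing the installed
# connector package.
spec = importlib.util.find_spec("snowflake")
if spec is None:
    print("'snowflake' is not importable in this environment")
else:
    print("'snowflake' resolves to:", spec.origin or spec.submodule_search_locations)
```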
Install
pip install apache-airflow-providers-snowflake
Imports
- SnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
- SnowflakeHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
- S3ToSnowflakeOperator
from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator
Quickstart
from __future__ import annotations
import os
import pendulum
from airflow.models.dag import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')
with DAG(
    dag_id='snowflake_quickstart_example',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    catchup=False,
    schedule=None,
    tags=['snowflake', 'example'],
    doc_md="""### Snowflake Quickstart DAG
    This DAG demonstrates a basic interaction with Snowflake using the SnowflakeOperator.
    It creates a table, inserts data, and then queries the table.
    """,
) as dag:
    create_table = SnowflakeOperator(
        task_id='create_snowflake_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql="CREATE TABLE IF NOT EXISTS AIRFLOW_TEST_TABLE (id INTEGER, name VARCHAR);",
    )

    insert_data = SnowflakeOperator(
        task_id='insert_data_into_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql="INSERT INTO AIRFLOW_TEST_TABLE (id, name) VALUES (1, 'Airflow'), (2, 'Snowflake');",
    )

    query_data = SnowflakeOperator(
        task_id='query_snowflake_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql="SELECT * FROM AIRFLOW_TEST_TABLE;",
        # The handler receives the cursor; the returned rows are pushed to XCom.
        handler=lambda cursor: cursor.fetchall(),
    )

    # Define task dependencies
    create_table >> insert_data >> query_data
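The warnings above recommend `SQLExecuteQueryOperator` for finer transaction control. A hedged sketch of the same query task with the generic operator (assuming the `common.sql` provider is installed; the import guard only keeps the snippet loadable outside an Airflow environment, and the names are illustrative):

```python
# Sketch: the generic SQL operator from the common.sql provider, suggested
# over SnowflakeOperator for better transaction control.
try:
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
except ImportError:  # Airflow / common.sql provider not installed here
    SQLExecuteQueryOperator = None

def build_query_task():
    # Illustrative only: same query as the quickstart, but the generic
    # operator takes conn_id rather than snowflake_conn_id.
    return SQLExecuteQueryOperator(
        task_id='query_snowflake_table',
        conn_id='snowflake_default',
        sql="SELECT * FROM AIRFLOW_TEST_TABLE;",
    )
```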