Apache Airflow Snowflake Provider

6.12.0 · active · verified Sun Mar 29

The `apache-airflow-providers-snowflake` library is an official Apache Airflow provider package that enables seamless interaction with Snowflake Data Cloud from Airflow DAGs. It includes hooks, operators, and transfers for executing SQL queries, managing data, and leveraging Snowflake-specific features. The current version is 6.12.0, and it follows the Airflow provider release cadence, with frequent updates introducing new features and bug fixes.

Warnings

Install
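The provider is published on PyPI under the package name given above. A typical install into an existing Airflow environment looks like this (pinning the version from the header is optional but keeps builds reproducible):

```shell
pip install "apache-airflow-providers-snowflake==6.12.0"
```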

Imports
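The quickstart below relies on the following imports; the hook import is listed here as well since it is the usual entry point for direct SQL access from Python code. All of these require the provider package (and Airflow itself) to be installed:

```python
# DAG definition and the Snowflake operator used in the quickstart
from airflow.models.dag import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Hook for executing SQL directly from Python callables
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
```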

Quickstart

This quickstart DAG demonstrates using the `SnowflakeOperator` to execute SQL commands against an existing Snowflake connection. It defines a DAG that creates a table, inserts sample data, and finally queries the data, printing the results to the task logs. Ensure your Airflow environment has a Snowflake connection named `snowflake_default` (or the value of the `SNOWFLAKE_CONN_ID` environment variable) configured with appropriate credentials (e.g., account, login, password, warehouse, database, schema).
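One way to supply that connection without touching the Airflow UI is an `AIRFLOW_CONN_<CONN_ID>` environment variable. The sketch below uses Airflow's JSON connection format (supported since Airflow 2.3); every credential value shown is a placeholder to substitute with your own:

```python
import json
import os

# Airflow resolves connections from environment variables named
# AIRFLOW_CONN_<CONN_ID_IN_UPPERCASE>; the value may be a JSON document.
# All values below are placeholders -- substitute your own credentials.
conn = {
    "conn_type": "snowflake",
    "login": "MY_USER",
    "password": "MY_PASSWORD",
    "schema": "PUBLIC",
    "extra": {
        "account": "my_account",   # Snowflake account identifier
        "warehouse": "COMPUTE_WH",
        "database": "MY_DATABASE",
        "role": "MY_ROLE",
    },
}
os.environ["AIRFLOW_CONN_SNOWFLAKE_DEFAULT"] = json.dumps(conn)
```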

from __future__ import annotations
import os
import pendulum

from airflow.models.dag import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

SNOWFLAKE_CONN_ID = os.environ.get('SNOWFLAKE_CONN_ID', 'snowflake_default')

with DAG(
    dag_id='snowflake_quickstart_example',
    start_date=pendulum.datetime(2023, 1, 1, tz='UTC'),
    catchup=False,
    schedule=None,
    tags=['snowflake', 'example'],
    doc_md="""### Snowflake Quickstart DAG
    This DAG demonstrates a basic interaction with Snowflake using the SnowflakeOperator.
    It creates a table, inserts data, and then queries the table.
    """,
) as dag:
    create_table = SnowflakeOperator(
        task_id='create_snowflake_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql=(
            "CREATE TABLE IF NOT EXISTS AIRFLOW_TEST_TABLE (id INTEGER, name VARCHAR);"
        ),
    )

    insert_data = SnowflakeOperator(
        task_id='insert_data_into_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql="INSERT INTO AIRFLOW_TEST_TABLE (id, name) VALUES (1, 'Airflow'), (2, 'Snowflake');",
    )

    def _print_rows(cursor):
        """Print each row to the task log and return the rows for XCom.

        Returning the rows (rather than a list comprehension of print()
        calls, which would push a list of None) makes the query result
        available to downstream tasks via XCom.
        """
        rows = cursor.fetchall()
        for row in rows:
            print(row)
        return rows

    query_data = SnowflakeOperator(
        task_id='query_snowflake_table',
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql="SELECT * FROM AIRFLOW_TEST_TABLE;",
        handler=_print_rows,
    )

    # Define task dependencies
    create_table >> insert_data >> query_data
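Besides operators, the provider also ships `SnowflakeHook` (one of the hooks mentioned above) for cases where you want the query results in Python rather than only in the task log. A minimal sketch, assuming a live Airflow environment and the same `snowflake_default` connection used by the quickstart:

```python
from airflow.decorators import task
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

@task
def fetch_rows():
    # get_records() executes the query and returns all rows as a list of tuples
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    return hook.get_records("SELECT id, name FROM AIRFLOW_TEST_TABLE")
```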
