Prefect Snowflake Integration

0.28.7 · active · verified Wed Apr 15

The `prefect-snowflake` library provides Prefect integrations for Snowflake, letting you define flows that connect to Snowflake databases, execute queries, and manage data. The current version is 0.28.7; releases follow the Prefect ecosystem's cadence and often align with Prefect core releases.

Warnings

Install

Imports

Quickstart

This quickstart creates a `SnowflakeConnector` block programmatically, saves it, loads it, and uses it within a Prefect flow to run a Snowflake query. Sensitive credentials are read from environment variables.

import os

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector, SnowflakeCredentials


@task
def run_snowflake_query(block_name: str, query: str):
    """
    Loads the saved connector block and executes a Snowflake query with it.
    """
    # Load the block by name (this is how you'd normally load a block that was
    # saved earlier or configured in the Prefect UI). Using the block as a
    # context manager closes the connection when the task finishes.
    with SnowflakeConnector.load(block_name) as connector:
        return connector.fetch_all(query)


@flow
def snowflake_example_flow(query: str = "SELECT CURRENT_VERSION()"):
    """
    An example Prefect flow that connects to Snowflake and runs a query.
    """
    block_name = "my-snowflake-connector-agent"

    # Authentication details (account, user, password, role) live on a
    # SnowflakeCredentials block; the connector wraps those credentials and
    # adds the warehouse, database, and schema to use.
    credentials = SnowflakeCredentials(
        account=os.environ.get("SNOWFLAKE_ACCOUNT", "your_account_id"),
        user=os.environ.get("SNOWFLAKE_USER", "your_user"),
        password=os.environ.get("SNOWFLAKE_PASSWORD", "your_password"),
        role=os.environ.get("SNOWFLAKE_ROLE", "PUBLIC"),
    )
    connector = SnowflakeConnector(
        credentials=credentials,
        warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "your_warehouse"),
        database=os.environ.get("SNOWFLAKE_DATABASE", "your_database"),
        schema=os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
    )
    # Save the block; overwrite if it already exists from a previous run
    connector.save(block_name, overwrite=True)

    # Run the query task
    result = run_snowflake_query(block_name, query)
    print(f"Snowflake Query Result for '{query}': {result}")


if __name__ == "__main__":
    # To run this flow, ensure you have set the SNOWFLAKE_* environment variables
    # or replace the os.environ.get calls with actual credentials.
    # Example:
    # export SNOWFLAKE_ACCOUNT="YOUR_ACCOUNT.region.azure" (or just YOUR_ACCOUNT_NAME)
    # export SNOWFLAKE_USER="YOUR_USER"
    # export SNOWFLAKE_PASSWORD="YOUR_PASSWORD"
    # export SNOWFLAKE_WAREHOUSE="YOUR_WAREHOUSE"
    # export SNOWFLAKE_DATABASE="YOUR_DATABASE"
    # export SNOWFLAKE_SCHEMA="YOUR_SCHEMA"
    # export SNOWFLAKE_ROLE="YOUR_ROLE"

    # The flow and task are synchronous, so the flow can be called directly.
    snowflake_example_flow()
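
The quickstart falls back to placeholder strings such as "your_password" when a variable is unset, which delays the failure until Snowflake rejects the login. A minimal sketch of a fail-fast alternative follows. It uses only the standard library; `read_snowflake_settings`, `REQUIRED_VARS`, and `OPTIONAL_DEFAULTS` are hypothetical names for illustration and are not part of `prefect-snowflake`. Treating schema and role as optional with a `PUBLIC` default is an assumption carried over from the quickstart.

```python
import os
from typing import Dict, Mapping

# Hypothetical helper names; only the SNOWFLAKE_* variable names come from the quickstart.
REQUIRED_VARS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
)
OPTIONAL_DEFAULTS = {"SNOWFLAKE_SCHEMA": "PUBLIC", "SNOWFLAKE_ROLE": "PUBLIC"}
PREFIX = "SNOWFLAKE_"


def read_snowflake_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect connection settings, raising if a required variable is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    settings = {name: env[name] for name in REQUIRED_VARS}
    for name, default in OPTIONAL_DEFAULTS.items():
        settings[name] = env.get(name) or default

    # Drop the prefix and lowercase the rest: SNOWFLAKE_USER -> "user"
    return {name[len(PREFIX):].lower(): value for name, value in settings.items()}
```

The returned dictionary is keyed by `account`, `user`, `password`, `warehouse`, `database`, `schema`, and `role`, so its values can stand in for the `os.environ.get(...)` calls in the quickstart.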
