Prefect Snowflake Integration
The `prefect-snowflake` library provides Prefect 2.x integrations for interacting with Snowflake, enabling users to define flows that connect to Snowflake databases, execute queries, and manage data. It currently ships as version 0.28.7 and follows the Prefect ecosystem's release cadence, often aligning with Prefect core releases.
Warnings
- gotcha The underlying `snowflake-connector-python` library is synchronous. When it is called from Prefect's async tasks and flows, the blocking calls need explicit handling (e.g., running them in a worker thread with `asyncio.to_thread`) to avoid blocking the event loop.
- gotcha Prefect integrations often rely on Prefect Blocks to manage credentials and configuration. You must create and save a `SnowflakeConnector` block (either programmatically as in the quickstart, or via the Prefect UI/CLI) before you can load and use it in your flows.
- breaking `prefect-snowflake` is an integration for Prefect 2.x. It is not compatible with Prefect 1.x flows and APIs.
- gotcha The Snowflake `account` value can be tricky: the required format depends on your Snowflake region and cloud provider (e.g., `account_identifier.region.cloud_provider` or just `account_identifier`). Pass the identifier only, not the full URL ending in `.snowflakecomputing.com`.
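The event-loop warning above can be illustrated without a Snowflake account. The following is a minimal sketch using only the standard library: `blocking_query` is a hypothetical stand-in for a synchronous cursor call, and `asyncio.to_thread` (Python 3.9+) is one possible way to offload it, not necessarily the approach the integration uses internally.

```python
import asyncio
import time


def blocking_query(sql: str) -> list:
    """Hypothetical stand-in for a synchronous Snowflake cursor call."""
    time.sleep(0.1)  # simulates waiting on the network
    return [(sql,)]


async def run_query_off_loop(sql: str) -> list:
    # asyncio.to_thread runs the blocking function in a worker thread,
    # so the event loop stays free to schedule other coroutines.
    return await asyncio.to_thread(blocking_query, sql)


async def main() -> list:
    # Both simulated queries wait concurrently instead of back to back.
    return await asyncio.gather(
        run_query_off_loop("SELECT 1"),
        run_query_off_loop("SELECT 2"),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling `blocking_query` directly inside a coroutine would instead stall every other coroutine on the loop for the duration of the call.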
Install
pip install prefect-snowflake
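To confirm which release ended up in the environment after installing, a small standard-library check can help. This is a sketch only: `installed_version` is a hypothetical helper name, and it assumes the distribution is registered under the name used in the `pip install` command.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the installed version string, or None if the package is missing.
    print(installed_version("prefect-snowflake"))
```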
Imports
- SnowflakeConnector
from prefect_snowflake import SnowflakeConnector
Quickstart
import asyncio
import os

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector, SnowflakeCredentials


@task
async def run_snowflake_query(connector_block: SnowflakeConnector, query: str):
    """
    Executes a Snowflake query using the provided connector block.
    """

    def _fetch_all():
        # get_connection() returns a synchronous snowflake-connector-python
        # connection; the context managers close the cursor and the connection.
        with connector_block.get_connection() as conn:
            with conn.cursor() as cursor:
                return cursor.execute(query).fetchall()

    # snowflake-connector-python is synchronous, so the blocking call runs in a
    # worker thread to keep the event loop free (asyncio.to_thread needs Python 3.9+).
    return await asyncio.to_thread(_fetch_all)


@flow
async def snowflake_example_flow(query: str = "SELECT CURRENT_VERSION()"):
    """
    An example Prefect flow that connects to Snowflake and runs a query.
    """
    block_name = "my-snowflake-connector-agent"

    # Create and save a SnowflakeConnector block programmatically.
    # In a real-world scenario, you would usually load a pre-configured block
    # created through the Prefect UI or CLI instead.
    # Account-level settings (account, user, password, role) belong to a
    # SnowflakeCredentials block that the connector wraps.
    credentials = SnowflakeCredentials(
        account=os.environ.get("SNOWFLAKE_ACCOUNT", "your_account_id"),
        user=os.environ.get("SNOWFLAKE_USER", "your_user"),
        password=os.environ.get("SNOWFLAKE_PASSWORD", "your_password"),
        role=os.environ.get("SNOWFLAKE_ROLE", "PUBLIC"),
    )
    connector = SnowflakeConnector(
        credentials=credentials,
        warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "your_warehouse"),
        database=os.environ.get("SNOWFLAKE_DATABASE", "your_database"),
        schema=os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
    )

    # Save the block; overwrite if it already exists from a previous run
    await connector.save(block_name, overwrite=True)

    # Load the block (even if just saved, this is how you'd normally load it)
    snowflake_connector_block = await SnowflakeConnector.load(block_name)

    # Run the query task
    result = await run_snowflake_query(snowflake_connector_block, query)
    print(f"Snowflake Query Result for '{query}': {result}")


if __name__ == "__main__":
    # To run this flow, ensure you have set the SNOWFLAKE_* environment variables
    # or replace the os.environ.get calls with actual credentials.
    # Example:
    # export SNOWFLAKE_ACCOUNT="YOUR_ACCOUNT.region.azure" (or just YOUR_ACCOUNT_NAME)
    # export SNOWFLAKE_USER="YOUR_USER"
    # export SNOWFLAKE_PASSWORD="YOUR_PASSWORD"
    # export SNOWFLAKE_WAREHOUSE="YOUR_WAREHOUSE"
    # export SNOWFLAKE_DATABASE="YOUR_DATABASE"
    # export SNOWFLAKE_SCHEMA="YOUR_SCHEMA"
    # export SNOWFLAKE_ROLE="YOUR_ROLE"
    # The flow is async, so asyncio.run drives it when executed directly.
    asyncio.run(snowflake_example_flow())
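Because the quickstart falls back to placeholder values such as "your_account_id" when a variable is unset, a misconfigured shell only surfaces as a connection error at run time. A pre-flight check along the following lines may make that failure earlier and clearer. It is a sketch: `REQUIRED_VARS` and `missing_snowflake_vars` are hypothetical names, and treating the schema and role as optional is an assumption based on their "PUBLIC" defaults in the quickstart.

```python
import os
from typing import List, Mapping

# Variable names taken from the quickstart's os.environ.get calls.
# SNOWFLAKE_SCHEMA and SNOWFLAKE_ROLE are left out because the quickstart
# gives them usable defaults ("PUBLIC"); that choice is an assumption.
REQUIRED_VARS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
)


def missing_snowflake_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required SNOWFLAKE_* names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_snowflake_vars()
    if missing:
        raise SystemExit(f"Set these variables first: {', '.join(missing)}")
```

Calling `missing_snowflake_vars()` before `asyncio.run(...)` in the quickstart's `__main__` block would stop the script before any block is created or saved.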