{"id":6775,"library":"prefect-snowflake","title":"Prefect Snowflake Integration","description":"The `prefect-snowflake` library provides Prefect 2.x integrations for interacting with Snowflake, enabling users to define flows that connect to Snowflake databases, execute queries, and manage data. It currently ships as version 0.28.7 and follows the Prefect ecosystem's release cadence, often aligning with Prefect core releases.","status":"active","version":"0.28.7","language":"en","source_language":"en","source_url":"https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-snowflake","tags":["prefect","snowflake","data-warehousing","elt","database"],"install":[{"cmd":"pip install prefect-snowflake","lang":"bash","label":"Install prefect-snowflake"}],"dependencies":[{"reason":"This is a Prefect integration and requires Prefect core (2.x) to function.","package":"prefect","optional":false},{"reason":"This is the underlying Python driver used to connect to Snowflake.","package":"snowflake-connector-python","optional":false}],"imports":[{"note":"As of recent versions, `SnowflakeConnector` has moved to `prefect_snowflake.connections` module from `prefect_snowflake.blocks`.","wrong":"from prefect_snowflake.blocks import SnowflakeConnector","symbol":"SnowflakeConnector","correct":"from prefect_snowflake.connections import SnowflakeConnector"}],"quickstart":{"code":"import os\nimport asyncio\nfrom prefect import flow, task\nfrom prefect_snowflake.connections import SnowflakeConnector\n\n@task\nasync def run_snowflake_query(connector_block: SnowflakeConnector, query: str):\n    \"\"\"\n    Executes a Snowflake query using the provided connector block.\n    \"\"\"\n    # snowflake-connector-python is synchronous, so it must be run in a thread pool\n    # when used within an async Prefect task/flow.\n    async with connector_block.get_connection() as conn:\n        result = await conn.run_sync(lambda c: c.cursor().execute(query).fetchall())\n        return result\n\n@flow\nasync def snowflake_example_flow(query: str = \"SELECT CURRENT_VERSION()\"):\n    \"\"\"\n    An example Prefect flow that connects to Snowflake and runs a query.\n    \"\"\"\n    block_name = \"my-snowflake-connector-agent\"\n\n    # Create and save a SnowflakeConnector block programmatically.\n    # In a real-world scenario, you might load a pre-configured block\n    # from the Prefect UI or directly pass connection details.\n    connector = SnowflakeConnector(\n        account=os.environ.get(\"SNOWFLAKE_ACCOUNT\", \"your_account_id\"),\n        user=os.environ.get(\"SNOWFLAKE_USER\", \"your_user\"),\n        password=os.environ.get(\"SNOWFLAKE_PASSWORD\", \"your_password\"),\n        warehouse=os.environ.get(\"SNOWFLAKE_WAREHOUSE\", \"your_warehouse\"),\n        database=os.environ.get(\"SNOWFLAKE_DATABASE\", \"your_database\"),\n        schema=os.environ.get(\"SNOWFLAKE_SCHEMA\", \"PUBLIC\"),\n        role=os.environ.get(\"SNOWFLAKE_ROLE\", \"PUBLIC\"),\n    )\n    # Save the block; overwrite if it already exists from a previous run\n    await connector.save(block_name, overwrite=True)\n\n    # Load the block (even if just saved, this is how you'd normally load it)\n    snowflake_connector_block = await SnowflakeConnector.load(block_name)\n\n    # Run the query task\n    result = await run_snowflake_query(snowflake_connector_block, query)\n    print(f\"Snowflake Query Result for '{query}': {result}\")\n\nif __name__ == \"__main__\":\n    # To run this flow, ensure you have set the SNOWFLAKE_* environment variables\n    # or replace the os.environ.get calls with actual credentials.\n    # Example:\n    # export SNOWFLAKE_ACCOUNT=\"YOUR_ACCOUNT.region.azure\" (or just YOUR_ACCOUNT_NAME)\n    # export SNOWFLAKE_USER=\"YOUR_USER\"\n    # export SNOWFLAKE_PASSWORD=\"YOUR_PASSWORD\"\n    # export SNOWFLAKE_WAREHOUSE=\"YOUR_WAREHOUSE\"\n    # export SNOWFLAKE_DATABASE=\"YOUR_DATABASE\"\n    # export SNOWFLAKE_SCHEMA=\"YOUR_SCHEMA\"\n    # export SNOWFLAKE_ROLE=\"YOUR_ROLE\"\n\n    # Run the flow (requires an event loop, so asyncio.run is appropriate for direct execution)\n    asyncio.run(snowflake_example_flow())\n","lang":"python","description":"This quickstart demonstrates how to create a `SnowflakeConnector` block programmatically, save it, load it, and use it within a Prefect flow to execute a Snowflake query. It uses environment variables for sensitive credentials and handles the synchronous nature of `snowflake-connector-python` within an async Prefect task."},"warnings":[{"fix":"Use `await conn.run_sync(...)` when performing synchronous Snowflake operations within an `async` Prefect task, or decorate synchronous tasks with `@task(log_prints=True)` which often handles this implicitly for simple cases.","message":"The underlying `snowflake-connector-python` library is synchronous. When used within Prefect's async tasks and flows, it requires explicit handling (e.g., using `connection.run_sync` or wrapping synchronous code in `sync_compatible` or `task_run_sync`) to avoid blocking the event loop.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure `await SnowflakeConnector(...).save('block-name', overwrite=True)` is called at least once, or that the block is configured in your Prefect environment, before attempting `await SnowflakeConnector.load('block-name')`.","message":"Prefect integrations often rely on Prefect Blocks to manage credentials and configuration. You must create and save a `SnowflakeConnector` block (either programmatically as in the quickstart, or via the Prefect UI/CLI) before you can load and use it in your flows.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure your project is using Prefect 2.x (`prefect>=2.0`) and that your code adheres to Prefect 2.x patterns (e.g., `@flow`, `@task` decorators, async by default).","message":"`prefect-snowflake` is an integration for Prefect 2.x. It is not compatible with Prefect 1.x flows and APIs.","severity":"breaking","affected_versions":"All versions"},{"fix":"Refer to the official Snowflake documentation for your specific account identifier format, or experiment with adding region/cloud provider suffixes (e.g., `.aws`, `.azure`, `.gcp`) if a simple account name doesn't work.","message":"The `account` parameter for `SnowflakeConnector` can sometimes be tricky, requiring specific formats depending on your Snowflake region and cloud provider (e.g., `account_identifier.region.cloud_provider` or just `account_identifier`).","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-15T00:00:00.000Z","next_check":"2026-07-14T00:00:00.000Z","problems":[]}