{"id":4691,"library":"prefect-sqlalchemy","title":"Prefect SQLAlchemy Integration","description":"The `prefect-sqlalchemy` library provides integrations for working with databases within Prefect flows. It allows users to connect to various SQL databases using SQLAlchemy, enabling orchestration and observability of database operations. The library offers `SqlAlchemyConnector` and `AsyncSqlAlchemyConnector` for managing both synchronous and asynchronous database connections. This integration is actively maintained, with releases often aligning with the frequent release cadence of the core Prefect library.","status":"active","version":"0.6.1","language":"en","source_language":"en","source_url":"https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-sqlalchemy","tags":["prefect","sqlalchemy","database","etl","workflow","integration","orm"],"install":[{"cmd":"pip install prefect-sqlalchemy","lang":"bash","label":"Basic Installation"},{"cmd":"pip install \"prefect[sqlalchemy]\"","lang":"bash","label":"Install with Prefect (includes core Prefect)"}],"dependencies":[{"reason":"This is a Prefect integration and requires Prefect for flow orchestration.","package":"prefect","optional":false},{"reason":"Core library for database interaction.","package":"SQLAlchemy","optional":false},{"reason":"Specific database drivers (e.g., `psycopg2-binary` for PostgreSQL, `aiosqlite` for async SQLite) must be installed separately based on the database being used.","package":"database-driver","optional":true}],"imports":[{"symbol":"SqlAlchemyConnector","correct":"from prefect_sqlalchemy import SqlAlchemyConnector"},{"symbol":"AsyncSqlAlchemyConnector","correct":"from prefect_sqlalchemy import AsyncSqlAlchemyConnector"},{"symbol":"ConnectionComponents","correct":"from prefect_sqlalchemy import ConnectionComponents"},{"symbol":"SyncDriver","correct":"from prefect_sqlalchemy import SyncDriver"},{"symbol":"AsyncDriver","correct":"from prefect_sqlalchemy import AsyncDriver"},{"note":"The `DatabaseCredentials` block has been replaced by `SqlAlchemyConnector` in recent versions.","wrong":"from prefect_sqlalchemy import DatabaseCredentials","symbol":"DatabaseCredentials","correct":"from prefect_sqlalchemy import SqlAlchemyConnector"}],"quickstart":{"code":"import os\nfrom prefect import flow, task\nfrom prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver\n\n@task\ndef setup_table(block_name: str) -> None:\n    \"\"\"Sets up a table and inserts data using the SQLAlchemy connector.\"\"\"\n    with SqlAlchemyConnector.load(block_name) as connector:\n        connector.execute(\n            \"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);\"\n        )\n        connector.execute(\n            \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n            parameters={\"name\": \"Marvin\", \"address\": \"Highway 42\"},\n        )\n        connector.execute_many(\n            \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n            seq_of_parameters=[\n                {\"name\": \"Ford\", \"address\": \"Highway 42\"},\n                {\"name\": \"Unknown\", \"address\": \"Highway 42\"},\n            ],\n        )\n\n@task\ndef fetch_data(block_name: str) -> list:\n    \"\"\"Fetches all data from the customers table.\"\"\"\n    all_rows = []\n    with SqlAlchemyConnector.load(block_name) as connector:\n        while True:\n            new_rows = connector.fetch_many(\"SELECT * FROM customers\", size=2)\n            if len(new_rows) == 0:\n                break\n            all_rows.extend(new_rows)\n    return all_rows\n\n@flow(name=\"SQLAlchemy Flow Example\")\ndef sqlalchemy_flow(block_name: str) -> list:\n    \"\"\"Orchestrates database setup and data fetching.\"\"\"\n    setup_table(block_name)\n    all_rows = fetch_data(block_name)\n    return all_rows\n\nif __name__ == \"__main__\":\n    # --- Configuration and Block Saving (Run this once to create the block) ---\n    # In a real scenario, use environment variables for sensitive info.\n    # Replace 'my-sqlite-block' with your desired block name.\n    # For SQLite, a file-based database is sufficient.\n    sqlite_block_name = os.environ.get('PREFECT_SQL_BLOCK_NAME', 'my-sqlite-block')\n    sqlite_db_path = os.environ.get('SQLITE_DB_PATH', 'prefect.db')\n\n    # Create and save the connector block programmatically\n    # (Alternatively, create it via the Prefect UI)\n    connector = SqlAlchemyConnector(\n        connection_info=ConnectionComponents(\n            driver=SyncDriver.SQLITE_PYSQLITE,\n            database=sqlite_db_path\n        )\n    )\n    connector.save(sqlite_block_name)\n    print(f\"Saved SqlAlchemyConnector block as '{sqlite_block_name}' pointing to '{sqlite_db_path}'\")\n    print(\"You can now run the flow using this block.\")\n\n    # --- Running the Flow ---\n    # Ensure the block 'my-sqlite-block' exists in your Prefect server/Cloud.\n    results = sqlalchemy_flow(sqlite_block_name)\n    print(\"Fetched Data:\", results)\n","lang":"python","description":"This quickstart demonstrates how to set up an `SqlAlchemyConnector` block, save it to Prefect, and then use it within Prefect tasks and a flow to interact with a SQLite database. It includes creating a table, inserting data, and fetching results. Remember to run the block saving part first, or configure your block via the Prefect UI, before executing the flow."},"warnings":[{"fix":"Upgrade `prefect-sqlalchemy` to 0.4.0 or higher. Review SQLAlchemy 2.0 migration guides and update database interaction code accordingly, especially for connection and query execution.","message":"SQLAlchemy 2.0 introduced significant API changes compared to previous 1.x versions. If you are upgrading your `prefect-sqlalchemy` integration or `SQLAlchemy` itself, ensure your database interaction code is compatible with SQLAlchemy 2.x patterns. `prefect-sqlalchemy` version 0.4.0 and later added explicit support for SQLAlchemy 2.x.","severity":"breaking","affected_versions":"prefect-sqlalchemy < 0.4.0 with SQLAlchemy >= 2.0"},{"fix":"Install the necessary database driver using `pip install <driver_package>` (e.g., `pip install psycopg2-binary`) alongside `prefect-sqlalchemy`.","message":"Database drivers are not included: `prefect-sqlalchemy` provides the integration layer but does not bundle specific database drivers (e.g., `psycopg2-binary` for PostgreSQL, `aiosqlite` for async SQLite, `pymysql` for MySQL). You must install the appropriate driver(s) for your target database(s) separately.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Load the `SqlAlchemyConnector` block inside the task or flow where it is being used, or ensure its lifecycle is properly managed if shared. Using it as a context manager (`with connector: ...`) is the recommended pattern to ensure proper resource closure.","message":"Block state management: It is generally recommended to load and consume an `SqlAlchemyConnector` (or `AsyncSqlAlchemyConnector`) within the scope of a single task or flow. Passing the connector instance across separate tasks or flows might lead to loss of connection/cursor state and unexpected behavior.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Before calling `SqlAlchemyConnector.load('your-block-name')`, ensure you have saved a block with that name using `connector_instance.save('your-block-name')` or by configuring it through the Prefect UI.","message":"Using `SqlAlchemyConnector.load()` requires a pre-existing block document. If you attempt to load a block that hasn't been saved yet (either programmatically or via the Prefect UI), it will result in an error.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-12T00:00:00.000Z","next_check":"2026-07-11T00:00:00.000Z"}