Prefect SQLAlchemy Integration

0.6.1 · active · verified Sun Apr 12

The `prefect-sqlalchemy` library integrates SQLAlchemy with Prefect, so flows can connect to SQL databases and have their database operations orchestrated and observed. It provides `SqlAlchemyConnector` and `AsyncSqlAlchemyConnector` for managing synchronous and asynchronous database connections. The integration is actively maintained, and its releases often track the frequent release cadence of the core Prefect library.

Warnings

Install

Imports

Quickstart

This quickstart creates an `SqlAlchemyConnector` block, saves it to Prefect, and then loads it inside Prefect tasks and a flow to work with a SQLite database: creating a table, inserting rows, and fetching the results. The block must exist before the flow runs, so either run the block-saving part first or configure the block in the Prefect UI.

```python
import os
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

@task
def setup_table(block_name: str) -> None:
    """Sets up a table and inserts data using the SQLAlchemy connector."""
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        # Single insert with named bind parameters
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        # Batch insert: one parameter dict per row
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    """Fetches all data from the customers table."""
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        # Each call with the same query returns the next batch of rows;
        # an empty batch means the result set is exhausted.
        while True:
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows

@flow(name="SQLAlchemy Flow Example")
def sqlalchemy_flow(block_name: str) -> list:
    """Orchestrates database setup and data fetching."""
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows

if __name__ == "__main__":
    # --- Configuration and Block Saving (creates the block before the flow runs) ---
    # In a real scenario, use environment variables for sensitive info.
    # Replace 'my-sqlite-block' with your desired block name.
    # For SQLite, a file-based database is sufficient.
    sqlite_block_name = os.environ.get('PREFECT_SQL_BLOCK_NAME', 'my-sqlite-block')
    sqlite_db_path = os.environ.get('SQLITE_DB_PATH', 'prefect.db')

    # Create and save the connector block programmatically
    # (Alternatively, create it via the Prefect UI)
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=SyncDriver.SQLITE_PYSQLITE,
            database=sqlite_db_path
        )
    )
    # overwrite=True lets this script be re-run when the block already exists
    connector.save(sqlite_block_name, overwrite=True)
    print(f"Saved SqlAlchemyConnector block as '{sqlite_block_name}' pointing to '{sqlite_db_path}'")
    print("You can now run the flow using this block.")

    # --- Running the Flow ---
    # The block saved above must exist in your Prefect server/Cloud at this point.
    results = sqlalchemy_flow(sqlite_block_name)
    print("Fetched Data:", results)
```
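
The `fetch_many` loop in the quickstart only terminates because each call continues from where the previous one stopped, rather than re-running the query. The sketch below illustrates the same pattern with Python's standard-library `sqlite3` module and no Prefect dependency. It is an analogy for the behavior, not the connector's actual implementation; the helper names `fetch_in_batches` and `demo` are hypothetical, and the in-memory database stands in for `prefect.db`.

```python
import sqlite3


def fetch_in_batches(conn: sqlite3.Connection, query: str, size: int = 2) -> list:
    """Collect every row of `query`, reading `size` rows at a time from one cursor."""
    # The query is executed once; the cursor remembers its position between reads.
    cursor = conn.execute(query)
    all_rows = []
    while True:
        new_rows = cursor.fetchmany(size)
        if len(new_rows) == 0:  # an empty batch means the result set is exhausted
            break
        all_rows.extend(new_rows)
    return all_rows


def demo() -> list:
    """Mirror the quickstart's table setup and batched fetch (hypothetical helper)."""
    conn = sqlite3.connect(":memory:")  # assumption: in-memory DB instead of a file
    conn.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
    # Single insert with named bind parameters
    conn.execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        {"name": "Marvin", "address": "Highway 42"},
    )
    # Batch insert: one parameter dict per row
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        [
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Highway 42"},
        ],
    )
    conn.commit()
    rows = fetch_in_batches(conn, "SELECT * FROM customers", size=2)
    conn.close()
    return rows


if __name__ == "__main__":
    print(demo())
```

With three rows and `size=2`, the loop reads a batch of two, then a batch of one, then an empty batch that ends it. Reading in batches keeps memory bounded for large result sets, which is presumably why the quickstart uses `fetch_many` instead of loading everything in one call.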
