Prefect SQLAlchemy Integration
The `prefect-sqlalchemy` library provides integrations for working with databases within Prefect flows. It allows users to connect to various SQL databases using SQLAlchemy, enabling orchestration and observability of database operations. The library offers `SqlAlchemyConnector` and `AsyncSqlAlchemyConnector` for managing both synchronous and asynchronous database connections. This integration is actively maintained, with releases often aligning with the frequent release cadence of the core Prefect library.
Warnings
- breaking SQLAlchemy 2.0 introduced significant API changes compared to the 1.x series (for example, `Engine.execute()` was removed and raw SQL strings passed to `Connection.execute()` must be wrapped in `text()`). If you are upgrading `prefect-sqlalchemy` or `SQLAlchemy` itself, make sure your database code follows SQLAlchemy 2.x patterns. Explicit support for SQLAlchemy 2.x was added in `prefect-sqlalchemy` version 0.4.0.
- gotcha Database drivers are not included: `prefect-sqlalchemy` provides the integration layer but does not bundle database drivers (e.g., `psycopg2-binary` for PostgreSQL, `aiosqlite` for async SQLite, `pymysql` for MySQL). Install the driver for each target database separately. Synchronous SQLite is the exception: its `pysqlite` driver is Python's built-in `sqlite3` module.
- gotcha Block state management: Load and use a `SqlAlchemyConnector` (or `AsyncSqlAlchemyConnector`) within a single task or flow, ideally as a context manager so its connections are closed on exit. Passing a connector instance between tasks or flows can lose its connection and cursor state and cause unexpected behavior; pass the block name instead and load the block where it is used, as the quickstart does.
- gotcha `SqlAlchemyConnector.load()` requires an existing block document. Loading a name that has not been saved yet (programmatically or through the Prefect UI) raises an error.
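Because `load()` fails when no block with that name exists, scripts sometimes fall back to creating the block on first run. The sketch below is a hypothetical helper, `load_or_create`, which is not part of `prefect-sqlalchemy`. It assumes a missing block surfaces as a `ValueError` (which is what recent Prefect releases raise from `Block.load()`; confirm this for your version), and it takes the load, create, and save steps as callables so the logic can be exercised without a Prefect server.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def load_or_create(
    name: str,
    load: Callable[[str], T],
    create: Callable[[], T],
    save: Callable[[T, str], object],
) -> T:
    """Return the block saved under `name`, creating and saving it if absent.

    Hypothetical helper: assumes `load` raises ValueError when no block
    with that name exists.
    """
    try:
        return load(name)
    except ValueError:
        # Nothing saved under this name yet: build the block and persist it
        # so later load() calls (e.g. inside tasks) can find it.
        block = create()
        save(block, name)
        return block


# Possible wiring for prefect-sqlalchemy (requires a reachable Prefect API):
#
# from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector, SyncDriver
#
# connector = load_or_create(
#     "my-sqlite-block",
#     load=SqlAlchemyConnector.load,
#     create=lambda: SqlAlchemyConnector(
#         connection_info=ConnectionComponents(
#             driver=SyncDriver.SQLITE_PYSQLITE, database="prefect.db"
#         )
#     ),
#     save=lambda block, name: block.save(name),
# )
```

Catching `ValueError` broadly can also hide unrelated failures raised during loading; narrow the `except` clause if your Prefect version raises a more specific exception for a missing block.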
Install
- pip install prefect-sqlalchemy
- pip install "prefect[sqlalchemy]"
Imports
- SqlAlchemyConnector
from prefect_sqlalchemy import SqlAlchemyConnector
- AsyncSqlAlchemyConnector
from prefect_sqlalchemy import AsyncSqlAlchemyConnector
- ConnectionComponents
from prefect_sqlalchemy import ConnectionComponents
- SyncDriver
from prefect_sqlalchemy import SyncDriver
- AsyncDriver
from prefect_sqlalchemy import AsyncDriver
- DatabaseCredentials
from prefect_sqlalchemy import DatabaseCredentials
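`SyncDriver` and `AsyncDriver` name SQLAlchemy `dialect+driver` combinations (for example `SyncDriver.SQLITE_PYSQLITE`, used in the quickstart). Since drivers are not bundled, a pre-flight check can report a missing one before a flow starts. The sketch below is hypothetical: the `DRIVER_REQUIREMENTS` table and the `missing_driver` helper are illustrative names, the table covers only a few drivers, and it assumes the enum values follow SQLAlchemy's `dialect+driver` strings such as `sqlite+pysqlite`. It relies only on `importlib.util.find_spec`, which locates a module without importing it.

```python
from __future__ import annotations

import importlib.util

# Hypothetical lookup table: SQLAlchemy "dialect+driver" name ->
# (importable DBAPI module, pip package that provides it).
DRIVER_REQUIREMENTS: dict[str, tuple[str, str]] = {
    "sqlite+pysqlite": ("sqlite3", "(standard library)"),
    "sqlite+aiosqlite": ("aiosqlite", "aiosqlite"),
    "postgresql+psycopg2": ("psycopg2", "psycopg2-binary"),
    "postgresql+asyncpg": ("asyncpg", "asyncpg"),
    "mysql+pymysql": ("pymysql", "pymysql"),
}


def missing_driver(driver: str) -> str | None:
    """Return the pip package to install if the driver's module is absent, else None.

    Raises KeyError for driver names that are not in DRIVER_REQUIREMENTS.
    """
    module_name, package = DRIVER_REQUIREMENTS[driver]
    # find_spec locates a top-level module without importing it.
    if importlib.util.find_spec(module_name) is None:
        return package
    return None


if __name__ == "__main__":
    for name in DRIVER_REQUIREMENTS:
        package = missing_driver(name)
        status = "ok" if package is None else f"missing: pip install {package}"
        print(f"{name}: {status}")
```

For example, `missing_driver("postgresql+psycopg2")` returns `"psycopg2-binary"` on a machine where `psycopg2` is not installed, and `None` once it is. Note that the pip package name and the importable module name can differ, which is why the table stores both.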
Quickstart
import os

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver


@task
def setup_table(block_name: str) -> None:
    """Creates the customers table and inserts sample rows using the connector block."""
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        # Single insert with named bind parameters.
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        # Batch insert: one parameter dict per row.
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    """Fetches all rows from the customers table, two at a time."""
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated calls with the same query return the next batch;
            # an empty list means the result set is exhausted.
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows


@flow(name="SQLAlchemy Flow Example")
def sqlalchemy_flow(block_name: str) -> list:
    """Orchestrates database setup and data fetching."""
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    # --- Configuration and block saving ---
    # In a real deployment, supply sensitive values through environment variables.
    # Replace 'my-sqlite-block' with your desired block name.
    # For SQLite, a file-based database is sufficient.
    sqlite_block_name = os.environ.get("PREFECT_SQL_BLOCK_NAME", "my-sqlite-block")
    sqlite_db_path = os.environ.get("SQLITE_DB_PATH", "prefect.db")

    # Create and save the connector block programmatically
    # (alternatively, create it in the Prefect UI).
    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=SyncDriver.SQLITE_PYSQLITE,
            database=sqlite_db_path,
        )
    )
    # overwrite=True lets the script be re-run without failing on an existing block.
    connector.save(sqlite_block_name, overwrite=True)
    print(f"Saved SqlAlchemyConnector block as '{sqlite_block_name}' pointing to '{sqlite_db_path}'")
    print("You can now run the flow using this block.")

    # --- Running the flow ---
    # The tasks load the block saved above by name from your Prefect server or Cloud workspace.
    results = sqlalchemy_flow(sqlite_block_name)
    print("Fetched Data:", results)
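For comparison, the same create, insert, and batched-fetch sequence can be written directly against SQLAlchemy 2.x, which also illustrates the 2.x patterns mentioned under Warnings: raw SQL wrapped in `text()`, an explicit transaction from `engine.begin()`, and no `Engine.execute()`. This is a minimal sketch in plain SQLAlchemy against an in-memory SQLite database, not the connector's implementation; we assume the connector's `execute`, `execute_many`, and `fetch_many` correspond roughly to these calls, and `run_customers_demo` is a hypothetical name.

```python
from __future__ import annotations

from sqlalchemy import create_engine, text


def run_customers_demo(url: str = "sqlite+pysqlite:///:memory:") -> list[tuple[str, str]]:
    """Create, populate, and read back the customers table with SQLAlchemy 2.x."""
    engine = create_engine(url)
    rows: list[tuple[str, str]] = []
    # engine.begin() opens a connection plus a transaction that commits on exit.
    with engine.begin() as conn:
        conn.execute(
            text("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar)")
        )
        # One dict of bound parameters -> a single execution.
        conn.execute(
            text("INSERT INTO customers (name, address) VALUES (:name, :address)"),
            {"name": "Marvin", "address": "Highway 42"},
        )
        # A list of dicts -> "executemany"-style execution.
        conn.execute(
            text("INSERT INTO customers (name, address) VALUES (:name, :address)"),
            [
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        result = conn.execute(text("SELECT name, address FROM customers"))
        # Pull rows in batches of 2 until an empty batch signals the end.
        while True:
            batch = result.fetchmany(2)
            if not batch:
                break
            rows.extend(tuple(row) for row in batch)
    engine.dispose()
    return rows


if __name__ == "__main__":
    print(run_customers_demo())
```

The `while` loop mirrors the quickstart's `fetch_many` loop: request a fixed-size batch and stop on an empty one.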