Dagster PostgreSQL Integration
dagster-postgres is an integration library for Dagster that provides resources and storage implementations for working with PostgreSQL databases. It lets Dagster assets connect to Postgres, and it lets a Dagster instance use Postgres for run storage and event log storage. The current version is 0.28.22; releases follow the rapid cadence of the main Dagster project.
Warnings
- gotcha The `psycopg2-binary` package was removed as a transitive dependency in `dagster-postgres` version 0.28.18 (Dagster core 1.12.18). If your project relied on this transitive dependency, you must now explicitly install `psycopg2-binary` (or `psycopg2`) in your environment.
- gotcha Dagster library versions (e.g., `dagster-postgres`) are tightly coupled with the core `dagster` library version, and mismatched versions can lead to unexpected behavior or runtime errors. Library and core version numbers differ but are released as pairs: `dagster-postgres` 0.28.18 corresponds to `dagster` 1.12.18. Always ensure every `dagster-*` package in your environment is the release paired with your core `dagster` installation.
- gotcha When configuring `postgres_resource`, sensitive credentials (user, password) should always be managed securely, ideally using environment variables or a secret management system, rather than hardcoding them directly in code or `dagster.yaml`.
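The first two warnings can be checked mechanically before a deployment. The following is a minimal sketch, not part of dagster-postgres: the helper names (`expected_library_version`, `installed_version`, `preflight`) are hypothetical, and the offset of 16 between core and library minor versions is an assumption inferred from the 0.28.18 / 1.12.18 pairing mentioned above.

```python
from importlib import metadata, util
from typing import Callable, List, Optional

# Assumption: libraries are numbered 0.(minor + 16).patch relative to core
# 1.minor.patch, inferred from the 0.28.18 <-> 1.12.18 pairing above.
LIBRARY_MINOR_OFFSET = 16


def expected_library_version(core_version: str) -> str:
    """Map a core version such as '1.12.18' to the paired library version."""
    major, minor, patch = core_version.split(".", 2)
    if major != "1":
        raise ValueError(f"unsupported core version: {core_version}")
    return f"0.{int(minor) + LIBRARY_MINOR_OFFSET}.{patch}"


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def preflight(
    get_version: Callable[[str], Optional[str]] = installed_version,
    has_module: Callable[[str], bool] = lambda name: util.find_spec(name) is not None,
) -> List[str]:
    """Collect human-readable problems; an empty list means all checks passed."""
    problems = []
    core = get_version("dagster")
    library = get_version("dagster-postgres")
    if core is None or library is None:
        problems.append("dagster and dagster-postgres must both be installed")
    elif library != expected_library_version(core):
        problems.append(
            f"dagster-postgres {library} does not pair with dagster {core} "
            f"(expected {expected_library_version(core)})"
        )
    # psycopg2 is no longer pulled in transitively, so check for it explicitly.
    if not has_module("psycopg2"):
        problems.append("psycopg2 is missing; install psycopg2-binary or psycopg2")
    return problems
```

Calling `preflight()` with no arguments inspects the current environment; the two parameters exist only so the checks can be exercised without installing anything.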
Install
pip install dagster-postgres
Imports
- postgres_resource
from dagster_postgres import postgres_resource
- PostgresRunStorage
from dagster_postgres.run_storage import PostgresRunStorage
- PostgresEventLogStorage
from dagster_postgres.event_log import PostgresEventLogStorage
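The storage classes above are usually selected through the instance's `dagster.yaml` rather than instantiated by hand. As a rough sketch, the Python below builds the storage block and prints it as JSON, which YAML parsers also accept. It assumes the `storage: postgres: postgres_db` layout from the Dagster instance documentation; the `DAGSTER_PG_*` environment variable names and the `postgres_storage_config` helper are hypothetical, so verify the keys against the documentation for your version.

```python
import json


def postgres_storage_config(port: int = 5432) -> dict:
    """Build a dagster.yaml storage block that reads credentials from env vars."""
    return {
        "storage": {
            "postgres": {
                "postgres_db": {
                    # {"env": NAME} defers the lookup to instance load time,
                    # so no secret is written into the file itself.
                    "username": {"env": "DAGSTER_PG_USERNAME"},
                    "password": {"env": "DAGSTER_PG_PASSWORD"},
                    "hostname": {"env": "DAGSTER_PG_HOST"},
                    "db_name": {"env": "DAGSTER_PG_DB"},
                    "port": port,
                }
            }
        }
    }


# The printed JSON can be pasted into dagster.yaml as-is.
print(json.dumps(postgres_storage_config(), indent=2))
```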
Quickstart
from dagster import Definitions, EnvVar, asset
from dagster_postgres import postgres_resource


# Request the resource by key; it is then available on context.resources.
@asset(compute_kind="sql", required_resource_keys={"postgres"})
def my_postgres_asset(context):
    """A simple asset that connects to PostgreSQL and runs a dummy query."""
    postgres = context.resources.postgres
    with postgres.get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1;")
            result = cursor.fetchone()
            context.log.info(f"Postgres query result: {result}")
    # In a typical asset, you'd load/transform/store data here.


defs = Definitions(
    assets=[my_postgres_asset],
    resources={
        "postgres": postgres_resource.configured({
            "database": EnvVar("POSTGRES_DB"),
            "host": EnvVar("POSTGRES_HOST"),
            "port": EnvVar("POSTGRES_PORT"),
            "user": EnvVar("POSTGRES_USER"),
            "password": EnvVar("POSTGRES_PASSWORD"),
        })
    },
)
# To run this example, set environment variables (e.g., in your shell):
# export POSTGRES_DB=your_db_name
# export POSTGRES_HOST=localhost
# export POSTGRES_PORT=5432
# export POSTGRES_USER=your_user
# export POSTGRES_PASSWORD=your_password
# Then execute using `dagster dev -f your_file.py` and materialize `my_postgres_asset` in the UI.
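A forgotten `export` otherwise surfaces only when the asset runs, so it can help to validate the variables up front. The sketch below uses only the standard library; `load_postgres_settings` and `REQUIRED_VARS` are hypothetical names, not part of Dagster, and the variable names are the ones used in the comments above.

```python
import os
from typing import Mapping

REQUIRED_VARS = (
    "POSTGRES_DB",
    "POSTGRES_HOST",
    "POSTGRES_PORT",
    "POSTGRES_USER",
    "POSTGRES_PASSWORD",
)


def load_postgres_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read connection settings from the environment, failing fast if any are unset."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        # Report every missing name at once instead of one per run.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "database": env["POSTGRES_DB"],
        "host": env["POSTGRES_HOST"],
        "port": int(env["POSTGRES_PORT"]),  # environment values are strings
        "user": env["POSTGRES_USER"],
        "password": env["POSTGRES_PASSWORD"],
    }
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in tests without touching real credentials.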