postgres-run-migration
PostgreSQL connection string e.g. "postgresql://user:pass@host:5432/dbname"
import sys
import os
import subprocess
import time
import tempfile
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
for attempt in range(2):
try:
for slug in ["alembic", "sqlalchemy"]:
req = urllib.request.Request(
f"https://checklist.day/api/registry/{slug}",
headers={"User-Agent": "checklist-agent/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
registry = json.loads(resp.read())
warnings = registry.get("warnings", [])
if warnings:
print(f"[{slug}] WARNINGS:")
for w in warnings if isinstance(warnings, list) else [warnings]:
print(f" ⚠ {w}")
break
except Exception as e:
if attempt == 1:
print(f"ABORT: registry unreachable — {e}")
sys.exit(1)
time.sleep(2)
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
"alembic>=1.13.0", "sqlalchemy>=2.0.0", "psycopg2-binary>=2.9.9"])
from alembic.config import Config
from alembic import command
from alembic.runtime.migration import MigrationContext
from sqlalchemy import create_engine, text
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
print("ABORT: DATABASE_URL env var not set")
sys.exit(1)
if DATABASE_URL.startswith("postgres://"):
DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)
# Build a minimal Alembic project in a temp directory
tmpdir = tempfile.mkdtemp()
versions_dir = os.path.join(tmpdir, "versions")
os.makedirs(versions_dir)
# Write alembic.ini
ini_content = f"""
[alembic]
script_location = {tmpdir}
sqlalchemy.url = {DATABASE_URL}
[loggers]
keys = root
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
"""
ini_path = os.path.join(tmpdir, "alembic.ini")
with open(ini_path, "w") as f:
f.write(ini_content.strip())
# Write env.py
env_py = """
from alembic import context
from sqlalchemy import create_engine
config = context.config
url = config.get_main_option("sqlalchemy.url")
def run_migrations_online():
engine = create_engine(url)
with engine.connect() as connection:
context.configure(connection=connection, target_metadata=None)
with context.begin_transaction():
context.run_migrations()
run_migrations_online()
"""
with open(os.path.join(tmpdir, "env.py"), "w") as f:
f.write(env_py.strip())
# Write a test migration
rev_id = "001_checklist_test"
migration = f"""
revision = '{rev_id}'
down_revision = None
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'alembic_checklist_test',
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('label', sa.Text, nullable=False),
)
def downgrade():
op.drop_table('alembic_checklist_test')
"""
with open(os.path.join(versions_dir, f"{rev_id}.py"), "w") as f:
f.write(migration.strip())
cfg = Config(ini_path)
engine = create_engine(DATABASE_URL)
# Check current revision
with engine.connect() as conn:
ctx = MigrationContext.configure(conn)
current_revision = ctx.get_current_revision()
print(f" current revision: {current_revision}")
# FOOTGUN: always check if already up-to-date before running upgrade
# Running upgrade head on an already-current DB is safe but wastes time
command.upgrade(cfg, "head")
with engine.connect() as conn:
ctx = MigrationContext.configure(conn)
target_revision = ctx.get_current_revision()
print(f" after upgrade: {target_revision}")
migrations_run = 0 if current_revision == target_revision else 1
# Test rollback
command.downgrade(cfg, "-1")
with engine.connect() as conn:
ctx = MigrationContext.configure(conn)
after_downgrade = ctx.get_current_revision()
rollback_tested = after_downgrade == current_revision
print(f" downgrade verified: {rollback_tested}")
engine.dispose()
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
assert target_revision == rev_id, f"FAIL: expected revision {rev_id}, got {target_revision}"
assert rollback_tested, "FAIL: downgrade did not return to original revision"
result = {
"current_revision": current_revision,
"target_revision": target_revision,
"migrations_run": migrations_run,
"rollback_tested": rollback_tested,
}
print(json.dumps(result, indent=2))
print("PASS")