postgres-run-migration

code_execution · unverified · null · json · download .py

PostgreSQL connection string e.g. "postgresql://user:pass@host:5432/dbname"

import sys
import os
import subprocess
import time
import tempfile
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

for attempt in range(2):
    try:
        for slug in ["alembic", "sqlalchemy"]:
            req = urllib.request.Request(
                f"https://checklist.day/api/registry/{slug}",
                headers={"User-Agent": "checklist-agent/1.0"}
            )
            with urllib.request.urlopen(req, timeout=10) as resp:
                registry = json.loads(resp.read())
                warnings = registry.get("warnings", [])
                if warnings:
                    print(f"[{slug}] WARNINGS:")
                    for w in warnings if isinstance(warnings, list) else [warnings]:
                        print(f"  ⚠ {w}")
        break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                       "alembic>=1.13.0", "sqlalchemy>=2.0.0", "psycopg2-binary>=2.9.9"])

from alembic.config import Config
from alembic import command
from alembic.runtime.migration import MigrationContext
from sqlalchemy import create_engine, text

DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)

if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)

# Build a minimal Alembic project in a temp directory
tmpdir = tempfile.mkdtemp()
versions_dir = os.path.join(tmpdir, "versions")
os.makedirs(versions_dir)

# Write alembic.ini
ini_content = f"""
[alembic]
script_location = {tmpdir}
sqlalchemy.url = {DATABASE_URL}
[loggers]
keys = root
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
"""
ini_path = os.path.join(tmpdir, "alembic.ini")
with open(ini_path, "w") as f:
    f.write(ini_content.strip())

# Write env.py
env_py = """
from alembic import context
from sqlalchemy import create_engine

config = context.config
url = config.get_main_option("sqlalchemy.url")

def run_migrations_online():
    engine = create_engine(url)
    with engine.connect() as connection:
        context.configure(connection=connection, target_metadata=None)
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()
"""
with open(os.path.join(tmpdir, "env.py"), "w") as f:
    f.write(env_py.strip())

# Write a test migration
rev_id = "001_checklist_test"
migration = f"""
revision = '{rev_id}'
down_revision = None
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        'alembic_checklist_test',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('label', sa.Text, nullable=False),
    )

def downgrade():
    op.drop_table('alembic_checklist_test')
"""
with open(os.path.join(versions_dir, f"{rev_id}.py"), "w") as f:
    f.write(migration.strip())

cfg = Config(ini_path)

engine = create_engine(DATABASE_URL)

# Check current revision
with engine.connect() as conn:
    ctx = MigrationContext.configure(conn)
    current_revision = ctx.get_current_revision()
print(f"  current revision: {current_revision}")

# FOOTGUN: always check if already up-to-date before running upgrade
# Running upgrade head on an already-current DB is safe but wastes time
command.upgrade(cfg, "head")

with engine.connect() as conn:
    ctx = MigrationContext.configure(conn)
    target_revision = ctx.get_current_revision()
print(f"  after upgrade: {target_revision}")

migrations_run = 0 if current_revision == target_revision else 1

# Test rollback
command.downgrade(cfg, "-1")
with engine.connect() as conn:
    ctx = MigrationContext.configure(conn)
    after_downgrade = ctx.get_current_revision()
rollback_tested = after_downgrade == current_revision
print(f"  downgrade verified: {rollback_tested}")

engine.dispose()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

assert target_revision == rev_id, f"FAIL: expected revision {rev_id}, got {target_revision}"
assert rollback_tested, "FAIL: downgrade did not return to original revision"

result = {
    "current_revision": current_revision,
    "target_revision":  target_revision,
    "migrations_run":   migrations_run,
    "rollback_tested":  rollback_tested,
}
print(json.dumps(result, indent=2))
print("PASS")