postgres-create-table-idempotent

code_execution · unverified · null · json · download .py

Set the CLEANUP environment variable to "true" to drop the table after verification (useful for tests).

import sys
import os
import subprocess
import time
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

# 1. Fetch registry entry
# Two tries in total, pausing 2 seconds after the first failure; the script
# aborts with exit code 1 if the second try fails as well.
REGISTRY_URL = "https://checklist.day/api/registry/psycopg2-binary"

attempts_left = 2
while True:
    attempts_left -= 1
    request = urllib.request.Request(
        REGISTRY_URL,
        headers={"User-Agent": "checklist-agent/1.0"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            registry = json.loads(response.read())
    except Exception as err:
        if attempts_left == 0:
            print(f"ABORT: registry unreachable after 2 attempts — {err}")
            sys.exit(1)
        time.sleep(2)
    else:
        # Payload fetched and decoded: stop retrying.
        break

# 2. Surface warnings
# The "warnings" field may be a list or a single value; a single value is
# wrapped in a list so both forms print one line per warning.
warnings = registry.get("warnings", [])
if warnings:
    print("[psycopg2-binary] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for item in warning_items:
        print(f"  ⚠ {item}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

# 3. Install
# Run pip through the interpreter executing this script so the package lands
# in the same environment; a non-zero pip exit raises CalledProcessError.
pip_install_cmd = [
    sys.executable,
    "-m",
    "pip",
    "install",
    "-q",
    "psycopg2-binary>=2.9.9",
]
subprocess.run(pip_install_cmd, check=True)

# Imported only after the install step above has completed.
import psycopg2

# 4. Validate inputs
# DATABASE_URL and TABLE_NAME are required; CLEANUP is optional and is only
# enabled by the (case-insensitive) value "true".
DATABASE_URL = os.getenv("DATABASE_URL")
TABLE_NAME = os.getenv("TABLE_NAME")
CLEANUP = os.getenv("CLEANUP", "false").lower() == "true"

# Checked in order; the first missing/empty variable aborts with exit code 1.
_required_inputs = (
    (DATABASE_URL, "ABORT: DATABASE_URL env var not set"),
    (TABLE_NAME, "ABORT: TABLE_NAME env var not set"),
)
for _input_value, _abort_message in _required_inputs:
    if not _input_value:
        print(_abort_message)
        sys.exit(1)

# Rewrite a leading "postgres://" scheme to "postgresql://".
_LEGACY_SCHEME = "postgres://"
if DATABASE_URL.startswith(_LEGACY_SCHEME):
    DATABASE_URL = "postgresql://" + DATABASE_URL[len(_LEGACY_SCHEME):]

# 5. Connect
# psycopg2.sql is imported here rather than at the top of the file because
# the psycopg2 package is pip-installed at runtime earlier in this script.
from psycopg2 import sql

conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)
conn.autocommit = False  # explicit transactions; each DDL step commits below

# TABLE_NAME comes from the environment, so it is never spliced into SQL as
# raw text. sql.Identifier quotes it safely (no injection, reserved words
# work) and the explicit "public" schema makes the DDL target exactly the
# table that the information_schema lookups below inspect: both use the
# 'public' schema and the exact, case-preserved name.
qualified_table = sql.Identifier("public", TABLE_NAME)

try:
    with conn.cursor() as cur:

        # 6. Check if table already exists
        # Recorded before the CREATE so the report can distinguish
        # "created" from "already existed".
        cur.execute("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = 'public' AND table_name = %s
            )
        """, (TABLE_NAME,))
        already_existed = cur.fetchone()[0]

        # 7. Idempotent create
        # FOOTGUN: always IF NOT EXISTS — agents retry, this must not fail on second run
        cur.execute(
            sql.SQL("""
                CREATE TABLE IF NOT EXISTS {table} (
                    id         SERIAL PRIMARY KEY,
                    name       TEXT NOT NULL,
                    value      INTEGER NOT NULL DEFAULT 0,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
            """).format(table=qualified_table)
        )
        conn.commit()
        print(f"  table '{TABLE_NAME}' {'already existed' if already_existed else 'created'}")

        # 8. Verify schema via information_schema
        # FOOTGUN: check actual columns, not just that the table exists
        cur.execute("""
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = %s
            ORDER BY ordinal_position
        """, (TABLE_NAME,))
        columns = [{"name": row[0], "type": row[1]} for row in cur.fetchall()]
        print(f"  verified {len(columns)} columns: {[c['name'] for c in columns]}")

        # 9. Cleanup if requested
        cleanup_done = False
        if CLEANUP:
            cur.execute(
                sql.SQL("DROP TABLE IF EXISTS {table}").format(table=qualified_table)
            )
            conn.commit()
            cleanup_done = True
            print(f"  table '{TABLE_NAME}' dropped (CLEANUP=true)")

finally:
    # Always release the connection, including when a statement above raised.
    conn.close()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

# Compare the column names read back from information_schema with the schema
# this script creates. The checks raise AssertionError explicitly instead of
# using `assert` statements: `assert` is removed when Python runs with -O,
# which would let a mismatched table fall through to "PASS".
expected_columns = {"id", "name", "value", "created_at"}
actual_columns   = {c["name"] for c in columns}
if expected_columns != actual_columns:
    raise AssertionError(
        f"FAIL: schema mismatch — expected {expected_columns}, got {actual_columns}"
    )
if len(columns) != 4:
    raise AssertionError(f"FAIL: expected 4 columns, got {len(columns)}")

# Machine-readable summary of the run, followed by the PASS marker.
result = {
    "table_name":     TABLE_NAME,
    "already_existed": already_existed,
    "columns":        columns,
    "cleanup_done":   cleanup_done,
}
print(json.dumps(result, indent=2))
print("PASS")