postgres-create-table-idempotent
Set CLEANUP to "true" to drop the table after verification (useful for tests)
import sys
import os
import subprocess
import time
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
# 1. Fetch registry entry
# Download and JSON-decode the registry document. The request is tried at
# most twice, pausing 2 seconds after the first failure; if the second try
# fails as well, the script prints an ABORT line and exits with status 1.
REGISTRY_URL = "https://checklist.day/api/registry/psycopg2-binary"
for attempt in (0, 1):
    try:
        request = urllib.request.Request(
            REGISTRY_URL,
            headers={"User-Agent": "checklist-agent/1.0"},
        )
        with urllib.request.urlopen(request, timeout=10) as response:
            payload = response.read()
        registry = json.loads(payload)
    except Exception as exc:
        # Any failure (network, HTTP, malformed JSON) counts as a failed try.
        if attempt == 1:
            print(f"ABORT: registry unreachable after 2 attempts — {exc}")
            sys.exit(1)
        time.sleep(2)
    else:
        # Success: `registry` is populated, stop retrying.
        break
# 2. Surface warnings
# Print every warning carried by the registry entry, one per line. Nothing
# is printed when the "warnings" key is missing or its value is empty.
warnings = registry.get("warnings", [])
if warnings:
    print("[psycopg2-binary] WARNINGS:")
    # A value that is not a list is treated as one single warning.
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for item in warning_items:
        print(f" ⚠ {item}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
# 3. Install
# Run pip through the interpreter executing this script so the package is
# installed for that interpreter. check=True raises CalledProcessError when
# pip exits with a non-zero status, which stops the script here.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"],
    check=True,
)
# Imported only after the install step above has completed.
import psycopg2
# 4. Validate inputs
def _abort(message):
    """Print *message* and terminate the script with exit status 1."""
    print(message)
    sys.exit(1)


# Configuration comes from environment variables.
DATABASE_URL = os.environ.get("DATABASE_URL")
TABLE_NAME = os.environ.get("TABLE_NAME")
# Cleanup is enabled only when CLEANUP is the word "true" (any letter case);
# an unset variable counts as "false".
CLEANUP = os.environ.get("CLEANUP", "false").lower() == "true"

# Both variables are required; an unset or empty value aborts the run.
if not DATABASE_URL:
    _abort("ABORT: DATABASE_URL env var not set")
if not TABLE_NAME:
    _abort("ABORT: TABLE_NAME env var not set")

# Rewrite a leading "postgres://" scheme to "postgresql://". Only the prefix
# is replaced; the remainder of the URL is kept unchanged.
_SHORT_SCHEME = "postgres://"
if DATABASE_URL.startswith(_SHORT_SCHEME):
    DATABASE_URL = "postgresql://" + DATABASE_URL[len(_SHORT_SCHEME):]
# 5. Connect
# Local import: psycopg2 only becomes importable after the install step.
from psycopg2 import sql

# Every statement below targets this one schema, so the existence check, the
# CREATE, the column verification and the DROP all refer to the same table
# regardless of the session's search_path.
TABLE_SCHEMA = "public"
# TABLE_NAME comes from the environment, so it is composed as a quoted
# identifier instead of being pasted into the SQL text. This prevents SQL
# injection, allows reserved words, and keeps the exact spelling:
# PostgreSQL folds unquoted identifiers to lowercase, which would create a
# mixed-case name under a different name than the one the
# information_schema lookups search for.
qualified_table = sql.Identifier(TABLE_SCHEMA, TABLE_NAME)

conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)
conn.autocommit = False  # statements are committed explicitly below
try:
    with conn.cursor() as cur:
        # 6. Check if table already exists
        # Recorded before the CREATE so the report can tell "created" apart
        # from "already existed".
        cur.execute(
            """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
            """,
            (TABLE_SCHEMA, TABLE_NAME),
        )
        already_existed = cur.fetchone()[0]

        # 7. Idempotent create
        # FOOTGUN: always IF NOT EXISTS — agents retry, this must not fail on second run
        cur.execute(
            sql.SQL(
                """
                CREATE TABLE IF NOT EXISTS {} (
                    id SERIAL PRIMARY KEY,
                    name TEXT NOT NULL,
                    value INTEGER NOT NULL DEFAULT 0,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
                )
                """
            ).format(qualified_table)
        )
        conn.commit()
        print(f" table '{TABLE_NAME}' {'already existed' if already_existed else 'created'}")

        # 8. Verify schema via information_schema
        # FOOTGUN: check actual columns, not just that the table exists
        cur.execute(
            """
            SELECT column_name, data_type
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position
            """,
            (TABLE_SCHEMA, TABLE_NAME),
        )
        columns = [{"name": row[0], "type": row[1]} for row in cur.fetchall()]
        print(f" verified {len(columns)} columns: {[c['name'] for c in columns]}")

        # 9. Cleanup if requested
        # Note: the table is dropped whenever CLEANUP is true, including when
        # it already existed before this run.
        cleanup_done = False
        if CLEANUP:
            cur.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(qualified_table))
            conn.commit()
            cleanup_done = True
            print(f" table '{TABLE_NAME}' dropped (CLEANUP=true)")
finally:
    # Always release the connection, including when a statement above raised.
    conn.close()
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
# Compare the column names read back from information_schema with the four
# columns the CREATE TABLE statement defines.
expected_columns = {"id", "name", "value", "created_at"}
actual_columns = {c["name"] for c in columns}
# Raise explicitly instead of using `assert`: assert statements are removed
# when Python runs with -O, which would let a wrong schema reach "PASS".
# The exception type and messages are the same as before.
if expected_columns != actual_columns:
    raise AssertionError(
        f"FAIL: schema mismatch — expected {expected_columns}, got {actual_columns}"
    )
if len(columns) != 4:
    raise AssertionError(f"FAIL: expected 4 columns, got {len(columns)}")

# Machine-readable summary of the run, followed by the final PASS marker.
result = {
    "table_name": TABLE_NAME,
    "already_existed": already_existed,
    "columns": columns,
    "cleanup_done": cleanup_done,
}
print(json.dumps(result, indent=2))
print("PASS")