postgres-insert-or-upsert

code_execution · unverified · null · json · download .py

Table to use for upsert test (created and dropped automatically)

import sys
import os
import subprocess
import time
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

for attempt in range(2):
    try:
        req = urllib.request.Request(
            "https://checklist.day/api/registry/psycopg2-binary",
            headers={"User-Agent": "checklist-agent/1.0"}
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            registry = json.loads(resp.read())
            break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)

warnings = registry.get("warnings", [])
if warnings:
    print("[psycopg2-binary] WARNINGS:")
    for w in warnings if isinstance(warnings, list) else [warnings]:
        print(f"  ⚠ {w}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"])

import psycopg2

DATABASE_URL = os.environ.get("DATABASE_URL")
TABLE_NAME   = os.environ.get("TABLE_NAME", "checklist_upsert_test")

if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)

if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)

conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)

try:
    with conn.cursor() as cur:
        # Setup — idempotent table with unique constraint on key column
        cur.execute(f"""
            CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
                id    SERIAL PRIMARY KEY,
                key   TEXT UNIQUE NOT NULL,
                value INTEGER NOT NULL,
                updated_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)
        conn.commit()

        # Clean slate
        cur.execute(f"DELETE FROM {TABLE_NAME}")
        conn.commit()

        # 1. INSERT — plain insert, no conflict expected
        # FOOTGUN: always use parameterized queries — never f-string user data into SQL
        cur.execute(
            f"INSERT INTO {TABLE_NAME} (key, value) VALUES (%s, %s)",
            ("agent_key", 100)
        )
        conn.commit()
        insert_count = cur.rowcount
        print(f"  inserted {insert_count} row (value=100)")

        # 2. UPSERT — same key, new value — ON CONFLICT DO UPDATE
        # FOOTGUN: must reference EXCLUDED.column to get the new value, not the old one
        cur.execute(f"""
            INSERT INTO {TABLE_NAME} (key, value)
            VALUES (%s, %s)
            ON CONFLICT (key) DO UPDATE
                SET value      = EXCLUDED.value,
                    updated_at = NOW()
        """, ("agent_key", 999))
        conn.commit()
        upsert_count = cur.rowcount
        print(f"  upserted {upsert_count} row (value=100 → 999)")

        # 3. Verify update took effect
        cur.execute(f"SELECT value FROM {TABLE_NAME} WHERE key = %s", ("agent_key",))
        final_value = cur.fetchone()[0]
        print(f"  verified final value: {final_value}")

        # Cleanup
        cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
        conn.commit()

finally:
    conn.close()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

assert insert_count == 1, f"FAIL: expected 1 insert, got {insert_count}"
assert upsert_count == 1, f"FAIL: expected 1 upsert, got {upsert_count}"
assert final_value == 999, f"FAIL: expected value=999 after upsert, got {final_value}"

result = {
    "insert_count": insert_count,
    "upsert_count": upsert_count,
    "final_value":  final_value,
}
print(json.dumps(result, indent=2))
print("PASS")