postgres-insert-or-upsert
Table to use for upsert test (created and dropped automatically)
import sys
import os
import subprocess
import time
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
for attempt in range(2):
try:
req = urllib.request.Request(
"https://checklist.day/api/registry/psycopg2-binary",
headers={"User-Agent": "checklist-agent/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
registry = json.loads(resp.read())
break
except Exception as e:
if attempt == 1:
print(f"ABORT: registry unreachable — {e}")
sys.exit(1)
time.sleep(2)
warnings = registry.get("warnings", [])
if warnings:
print("[psycopg2-binary] WARNINGS:")
for w in warnings if isinstance(warnings, list) else [warnings]:
print(f" ⚠ {w}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"])
import psycopg2
DATABASE_URL = os.environ.get("DATABASE_URL")
TABLE_NAME = os.environ.get("TABLE_NAME", "checklist_upsert_test")
if not DATABASE_URL:
print("ABORT: DATABASE_URL env var not set")
sys.exit(1)
if DATABASE_URL.startswith("postgres://"):
DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)
conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)
try:
with conn.cursor() as cur:
# Setup — idempotent table with unique constraint on key column
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
id SERIAL PRIMARY KEY,
key TEXT UNIQUE NOT NULL,
value INTEGER NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
)
""")
conn.commit()
# Clean slate
cur.execute(f"DELETE FROM {TABLE_NAME}")
conn.commit()
# 1. INSERT — plain insert, no conflict expected
# FOOTGUN: always use parameterized queries — never f-string user data into SQL
cur.execute(
f"INSERT INTO {TABLE_NAME} (key, value) VALUES (%s, %s)",
("agent_key", 100)
)
conn.commit()
insert_count = cur.rowcount
print(f" inserted {insert_count} row (value=100)")
# 2. UPSERT — same key, new value — ON CONFLICT DO UPDATE
# FOOTGUN: must reference EXCLUDED.column to get the new value, not the old one
cur.execute(f"""
INSERT INTO {TABLE_NAME} (key, value)
VALUES (%s, %s)
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value,
updated_at = NOW()
""", ("agent_key", 999))
conn.commit()
upsert_count = cur.rowcount
print(f" upserted {upsert_count} row (value=100 → 999)")
# 3. Verify update took effect
cur.execute(f"SELECT value FROM {TABLE_NAME} WHERE key = %s", ("agent_key",))
final_value = cur.fetchone()[0]
print(f" verified final value: {final_value}")
# Cleanup
cur.execute(f"DROP TABLE IF EXISTS {TABLE_NAME}")
conn.commit()
finally:
conn.close()
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
assert insert_count == 1, f"FAIL: expected 1 insert, got {insert_count}"
assert upsert_count == 1, f"FAIL: expected 1 upsert, got {upsert_count}"
assert final_value == 999, f"FAIL: expected value=999 after upsert, got {final_value}"
result = {
"insert_count": insert_count,
"upsert_count": upsert_count,
"final_value": final_value,
}
print(json.dumps(result, indent=2))
print("PASS")