postgres-bulk-insert
Number of rows to insert in the benchmark
import sys
import os
import subprocess
import time
import urllib.request
import json
import io
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
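# Fetch registry metadata for psycopg2-binary; retry once (fixed 2 s pause) before
# aborting the run.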
for attempt in range(2):
try:
req = urllib.request.Request(
"https://checklist.day/api/registry/psycopg2-binary",
headers={"User-Agent": "checklist-agent/1.0"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
registry = json.loads(resp.read())
break
except Exception as e:
if attempt == 1:
print(f"ABORT: registry unreachable — {e}")
sys.exit(1)
time.sleep(2)
warnings = registry.get("warnings", [])
if warnings:
print("[psycopg2-binary] WARNINGS:")
for w in warnings if isinstance(warnings, list) else [warnings]:
print(f" ⚠ {w}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
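# NOTE: psycopg2-binary ships a precompiled libpq and is aimed at development and
# testing; the psycopg2 docs recommend the source-built psycopg2 package for production.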
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"])
import psycopg2
DATABASE_URL = os.environ.get("DATABASE_URL")
ROW_COUNT = int(os.environ.get("ROW_COUNT", "100"))
if not DATABASE_URL:
print("ABORT: DATABASE_URL env var not set")
sys.exit(1)
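# Heroku-style DSNs use the short postgres:// scheme; normalizing to postgresql://
# keeps the URL compatible with tools (e.g. SQLAlchemy 1.4+) that reject the short form.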
if DATABASE_URL.startswith("postgres://"):
DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)
TABLE = "checklist_bulk_test"
rows = [(f"item_{i}", i) for i in range(ROW_COUNT)]
conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)
try:
with conn.cursor() as cur:
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
value INTEGER NOT NULL
)
""")
conn.commit()
# Method 1: executemany
        # FOOTGUN: executemany is safe but slow: psycopg2 sends one round-trip per row
# Use for < 1000 rows or when values need per-row validation
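        # NOTE (not benchmarked here): psycopg2.extras.execute_values groups rows into
        # multi-row VALUES statements (100 per statement by default) and typically falls
        # between the two methods below, e.g.:
        #   from psycopg2.extras import execute_values
        #   execute_values(cur, f"INSERT INTO {TABLE} (name, value) VALUES %s", rows)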
cur.execute(f"DELETE FROM {TABLE}")
conn.commit()
t0 = time.perf_counter()
cur.executemany(f"INSERT INTO {TABLE} (name, value) VALUES (%s, %s)", rows)
conn.commit()
executemany_ms = round((time.perf_counter() - t0) * 1000, 1)
cur.execute(f"SELECT COUNT(*) FROM {TABLE}")
executemany_count = cur.fetchone()[0]
print(f" executemany: {executemany_count} rows in {executemany_ms}ms")
# Method 2: copy_expert
        # FOOTGUN: copy_expert is typically 10-100x faster but needs a file-like object
        # in COPY's tab-delimited text format (the buffer below is tab-separated, not CSV)
# Use for > 1000 rows when speed matters
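        # NOTE: COPY's text format treats backslash, tab, and newline as special, so
        # real-world values would need escaping; the synthetic names here are safe.
        # "COPY ... FROM STDIN WITH (FORMAT csv)" plus a csv.writer is a common alternative.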
cur.execute(f"DELETE FROM {TABLE}")
conn.commit()
csv_data = io.StringIO()
for name, value in rows:
csv_data.write(f"{name}\t{value}\n")
csv_data.seek(0)
t0 = time.perf_counter()
cur.copy_expert(f"COPY {TABLE} (name, value) FROM STDIN", csv_data)
conn.commit()
copy_expert_ms = round((time.perf_counter() - t0) * 1000, 1)
cur.execute(f"SELECT COUNT(*) FROM {TABLE}")
copy_count = cur.fetchone()[0]
print(f" copy_expert: {copy_count} rows in {copy_expert_ms}ms")
# Cleanup
cur.execute(f"DROP TABLE IF EXISTS {TABLE}")
conn.commit()
finally:
conn.close()
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
assert executemany_count == ROW_COUNT, f"FAIL: executemany inserted {executemany_count}, expected {ROW_COUNT}"
assert copy_count == ROW_COUNT, f"FAIL: copy_expert inserted {copy_count}, expected {ROW_COUNT}"
copy_speedup = round(executemany_ms / copy_expert_ms, 1) if copy_expert_ms > 0 else 0
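# The 1000-row cutoff mirrors the FOOTGUN guidance above; the actual crossover point
# depends mostly on network round-trip latency to the database.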
recommendation = "copy_expert" if ROW_COUNT >= 1000 else "executemany"
result = {
"row_count": ROW_COUNT,
"executemany_ms": executemany_ms,
"copy_expert_ms": copy_expert_ms,
"copy_speedup": copy_speedup,
"recommendation": recommendation,
}
print(json.dumps(result, indent=2))
print("PASS")