postgres-bulk-insert

code_execution · unverified · null · json · download .py

Number of rows to insert in the benchmark

import sys
import os
import subprocess
import time
import urllib.request
import json
import io

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

for attempt in range(2):
    try:
        req = urllib.request.Request(
            "https://checklist.day/api/registry/psycopg2-binary",
            headers={"User-Agent": "checklist-agent/1.0"}
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            registry = json.loads(resp.read())
            break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)

warnings = registry.get("warnings", [])
if warnings:
    print("[psycopg2-binary] WARNINGS:")
    for w in warnings if isinstance(warnings, list) else [warnings]:
        print(f"  ⚠ {w}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"])

import psycopg2

DATABASE_URL = os.environ.get("DATABASE_URL")
ROW_COUNT    = int(os.environ.get("ROW_COUNT", "100"))

if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)

if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)

TABLE = "checklist_bulk_test"
rows = [(f"item_{i}", i) for i in range(ROW_COUNT)]

conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)

try:
    with conn.cursor() as cur:
        cur.execute(f"""
            CREATE TABLE IF NOT EXISTS {TABLE} (
                id    SERIAL PRIMARY KEY,
                name  TEXT NOT NULL,
                value INTEGER NOT NULL
            )
        """)
        conn.commit()

        # Method 1: executemany
        # FOOTGUN: executemany is safe but slow — one round-trip per row above psycopg2 3.x
        # Use for < 1000 rows or when values need per-row validation
        cur.execute(f"DELETE FROM {TABLE}")
        conn.commit()

        t0 = time.perf_counter()
        cur.executemany(f"INSERT INTO {TABLE} (name, value) VALUES (%s, %s)", rows)
        conn.commit()
        executemany_ms = round((time.perf_counter() - t0) * 1000, 1)

        cur.execute(f"SELECT COUNT(*) FROM {TABLE}")
        executemany_count = cur.fetchone()[0]
        print(f"  executemany: {executemany_count} rows in {executemany_ms}ms")

        # Method 2: copy_expert
        # FOOTGUN: copy_expert is 10-100x faster but requires CSV-formatted StringIO
        # Use for > 1000 rows when speed matters
        cur.execute(f"DELETE FROM {TABLE}")
        conn.commit()

        csv_data = io.StringIO()
        for name, value in rows:
            csv_data.write(f"{name}\t{value}\n")
        csv_data.seek(0)

        t0 = time.perf_counter()
        cur.copy_expert(f"COPY {TABLE} (name, value) FROM STDIN", csv_data)
        conn.commit()
        copy_expert_ms = round((time.perf_counter() - t0) * 1000, 1)

        cur.execute(f"SELECT COUNT(*) FROM {TABLE}")
        copy_count = cur.fetchone()[0]
        print(f"  copy_expert: {copy_count} rows in {copy_expert_ms}ms")

        # Cleanup
        cur.execute(f"DROP TABLE IF EXISTS {TABLE}")
        conn.commit()

finally:
    conn.close()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

assert executemany_count == ROW_COUNT, f"FAIL: executemany inserted {executemany_count}, expected {ROW_COUNT}"
assert copy_count == ROW_COUNT, f"FAIL: copy_expert inserted {copy_count}, expected {ROW_COUNT}"

copy_speedup = round(executemany_ms / copy_expert_ms, 1) if copy_expert_ms > 0 else 0
recommendation = "copy_expert" if ROW_COUNT >= 1000 else "executemany"

result = {
    "row_count":       ROW_COUNT,
    "executemany_ms":  executemany_ms,
    "copy_expert_ms":  copy_expert_ms,
    "copy_speedup":    copy_speedup,
    "recommendation":  recommendation,
}
print(json.dumps(result, indent=2))
print("PASS")