postgres-connection-pool

code_execution · unverified · null · json · download .py

POOL_SIZE: number of connections in the pool (read from the environment; defaults to 5)

import sys
import os
import subprocess
import time
import threading
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

# Look up each dependency in the checklist.day registry and print any
# warnings it publishes. The whole lookup is tried at most twice, two
# seconds apart; if the second try also fails the script aborts.
for attempt in range(2):
    is_final_attempt = attempt == 1
    try:
        for slug in ("psycopg2-binary", "sqlalchemy"):
            registry_url = f"https://checklist.day/api/registry/{slug}"
            request = urllib.request.Request(
                registry_url,
                headers={"User-Agent": "checklist-agent/1.0"},
            )
            with urllib.request.urlopen(request, timeout=10) as response:
                payload = json.loads(response.read())
            notices = payload.get("warnings", [])
            if not notices:
                continue
            print(f"[{slug}] WARNINGS:")
            # The registry may return a single warning instead of a list.
            if not isinstance(notices, list):
                notices = [notices]
            for notice in notices:
                print(f"  ⚠ {notice}")
    except Exception as exc:
        # Any failure (network, HTTP, malformed payload) triggers the retry.
        if is_final_attempt:
            print(f"ABORT: registry unreachable — {exc}")
            sys.exit(1)
        time.sleep(2)
    else:
        # Both slugs were checked without error; no retry needed.
        break

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

# Make sure the two database libraries are installed at the minimum versions
# this script expects, using the pip that belongs to the running interpreter.
# check_call raises CalledProcessError if pip exits with a non-zero status.
pip_install_cmd = [
    sys.executable, "-m", "pip", "install", "-q",
    "psycopg2-binary>=2.9.9",
    "sqlalchemy>=2.0.0",
]
subprocess.check_call(pip_install_cmd)

import psycopg2
from psycopg2 import pool as pg_pool
from sqlalchemy import create_engine, text

# Runtime configuration comes from the environment:
#   DATABASE_URL - PostgreSQL connection URL (required)
#   POOL_SIZE    - number of pooled connections / worker threads (default 5)
DATABASE_URL = os.environ.get("DATABASE_URL")

# Validate POOL_SIZE up front so a bad value produces a clear ABORT message
# instead of a raw ValueError traceback or a confusing failure later on.
_raw_pool_size = os.environ.get("POOL_SIZE", "5")
try:
    POOL_SIZE = int(_raw_pool_size)
except ValueError:
    print(f"ABORT: POOL_SIZE env var must be an integer, got {_raw_pool_size!r}")
    sys.exit(1)

if POOL_SIZE < 1:
    # A pool of zero (or fewer) connections would start no workers at all.
    print(f"ABORT: POOL_SIZE must be at least 1, got {POOL_SIZE}")
    sys.exit(1)

if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)

# Rewrite the legacy "postgres://" scheme to "postgresql://" (first occurrence
# only). NOTE(review): presumably needed because SQLAlchemy, used further down,
# does not accept the short scheme - confirm.
if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)

# Shared state written by the worker threads. `lock` serialises updates to
# the counter and the exhaustion flag; `results` collects one value per
# successful worker.
lock = threading.Lock()
results = []
connections_used = 0
pool_exhausted = False

# Method 1: psycopg2 ThreadedConnectionPool
# FOOTGUN: use ThreadedConnectionPool for multithreaded apps, SimpleConnectionPool is not thread-safe
pool = pg_pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=POOL_SIZE,
    dsn=DATABASE_URL,
)

def worker(worker_id):
    """Borrow one pooled connection, run a short query, and return it.

    On success the module-level ``connections_used`` counter is incremented
    (under ``lock``) and the echoed ``worker_id`` is appended to ``results``.
    If the pool cannot hand out a connection, ``pool_exhausted`` is set to
    True (under ``lock``) and the worker returns without running the query.

    Args:
        worker_id: Value sent as the query parameter and echoed back by the
            database; the echoed value is what ends up in ``results``.
    """
    global connections_used, pool_exhausted

    # Only the acquisition is guarded: a PoolError raised later (for example
    # while returning the connection) is a different failure and must not be
    # reported as pool exhaustion.
    try:
        conn = pool.getconn()
    except pg_pool.PoolError:
        with lock:
            pool_exhausted = True
        return

    with lock:
        connections_used += 1

    try:
        with conn.cursor() as cur:
            # pg_sleep keeps the connection busy briefly so workers overlap.
            cur.execute("SELECT pg_sleep(0.05), %s", (worker_id,))
            results.append(cur.fetchone()[1])
    finally:
        # FOOTGUN: always putconn() — not putting back leaks connections
        pool.putconn(conn)

# Create one worker thread per pool slot, start them all, then wait for
# every one to finish before tearing the pool down.
threads = []
for worker_index in range(POOL_SIZE):
    threads.append(threading.Thread(target=worker, args=(worker_index,)))

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# All workers are done, so every pooled connection can be closed.
pool.closeall()
psycopg2_summary = (
    f"  psycopg2 pool: {connections_used} peak connections, exhausted={pool_exhausted}"
)
print(psycopg2_summary)

# Method 2: SQLAlchemy pool config
# FOOTGUN: pool_pre_ping=True prevents stale connection errors after idle timeout
# FOOTGUN: pool_recycle avoids connections being killed by firewall/load balancer timeouts
engine = create_engine(
    DATABASE_URL,
    pool_size=POOL_SIZE,
    max_overflow=2,
    pool_pre_ping=True,
    pool_recycle=300,  # recycle after 5 minutes
)

# Smoke-test the engine with a trivial query. The flag stays False unless the
# query returns exactly 1.
sqlalchemy_pool_ok = False
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1"))
        sqlalchemy_pool_ok = result.fetchone()[0] == 1
finally:
    # Dispose even when connect()/execute() raises, so pooled connections are
    # released before the error propagates.
    engine.dispose()
print(f"  SQLAlchemy pool: pool_size={POOL_SIZE}, pre_ping=True, recycle=300s — ok={sqlalchemy_pool_ok}")

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

# Final verification of both pool tests. Explicit raises are used instead of
# `assert` statements so the checks still run when Python is started with -O,
# which strips asserts; the exception type and messages are unchanged.
if connections_used <= 0:
    raise AssertionError("FAIL: no connections were established")
if not sqlalchemy_pool_ok:
    raise AssertionError("FAIL: SQLAlchemy pool test query failed")
if not results:
    raise AssertionError("FAIL: no worker results returned")

# Machine-readable summary of the run, followed by the PASS marker.
result = {
    "pool_size":          POOL_SIZE,
    "connections_used":   connections_used,
    "workers_completed":  len(results),
    "pool_exhausted":     pool_exhausted,
    "sqlalchemy_pool_ok": sqlalchemy_pool_ok,
}
print(json.dumps(result, indent=2))
print("PASS")