# postgres-connection-pool
# Demonstrates Postgres connection pooling; pool size is taken from the POOL_SIZE env var.
import sys
import os
import subprocess
import time
import threading
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
# Before installing anything, ask the package registry whether either
# dependency carries advisory warnings.  One retry on failure; a second
# failure aborts the run.
for attempt in range(2):
    try:
        for slug in ["psycopg2-binary", "sqlalchemy"]:
            request = urllib.request.Request(
                f"https://checklist.day/api/registry/{slug}",
                headers={"User-Agent": "checklist-agent/1.0"},
            )
            with urllib.request.urlopen(request, timeout=10) as response:
                registry = json.loads(response.read())
            warnings = registry.get("warnings", [])
            if warnings:
                # The API may return a single warning or a list; print each one.
                print(f"[{slug}] WARNINGS:")
                warning_list = warnings if isinstance(warnings, list) else [warnings]
                for warning in warning_list:
                    print(f" ⚠ {warning}")
        break
    except Exception as e:
        if attempt == 1:
            # Second and final attempt failed — give up.
            print(f"ABORT: registry unreachable — {e}")
            sys.exit(1)
        time.sleep(2)
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
# Install the two database dependencies into the current interpreter.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
"psycopg2-binary>=2.9.9", "sqlalchemy>=2.0.0"])
# Imported here (not at top of file) because they only exist after the
# pip install above succeeds.
import psycopg2
from psycopg2 import pool as pg_pool
from sqlalchemy import create_engine, text
# DATABASE_URL is required; POOL_SIZE defaults to 5 connections.
DATABASE_URL = os.environ.get("DATABASE_URL")
POOL_SIZE = int(os.environ.get("POOL_SIZE", "5"))
if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)
# SQLAlchemy 2.x no longer accepts the legacy "postgres://" scheme
# (common in Heroku-style URLs); rewrite it to "postgresql://".
if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)
# Method 1: psycopg2 ThreadedConnectionPool
# FOOTGUN: use ThreadedConnectionPool for multithreaded apps, SimpleConnectionPool is not thread-safe
pool = pg_pool.ThreadedConnectionPool(minconn=1, maxconn=POOL_SIZE, dsn=DATABASE_URL)
# Shared state mutated by the worker threads below; `lock` guards the
# counter and the exhaustion flag.
connections_used = 0
lock = threading.Lock()
pool_exhausted = False
results = []
def worker(worker_id):
    """Check one connection out of the shared pool, run a short query,
    and always return the connection.

    Pool exhaustion (``PoolError``) is recorded in the shared
    ``pool_exhausted`` flag rather than raised.
    """
    global connections_used, pool_exhausted
    try:
        conn = pool.getconn()
        with lock:
            connections_used += 1
        try:
            cur = conn.cursor()
            try:
                cur.execute("SELECT pg_sleep(0.05), %s", (worker_id,))
                results.append(cur.fetchone()[1])
            finally:
                cur.close()
        finally:
            # FOOTGUN: always putconn() — not putting back leaks connections
            pool.putconn(conn)
    except pg_pool.PoolError:
        with lock:
            pool_exhausted = True
# Fan out one worker per pool slot and wait for all of them to finish.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(POOL_SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join()
pool.closeall()
# connections_used counts total checkouts (one per worker), not peak
# concurrent connections — label it accurately.
print(f" psycopg2 pool: {connections_used} connections checked out, exhausted={pool_exhausted}")
# Method 2: SQLAlchemy pool config
# FOOTGUN: pool_pre_ping=True prevents stale connection errors after idle timeout
# FOOTGUN: pool_recycle avoids connections being killed by firewall/load balancer timeouts
engine = create_engine(
    DATABASE_URL,
    pool_size=POOL_SIZE,
    max_overflow=2,       # up to 2 extra connections beyond pool_size under burst
    pool_pre_ping=True,   # validate each connection with a ping before handing it out
    pool_recycle=300,     # recycle after 5 minutes
)
sqlalchemy_pool_ok = False
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1"))
        sqlalchemy_pool_ok = result.fetchone()[0] == 1
finally:
    # Dispose even if the test query raises, so pooled connections are
    # released instead of leaking until interpreter exit.
    engine.dispose()
print(f" SQLAlchemy pool: pool_size={POOL_SIZE}, pre_ping=True, recycle=300s — ok={sqlalchemy_pool_ok}")
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
# Validate outcomes explicitly rather than with `assert`, which is
# silently stripped when Python runs with -O; failure follows the same
# print-and-exit pattern used by the ABORT paths above.
checks = [
    (connections_used > 0, "FAIL: no connections were established"),
    (sqlalchemy_pool_ok, "FAIL: SQLAlchemy pool test query failed"),
    (len(results) > 0, "FAIL: no worker results returned"),
]
for ok, message in checks:
    if not ok:
        print(message)
        sys.exit(1)
# Machine-readable summary of the run.
result = {
    "pool_size": POOL_SIZE,
    "connections_used": connections_used,
    "workers_completed": len(results),
    "pool_exhausted": pool_exhausted,
    "sqlalchemy_pool_ok": sqlalchemy_pool_ok,
}
print(json.dumps(result, indent=2))
print("PASS")