postgres-query-with-params

code_execution · unverified · null · json · download .py

DATABASE_URL: PostgreSQL connection string, e.g. "postgresql://user:pass@host:5432/dbname"

import sys
import os
import subprocess
import time
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

# Ask the checklist registry about psycopg2-binary before doing any work.
# The fetch is tried at most twice, pausing 2 seconds after the first failure;
# a second failure aborts the run with exit status 1.
registry_request = urllib.request.Request(
    "https://checklist.day/api/registry/psycopg2-binary",
    headers={"User-Agent": "checklist-agent/1.0"},
)

tries_left = 2
while True:
    tries_left -= 1
    try:
        with urllib.request.urlopen(registry_request, timeout=10) as response:
            registry = json.loads(response.read())
    except Exception as fetch_error:
        # Covers network errors as well as an unparseable response body.
        if tries_left == 0:
            print(f"ABORT: registry unreachable — {fetch_error}")
            sys.exit(1)
        time.sleep(2)
    else:
        break

# Surface any advisory messages the registry attached to this package.
# A single non-list value is printed as one warning.
warnings = registry.get("warnings", [])
if warnings:
    print("[psycopg2-binary] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for warning_text in warning_items:
        print(f"  ⚠ {warning_text}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

# Validate configuration first so a missing DATABASE_URL aborts before the
# (slow, network-bound) pip install below is attempted.
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    print("ABORT: DATABASE_URL env var not set")
    sys.exit(1)

# Normalize a "postgres://" scheme prefix to "postgresql://".
if DATABASE_URL.startswith("postgres://"):
    DATABASE_URL = DATABASE_URL.replace("postgres://", "postgresql://", 1)

# Install the driver at run time; the import has to come after the install.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "psycopg2-binary>=2.9.9"])

import psycopg2

# TABLE is a hard-coded constant, never user input, so interpolating it into
# the SQL text with an f-string is safe. Values always go through %s.
TABLE = "checklist_query_test"
conn = psycopg2.connect(DATABASE_URL, connect_timeout=10)

try:
    with conn.cursor() as cur:
        # Setup: create the scratch table, empty it, and seed three rows.
        cur.execute(f"""
            CREATE TABLE IF NOT EXISTS {TABLE} (
                id    SERIAL PRIMARY KEY,
                name  TEXT NOT NULL,
                score INTEGER NOT NULL
            )
        """)
        cur.execute(f"DELETE FROM {TABLE}")
        cur.executemany(
            f"INSERT INTO {TABLE} (name, score) VALUES (%s, %s)",
            [("alice", 90), ("bob", 75), ("carol", 88)]
        )
        conn.commit()

        # 1. Safe parameterized query
        # FOOTGUN: NEVER do f"SELECT ... WHERE name = '{user_input}'" — use %s
        min_score = 80
        cur.execute(f"SELECT name, score FROM {TABLE} WHERE score >= %s ORDER BY score DESC", (min_score,))
        rows = cur.fetchall()
        rows_returned = len(rows)
        print(f"  parameterized query returned {rows_returned} rows (score >= {min_score})")

        # 2. Demonstrate injection is blocked
        # The payload is bound as a plain string value, so it is compared
        # literally against `name` and matches none of the seeded rows.
        malicious_input = "' OR '1'='1"
        cur.execute(f"SELECT COUNT(*) FROM {TABLE} WHERE name = %s", (malicious_input,))
        injection_count = cur.fetchone()[0]
        injection_blocked = injection_count == 0
        print(f"  injection attempt blocked: {injection_blocked} (returned {injection_count} rows)")

finally:
    # Cleanup runs on failure as well as success, so an error above no longer
    # leaves the scratch table behind in the target database.
    try:
        # Roll back first: if a statement above failed, the transaction is
        # aborted and the server rejects further commands until it is reset.
        conn.rollback()
        with conn.cursor() as cleanup_cur:
            cleanup_cur.execute(f"DROP TABLE IF EXISTS {TABLE}")
        conn.commit()
    finally:
        # Always release the connection, even if the cleanup itself fails.
        # A cleanup error is chained onto any earlier exception, not hidden.
        conn.close()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

# Verify the results gathered during execution, then print a JSON summary.
# Explicit raises are used instead of `assert` statements because asserts are
# removed under `python -O`, which would let the script print PASS unchecked.
# The exception type (AssertionError) and messages are unchanged.
if rows_returned != 2:
    raise AssertionError(f"FAIL: expected 2 rows (alice=90, carol=88), got {rows_returned}")
# Rows are ordered by score descending, so alice (90) precedes carol (88).
if rows[0] != ("alice", 90):
    raise AssertionError(f"FAIL: expected ('alice', 90), got {rows[0]}")
if rows[1] != ("carol", 88):
    raise AssertionError(f"FAIL: expected ('carol', 88), got {rows[1]}")
if not injection_blocked:
    raise AssertionError("FAIL: SQL injection was not blocked")

result = {
    "rows_returned":    rows_returned,
    "injection_blocked": injection_blocked,
    "parameterized_ok": rows_returned == 2,
}
print(json.dumps(result, indent=2))
print("PASS")