rate-limit-handling-with-retry

code_execution · unverified · null · json · download .py

Handle API rate limits with exponential backoff and jitter using tenacity

import subprocess, sys, time, json

# ── PRE_EXECUTION ─────────────────────────────────────────────
import urllib.request, urllib.error

REGISTRY_URL = "https://checklist.day/api/registry/tenacity"
for attempt in range(2):
    try:
        with urllib.request.urlopen(REGISTRY_URL, timeout=10) as resp:
            registry = json.loads(resp.read())
        assert "imports" in registry, "ABORT: registry missing imports field"
        assert "warnings" in registry, "ABORT: registry missing warnings field"
        # Surface breaking warnings
        for w in registry.get("warnings", []):
            if w.get("severity") == "breaking":
                print(f"[WARNING] {w['message']}")
        break
    except Exception as e:
        if attempt == 1:
            print(f"ABORT: registry unreachable after 2 attempts — {e}")
            sys.exit(1)
        time.sleep(1)

# ── EXECUTION ──────────────────────────────────────────────────
# Auto-install deps
for pkg in ["tenacity", "requests"]:
    try:
        __import__(pkg)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"])

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
    RetryError,
)
import requests
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# Config
TARGET_URL = "https://httpbin.org/status/200"   # swap to /status/429 to test retry path
MAX_ATTEMPTS = 5
BASE_WAIT = 1
attempts_made = 0

class RateLimitError(Exception):
    pass

@retry(
    retry=retry_if_exception_type(RateLimitError),
    stop=stop_after_attempt(MAX_ATTEMPTS),          # FM-1.3, FM-1.5 — hard ceiling
    wait=wait_exponential_jitter(initial=BASE_WAIT, max=30),  # jitter prevents thundering herd
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def call_with_retry(url: str) -> requests.Response:
    global attempts_made
    attempts_made += 1
    resp = requests.get(url, timeout=10)

    # Surface rate limit headers — FM-2.4
    retry_after = resp.headers.get("Retry-After")
    ratelimit_remaining = resp.headers.get("X-RateLimit-Remaining")
    if retry_after:
        print(f"[RATE-LIMIT] Retry-After: {retry_after}s")
    if ratelimit_remaining is not None:
        print(f"[RATE-LIMIT] X-RateLimit-Remaining: {ratelimit_remaining}")

    if resp.status_code == 429:
        raise RateLimitError(f"Rate limited (429). Attempt {attempts_made}/{MAX_ATTEMPTS}")
    if resp.status_code == 503:
        raise RateLimitError(f"Service unavailable (503). Attempt {attempts_made}/{MAX_ATTEMPTS}")

    return resp

t0 = time.time()
try:
    response = call_with_retry(TARGET_URL)
except RetryError as e:
    print(f"FAIL: exhausted {MAX_ATTEMPTS} attempts — {e}")
    sys.exit(1)

elapsed_ms = int((time.time() - t0) * 1000)

# ── POST_EXECUTION ─────────────────────────────────────────────
# Verify final response is 2xx — FM-3.2, FM-3.3
assert response.status_code in range(200, 300), (
    f"FAIL: final response was {response.status_code}, expected 2xx"
)
assert attempts_made >= 1, "FAIL: no attempts recorded"
assert attempts_made <= MAX_ATTEMPTS, (
    f"FAIL: exceeded MAX_ATTEMPTS ({attempts_made} > {MAX_ATTEMPTS})"
)

result = {
    "status": response.status_code,
    "attempts_made": attempts_made,
    "elapsed_ms": elapsed_ms,
    "url": TARGET_URL,
}

print(json.dumps(result, indent=2))
print("PASS")