authenticated-rest-api-call
Make authenticated REST API calls using Bearer token and API key header patterns
import os
import sys
import json
import subprocess
import requests as _requests
# ----------------------------------------
# PRE_EXECUTION
# FM-2.2: fetch ground truth for all registry_refs
# ----------------------------------------
REGISTRY_REFS = ["httpx", "requests"]
MAX_RETRIES = 2
registries = {}
for lib in REGISTRY_REFS:
for attempt in range(MAX_RETRIES):
try:
response = _requests.get(
f"https://checklist.day/api/registry/{lib}",
timeout=10
)
if response.status_code == 200:
registries[lib] = response.json()
break
except _requests.exceptions.RequestException:
pass
for lib in REGISTRY_REFS:
assert lib in registries, \
f"ABORT: registry fetch failed for {lib} after {MAX_RETRIES} attempts"
# FM-2.4: surface breaking warnings — do not withhold
for lib, registry in registries.items():
breaking = [
w for w in registry.get("warnings", [])
if w.get("severity") == "breaking"
]
if breaking:
print(f"PRE_EXECUTION: {lib} has {len(breaking)} breaking warning(s):")
for w in breaking:
print(f" [!] [{w.get('affected_versions', 'all')}] {w['message'][:120]}")
print(f" fix: {w['fix'][:100]}")
print()
print("PRE_EXECUTION: all registry refs verified ✓")
for lib, registry in registries.items():
install = registry.get("install", [{}])[0].get("cmd", "unknown")
print(f" {lib:20s} : {install}")
# ----------------------------------------
# KNOWN FAILURE MODES
#
# 1. Token in URL — NEVER do: requests.get(url + "?token=" + token)
# Tokens in URLs end up in server logs, browser history, referrer headers
#
# 2. Missing "Bearer " prefix — Authorization header must be "Bearer <token>"
# not just the raw token string
#
# 3. Using httpx.get() for sync calls — correct for sync, but
# httpx.AsyncClient is required inside async contexts (e.g. FastAPI, asyncio)
# Using sync httpx inside async will block the event loop
#
# 4. No timeout — agents hang indefinitely on unresponsive APIs
# Always set timeout explicitly
#
# 5. Credentials in logs — never log headers dict directly, redact first
# ----------------------------------------
DEFAULT_TIMEOUT = 10 # seconds — always set explicitly
def redact_headers(headers: dict) -> dict:
"""FM-2.4: never log raw auth headers — redact before surfacing."""
safe = {}
for k, v in headers.items():
if k.lower() in ("authorization", "x-api-key", "api-key", "x-auth-token"):
safe[k] = "[REDACTED]"
else:
safe[k] = v
return safe
def make_bearer_request(url: str, token: str, method: str = "GET", **kwargs):
"""
Pattern 1: Bearer token auth via requests.
FM-2.6: Authorization header format is 'Bearer <token>' — not just the token.
FM-1.5: explicit timeout — never hang indefinitely.
"""
import requests
headers = {
"Authorization": f"Bearer {token}", # FM-2.6: prefix required
"Accept": "application/json",
"Content-Type": "application/json",
}
response = requests.request(
method=method,
url=url,
headers=headers,
timeout=DEFAULT_TIMEOUT, # FM-1.5: always set
**kwargs
)
return response
def make_apikey_request(url: str, api_key: str, method: str = "GET", **kwargs):
"""
Pattern 2: API key header auth via httpx (sync).
Common header names: X-API-Key, Api-Key, x-api-key — check provider docs.
FM-2.6: use httpx for sync; use httpx.AsyncClient for async contexts.
FM-1.5: explicit timeout.
"""
import httpx
headers = {
"X-API-Key": api_key, # adjust header name per provider
"Accept": "application/json",
}
response = httpx.request(
method=method,
url=url,
headers=headers,
timeout=DEFAULT_TIMEOUT, # FM-1.5: always set
**kwargs
)
return response
def handle_auth_error(response) -> None:
"""
FM-3.2: check auth before parsing response body.
401 = bad/expired token. 403 = valid token, insufficient permissions.
Both are distinct — handle separately.
"""
if response.status_code == 401:
raise PermissionError(
"ABORT: 401 Unauthorized — token is missing, expired, or malformed. "
"Check token value and 'Bearer ' prefix."
)
if response.status_code == 403:
raise PermissionError(
"ABORT: 403 Forbidden — token is valid but lacks required scope/permissions. "
"Check API key scopes or OAuth grant."
)
# ----------------------------------------
# EXECUTION
# Demonstrate both patterns against httpbin.org — no real API key needed
# httpbin echoes back the request headers so we can verify auth was sent correctly
# FM-1.1: idempotent GET requests — safe to run multiple times
# ----------------------------------------
try:
import requests
except ImportError:
pkg = registries["requests"]["install"][0]["cmd"].replace("pip install ", "").strip()
print(f"\nEXECUTION: requests not found — installing {pkg}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
import requests
try:
import httpx
except ImportError:
pkg = registries["httpx"]["install"][0]["cmd"].replace("pip install ", "").strip()
print(f"\nEXECUTION: httpx not found — installing {pkg}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
import httpx
TEST_URL = "https://httpbin.org/headers"
MOCK_BEARER_TOKEN = "test-bearer-token-abc123"
MOCK_API_KEY = "test-api-key-xyz789"
print()
print("EXECUTION: Pattern 1 — Bearer token via requests...")
response_bearer = make_bearer_request(TEST_URL, MOCK_BEARER_TOKEN)
handle_auth_error(response_bearer)
bearer_data = response_bearer.json()
print(f" status : {response_bearer.status_code}")
print(f" headers sent: {json.dumps(redact_headers(bearer_data['headers']), indent=4)}")
print()
print("EXECUTION: Pattern 2 — API key header via httpx...")
response_apikey = make_apikey_request(TEST_URL, MOCK_API_KEY)
handle_auth_error(response_apikey)
apikey_data = response_apikey.json()
print(f" status : {response_apikey.status_code}")
print(f" headers sent: {json.dumps(redact_headers(apikey_data['headers']), indent=4)}")
# ----------------------------------------
# POST_EXECUTION
# FM-3.2: verify auth headers were actually sent — not just that request succeeded
# FM-3.3: exact match on header values
# ----------------------------------------
# Verify Bearer token was sent with correct format
sent_auth = bearer_data["headers"].get("Authorization", "")
assert sent_auth == f"Bearer {MOCK_BEARER_TOKEN}", \
f"FAIL: Authorization header mismatch — got '{sent_auth}'"
assert sent_auth.startswith("Bearer "), \
"FAIL: Authorization header missing 'Bearer ' prefix"
# Verify API key header was sent
sent_apikey = apikey_data["headers"].get("X-Api-Key", "")
assert sent_apikey == MOCK_API_KEY, \
f"FAIL: X-API-Key header mismatch — got '{sent_apikey}'"
# Verify token is NOT in URL (common footgun)
assert MOCK_BEARER_TOKEN not in response_bearer.request.url, \
"FAIL: token found in URL — credentials must be in headers only"
assert MOCK_API_KEY not in str(response_apikey.request.url), \
"FAIL: api key found in URL — credentials must be in headers only"
# Verify redaction works — credentials must not appear in logs
redacted = redact_headers({"Authorization": f"Bearer {MOCK_BEARER_TOKEN}", "X-API-Key": MOCK_API_KEY})
assert redacted["Authorization"] == "[REDACTED]", \
"FAIL: Authorization header not redacted"
assert redacted["X-API-Key"] == "[REDACTED]", \
"FAIL: X-API-Key header not redacted"
print()
print("POST_EXECUTION: Bearer token format verified ✓ ('Bearer ' prefix present)")
print("POST_EXECUTION: API key header verified ✓")
print("POST_EXECUTION: credentials not in URL ✓")
print("POST_EXECUTION: header redaction verified ✓")
result = {
"status": "pass",
"bearer_auth_verified": True,
"apikey_verified": True,
"credentials_in_url": False,
"redaction_verified": True,
}
print(result)
print("PASS")