redis-rate-limit-sliding-window
Redis connection URL (read from the REDIS_URL environment variable), e.g. "redis://localhost:6379"
import sys
import os
import subprocess
import time
import urllib.request
import json
# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────
# Fetch the registry entry for redis. One retry is allowed after a 2-second
# pause; if the second attempt also fails the script aborts with exit code 1.
req = urllib.request.Request(
    "https://checklist.day/api/registry/redis",
    headers={"User-Agent": "checklist-agent/1.0"},
)
for attempt in (0, 1):
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            registry = json.loads(resp.read())
    except Exception as e:
        if attempt != 1:
            # First failure: wait briefly, then try once more.
            time.sleep(2)
            continue
        print(f"ABORT: registry unreachable — {e}")
        sys.exit(1)
    else:
        break

# Surface any warnings published in the registry; a single non-list value is
# wrapped so it prints the same way as a list of warnings.
warnings = registry.get("warnings", [])
if warnings:
    print("[redis] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for w in warning_items:
        print(f" ⚠ {w}")
# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────
# Install the redis client with the interpreter that is running this script,
# so the import below resolves in the same environment.
pip_command = [sys.executable, "-m", "pip", "install", "-q", "redis>=5.0.0"]
subprocess.check_call(pip_command)
import redis as redis_lib

# The connection URL must come from the environment; without it there is
# nothing to test against, so abort early.
REDIS_URL = os.environ.get("REDIS_URL")
if not REDIS_URL:
    print("ABORT: REDIS_URL env var not set")
    sys.exit(1)

# Rate-limit parameters used by the run below.
WINDOW_SECONDS = 10
MAX_REQUESTS = 5
KEY_PREFIX = "checklist:ratelimit"

# decode_responses=True makes the client return str instead of bytes.
client = redis_lib.Redis.from_url(
    REDIS_URL,
    decode_responses=True,
    socket_connect_timeout=10,
)
# Sliding window rate limiter using a Redis sorted set: every allowed request
# is stored as a member whose score is the timestamp passed in ARGV[1].
# FOOTGUN: use Lua script for atomicity — ZREMRANGEBYSCORE + ZADD + EXPIRE is a race condition without Lua
#
# Script contract:
#   KEYS[1]  sorted-set key that holds the recorded requests
#   ARGV[1]  current time, in the same unit as window * 1000 (milliseconds)
#   ARGV[2]  window length in seconds (multiplied by 1000 inside the script)
#   ARGV[3]  maximum number of requests allowed inside one window
#   ARGV[4]  member name under which this request is recorded
# Returns 1 when the request was recorded (allowed), 0 when the window
# already holds max_requests entries (blocked; nothing is written).
# On an allowed request the key's TTL is reset to one full window.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local max_requests = tonumber(ARGV[3])
local request_id = ARGV[4]
-- Remove requests outside the sliding window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
-- Count current requests in window
local count = redis.call('ZCARD', key)
if count < max_requests then
-- Allow: add this request
redis.call('ZADD', key, now, request_id)
redis.call('PEXPIRE', key, window * 1000)
return 1
else
return 0
end
"""
# register_script wraps the Lua source in a callable; it is invoked below as
# rate_limiter(keys=[...], args=[...]).
rate_limiter = client.register_script(RATE_LIMIT_SCRIPT)
try:
    # All requests in this run target the same sorted-set key.
    bucket_key = f"{KEY_PREFIX}:user_123"

    # Start from a clean slate in case an earlier run left entries behind.
    client.delete(bucket_key)
    allowed_count = 0
    blocked_count = 0

    # Fire three requests more than the limit: the first MAX_REQUESTS are
    # expected to pass and the remaining ones to be rejected.
    total_requests = MAX_REQUESTS + 3
    for i in range(total_requests):
        now_ms = int(time.time() * 1000)
        script_args = [now_ms, WINDOW_SECONDS, MAX_REQUESTS, f"req_{i}_{now_ms}"]
        result = rate_limiter(keys=[bucket_key], args=script_args)
        if result != 1:
            blocked_count += 1
            print(f" request {i+1}: BLOCKED (rate limit exceeded)")
            continue
        allowed_count += 1
        print(f" request {i+1}: ALLOWED ({allowed_count}/{MAX_REQUESTS})")

    # Remove the test key again once the run has completed.
    client.delete(bucket_key)
finally:
    # Always release the connection, even when a Redis call raised.
    client.close()
# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────
# Verify the outcome with explicit checks instead of `assert` statements:
# asserts are removed when Python runs with -O, which would let a failing run
# fall through and still print PASS. AssertionError is raised directly so the
# failure type and messages are the same as before.
if allowed_count != MAX_REQUESTS:
    raise AssertionError(f"FAIL: expected {MAX_REQUESTS} allowed, got {allowed_count}")
# The execution phase sends MAX_REQUESTS + 3 requests, so exactly three of
# them must have been rejected.
if blocked_count != 3:
    raise AssertionError(f"FAIL: expected 3 blocked, got {blocked_count}")

# Machine-readable summary of the run, followed by the pass marker.
result = {
    "allowed_count": allowed_count,
    "blocked_count": blocked_count,
    "window_seconds": WINDOW_SECONDS,
    "max_requests": MAX_REQUESTS,
}
print(json.dumps(result, indent=2))
print("PASS")