redis-rate-limit-sliding-window

code_execution · unverified · null · json · download .py

Required environment variable REDIS_URL: the Redis connection URL, e.g. "redis://localhost:6379"

import sys
import os
import subprocess
import time
import urllib.request
import json

# ─────────────────────────────────────────
# PRE_EXECUTION
# ─────────────────────────────────────────

# Pre-flight: download the registry record for redis. The fetch is tried at
# most twice with a two-second pause in between; if the second try also fails
# the script prints an ABORT line and exits with status 1.
registry_request = urllib.request.Request(
    "https://checklist.day/api/registry/redis",
    headers={"User-Agent": "checklist-agent/1.0"},
)

for retries_left in (1, 0):
    try:
        with urllib.request.urlopen(registry_request, timeout=10) as response:
            registry = json.loads(response.read())
    except Exception as err:
        if not retries_left:
            print(f"ABORT: registry unreachable — {err}")
            sys.exit(1)
        time.sleep(2)
    else:
        # Payload fetched and parsed; no further tries needed.
        break

# Surface any warnings the registry attached to this entry. A lone
# (non-list) warning value is printed as a single item.
warnings = registry.get("warnings", [])
if warnings:
    print("[redis] WARNINGS:")
    warning_items = warnings if isinstance(warnings, list) else [warnings]
    for item in warning_items:
        print(f"  ⚠ {item}")

# ─────────────────────────────────────────
# EXECUTION
# ─────────────────────────────────────────

# Install the redis client (5.0.0 or newer) with the interpreter that is
# running this script, so the import below resolves in the same environment.
# check_call raises CalledProcessError when pip exits non-zero.
pip_command = [sys.executable, "-m", "pip", "install", "-q", "redis>=5.0.0"]
subprocess.check_call(pip_command)

import redis as redis_lib

# The connection URL comes from the environment; without it the script
# cannot proceed.
REDIS_URL = os.environ.get("REDIS_URL")
if not REDIS_URL:
    print("ABORT: REDIS_URL env var not set")
    sys.exit(1)

# Limiter settings: at most MAX_REQUESTS per WINDOW_SECONDS for each key.
WINDOW_SECONDS = 10
MAX_REQUESTS = 5
KEY_PREFIX = "checklist:ratelimit"

# decode_responses=True asks the client to return str instead of bytes.
client = redis_lib.Redis.from_url(
    REDIS_URL,
    decode_responses=True,
    socket_connect_timeout=10,
)

# Sliding-window rate limiter backed by one Redis sorted set per limited key.
# Every allowed request is stored as a member whose score is its timestamp, so
# "requests in the current window" is simply the set's cardinality once the
# expired members have been trimmed.
#
# FOOTGUN: the trim / count / add / expire steps must run inside one Lua
# script. Sent as separate commands (ZREMRANGEBYSCORE, ZCARD, ZADD, PEXPIRE)
# they can interleave with other clients, letting two callers both observe
# "count < max" and both get admitted.
#
# Script inputs:
#   KEYS[1]  sorted-set key holding the requests for one subject
#   ARGV[1]  current time; the caller in this file passes epoch milliseconds
#   ARGV[2]  window length in seconds (the script multiplies it by 1000)
#   ARGV[3]  maximum number of requests admitted per window
#   ARGV[4]  member name for this request
#            NOTE(review): ZADD on an existing member only updates its score,
#            so this should be unique per request — confirm callers ensure it.
# Returns 1 when the request was recorded (allowed) and 0 when the window
# already holds ARGV[3] requests (blocked; nothing is written in that case).
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local max_requests = tonumber(ARGV[3])
local request_id = ARGV[4]

-- Remove requests outside the sliding window
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

-- Count current requests in window
local count = redis.call('ZCARD', key)

if count < max_requests then
    -- Allow: add this request
    redis.call('ZADD', key, now, request_id)
    redis.call('PEXPIRE', key, window * 1000)
    return 1
else
    return 0
end
"""

# Callable handle for the script; it is invoked further down with
# keys=[...] and args=[...] matching the KEYS/ARGV layout described above.
rate_limiter = client.register_script(RATE_LIMIT_SCRIPT)

# Drive the limiter with a burst of requests for a single fixed test subject
# and tally how many were allowed versus blocked.

# One key is used throughout; build it once instead of repeating the f-string.
demo_key = f"{KEY_PREFIX}:user_123"

# Send MAX_REQUESTS + 3 requests — first MAX_REQUESTS should be allowed, rest blocked
total_requests = MAX_REQUESTS + 3

allowed_count = 0
blocked_count = 0

try:
    # Start from a clean slate in case an earlier run left the key behind.
    client.delete(demo_key)

    try:
        for i in range(total_requests):
            # The script expects the current time in milliseconds.
            now_ms = int(time.time() * 1000)
            result = rate_limiter(
                keys=[demo_key],
                # The loop index keeps the member name unique even when
                # several requests land in the same millisecond.
                args=[now_ms, WINDOW_SECONDS, MAX_REQUESTS, f"req_{i}_{now_ms}"],
            )
            if result == 1:
                allowed_count += 1
                print(f"  request {i+1}: ALLOWED ({allowed_count}/{MAX_REQUESTS})")
            else:
                blocked_count += 1
                print(f"  request {i+1}: BLOCKED (rate limit exceeded)")
    finally:
        # Remove the test key even when a request raised part-way through,
        # so a failed run does not leave state behind for the next one.
        client.delete(demo_key)

finally:
    # Always release the connection, whether or not the run succeeded.
    client.close()

# ─────────────────────────────────────────
# POST_EXECUTION
# ─────────────────────────────────────────

# Verify the tallies from the run above, then print a JSON summary and PASS.
#
# The checks raise AssertionError explicitly rather than using `assert`:
# assert statements are removed when Python runs with -O, which would let the
# script print "PASS" without having verified anything.

# Everything beyond the allowed quota is expected to have been blocked.
expected_blocked = total_requests - MAX_REQUESTS

if allowed_count != MAX_REQUESTS:
    raise AssertionError(f"FAIL: expected {MAX_REQUESTS} allowed, got {allowed_count}")
if blocked_count != expected_blocked:
    raise AssertionError(f"FAIL: expected {expected_blocked} blocked, got {blocked_count}")

result = {
    "allowed_count":  allowed_count,
    "blocked_count":  blocked_count,
    "window_seconds": WINDOW_SECONDS,
    "max_requests":   MAX_REQUESTS,
}
print(json.dumps(result, indent=2))
print("PASS")