Throttled-py

3.2.0 · active · verified Sun Apr 12

Throttled-py is a high-performance rate limiting library for Python, currently at version 3.2.0. It implements several algorithms: Fixed Window, Sliding Window, Token Bucket, Leaky Bucket, and GCRA (Generic Cell Rate Algorithm). It supports in-memory and Redis storage backends and offers both synchronous and asynchronous APIs. The project is actively maintained, with releases typically every 1-3 months.
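
To give a feel for one of the algorithms listed above, here is a minimal, standalone token bucket sketch in plain Python. It is an illustration only and not throttled-py's implementation; the `TokenBucket` class, its `allow` method, and the `simulate` helper are hypothetical names introduced for this example. The clock is injectable so the behaviour can be replayed deterministically.

```python
import time


class TokenBucket:
    """Illustrative token bucket; NOT throttled-py's implementation."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.clock = clock        # injectable for deterministic replay
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self, cost=1):
        """Consume `cost` tokens and return True if enough are available."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


def simulate(call_times, rate=1, capacity=1):
    """Replay calls at the given timestamps against a fresh bucket."""
    current = [0.0]
    bucket = TokenBucket(rate, capacity, clock=lambda: current[0])
    results = []
    for t in call_times:
        current[0] = t
        results.append(bucket.allow())
    return results


# One token per second with a burst of one, called every 0.5 seconds.
print(simulate([0.0, 0.5, 1.0, 1.5, 2.0]))
```

In this sketch, calls spaced 0.5 seconds apart against a bucket refilling at one token per second alternate between allowed and rejected, because each rejected call finds only half a token available.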

Warnings

Install

Imports

Quickstart

This quickstart applies `throttled-py` as a decorator in both synchronous and asynchronous code. The synchronous example uses the `TOKEN_BUCKET` algorithm and the asynchronous one uses `SLIDING_WINDOW`. Both examples pass a Redis URL, read from the `REDIS_URL` environment variable, to enable distributed rate limiting. Ensure `throttled-py[redis]` is installed and a Redis instance is reachable before running them.

import os
import time
from throttled import Throttled, RateLimiterType

# Read the Redis connection URL from the environment, defaulting to a local instance.
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')

# Example with RedisStore.
# Note: requires 'throttled-py[redis]' to be installed.
# If Redis is unavailable, this example may fall back to the in-memory store or fail to connect;
# for local testing without Redis, explicitly set store=MemoryStore().

# Create a throttler instance with Token Bucket algorithm, 1 request per second, burst 1.
# Key is dynamically generated from the function name.
@Throttled(using=RateLimiterType.TOKEN_BUCKET.value, quota="1/s burst 1", store_url=REDIS_URL)
def process_request(request_id):
    print(f"Processing request {request_id} at {time.time()}")
    return f"Processed {request_id}"

print("Starting throttled requests...")
for i in range(5):
    try:
        result = process_request(i)
        print(f"Success: {result}")
    except Exception as e:
        print(f"Failed to process request {i}: {e}")
    time.sleep(0.5)  # Simulate some interval between calls

print("\nTrying with async Throttled (requires `throttled-py[redis]` and a running Redis for distributed behavior):")

import asyncio
from throttled.asyncio import Throttled as AsyncThrottled
from throttled.asyncio import RateLimiterType as AsyncRateLimiterType

@AsyncThrottled(using=AsyncRateLimiterType.SLIDING_WINDOW.value, quota="2/m", store_url=REDIS_URL)
async def async_process_request(request_id):
    print(f"Async processing request {request_id} at {time.time()}")
    await asyncio.sleep(0.1)  # Simulate async work
    return f"Async processed {request_id}"

async def main():
    for i in range(5):
        try:
            result = await async_process_request(i)
            print(f"Async Success: {result}")
        except Exception as e:
            print(f"Async Failed to process request {i}: {e}")
        await asyncio.sleep(1)  # Simulate some interval

if __name__ == '__main__':
    # Note: if redis-py is not installed or Redis is not running,
    # store_url might implicitly fall back to in-memory behavior
    # or raise a connection error. Ensure Redis is accessible for distributed rate limiting.
    try:
        asyncio.run(main())
    except ImportError:
        print("\nSkipping async example: throttled-py[redis] might not be installed or redis-py is missing.")
    except Exception as e:
        print(f"\nError during async execution: {e}")
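
For intuition about what a `2/m` sliding-window quota means for the async loop above, the following standalone sketch replays five calls spaced roughly 1.1 seconds apart against a simple sliding-window log. It is an assumption-laden illustration, not throttled-py's implementation (the library may use a different sliding-window variant); `SlidingWindowLog` and its `allow` method are hypothetical names, and timestamps are passed in explicitly instead of being read from a clock.

```python
from collections import deque


class SlidingWindowLog:
    """Illustrative sliding-window log; NOT throttled-py's implementation."""

    def __init__(self, limit, period):
        self.limit = limit    # maximum calls allowed per window
        self.period = period  # window length in seconds
        self.hits = deque()   # timestamps of accepted calls

    def allow(self, now):
        """Return True and record `now` if the window still has room."""
        # Drop timestamps that have left the window (now - period, now].
        while self.hits and self.hits[0] <= now - self.period:
            self.hits.popleft()
        if len(self.hits) < self.limit:
            self.hits.append(now)
            return True
        return False


# Two calls per 60 seconds, with calls arriving about 1.1 seconds apart.
limiter = SlidingWindowLog(limit=2, period=60.0)
call_times = [0.0, 1.1, 2.2, 3.3, 4.4]
outcomes = [limiter.allow(t) for t in call_times]
print(outcomes)
```

Under these assumptions only the first two calls are accepted; later calls are rejected until the earliest accepted timestamp ages out of the 60-second window.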
