Throttler
Throttler is a zero-dependency Python package for rate limiting and concurrency control, built around asyncio. It provides context managers and decorators that limit how often operations run and how many run at the same time. The library is actively maintained: 1.2.3 is the latest release, and updates add support for newer Python versions and improve performance.
Warnings
- breaking Version 1.2.3 dropped official support for Python 3.6 and 3.7. Users on these Python versions should remain on `throttler<1.2.3` to avoid compatibility issues.
- gotcha Both `Throttler` and `ThrottlerSimultaneous` are asynchronous: they are entered with `async with` (or applied through their decorator forms) inside an `async` function, and they need a running `asyncio` event loop. Attempting to use them synchronously will result in runtime errors.
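- gotcha The `Throttler` (for rate limiting) implements a sliding window mechanism: it tracks the exact timestamps of previous executions within the `period`. A fixed window resets its counter at each window boundary, so a burst that straddles a boundary can pass in full; a sliding window will hold part of that burst back. If you expect fixed-window behavior, the throttling pattern will differ from what you anticipate.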
- gotcha Prior to version 1.2.1, argument validation for `Throttler` and related classes might have been less robust, potentially leading to unexpected behavior with invalid input values or types. Version 1.2.1 introduced explicit checks for these.
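The sliding-window gotcha above is easier to see with explicit timestamps. The following is a minimal standard-library sketch, not the library's implementation: `SlidingWindowLimiter` and `FixedWindowLimiter` are hypothetical classes written for this illustration, and they report allow/deny for a given timestamp instead of waiting (as an async throttler would), so the result is deterministic.

```python
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical: at most `rate_limit` calls in any span of `period` seconds."""

    def __init__(self, rate_limit: int, period: float) -> None:
        self.rate_limit = rate_limit
        self.period = period
        self._stamps = deque()  # timestamps of accepted calls, oldest first

    def allow(self, now: float) -> bool:
        # Forget accepted calls that are at least one full period old.
        while self._stamps and now - self._stamps[0] >= self.period:
            self._stamps.popleft()
        if len(self._stamps) < self.rate_limit:
            self._stamps.append(now)
            return True
        return False


class FixedWindowLimiter:
    """Hypothetical: at most `rate_limit` calls per aligned window of `period` seconds."""

    def __init__(self, rate_limit: int, period: float) -> None:
        self.rate_limit = rate_limit
        self.period = period
        self._window = None  # index of the window currently being counted
        self._count = 0

    def allow(self, now: float) -> bool:
        window = int(now // self.period)
        if window != self._window:
            # A new window started: the counter resets regardless of recent calls.
            self._window = window
            self._count = 0
        if self._count < self.rate_limit:
            self._count += 1
            return True
        return False


# Six calls clustered around the one-second boundary, limit of 3 per second.
calls = [0.8, 0.85, 0.9, 1.0, 1.05, 1.1]
sliding = SlidingWindowLimiter(rate_limit=3, period=1.0)
fixed = FixedWindowLimiter(rate_limit=3, period=1.0)

sliding_result = [sliding.allow(t) for t in calls]
fixed_result = [fixed.allow(t) for t in calls]

print(sliding_result)  # [True, True, True, False, False, False]
print(fixed_result)    # [True, True, True, True, True, True]
```

The fixed window admits all six calls within 0.3 seconds because the counter resets at t = 1.0, while the sliding window still sees three calls in the preceding second and holds the rest back.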
Install
-
pip install throttler
Imports
- Throttler
from throttler import Throttler
- throttle
from throttler import throttle
- ThrottlerSimultaneous
from throttler import ThrottlerSimultaneous
- throttle_simultaneous
from throttler import throttle_simultaneous
- ExecutionTimer
from throttler import ExecutionTimer
- execution_timer
from throttler import execution_timer
Quickstart
import asyncio
import time

from throttler import Throttler


async def limited_task(task_id, throttler):
    async with throttler:
        print(f"{time.time():.2f}: Task {task_id} executing...")


async def main():
    # Limit to 3 calls per second
    rate_limiter = Throttler(rate_limit=3, period=1.0)
    tasks = [limited_task(i, rate_limiter) for i in range(10)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
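
The Quickstart covers rate limiting. The other half of the package, concurrency control, caps how many operations are in flight at once, which is presumably the role of `ThrottlerSimultaneous`. The sketch below illustrates that idea using only the standard library's `asyncio.Semaphore` as a stand-in; it does not call the throttler package and makes no claim about how the package implements the limit. The names `worker` and `peak_concurrency` are hypothetical.

```python
import asyncio


async def main() -> int:
    limit = asyncio.Semaphore(2)  # at most two workers inside the block at once
    running = 0  # workers currently inside the limited block
    peak = 0     # highest value `running` ever reached

    async def worker() -> None:
        nonlocal running, peak
        async with limit:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real work
            running -= 1

    # Start six workers together; the semaphore admits them two at a time.
    await asyncio.gather(*(worker() for _ in range(6)))
    return peak


peak_concurrency = asyncio.run(main())
print(peak_concurrency)  # 2
```

Unlike a rate limiter, this kind of limit is independent of time: a new worker starts as soon as a slot frees up, however quickly that happens.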