Threaded
Threaded is a Python library providing decorators for easily wrapping functions to run in `concurrent.futures.ThreadPoolExecutor`, `threading.Thread`, or `asyncio.Task`. It aims to reduce boilerplate code associated with managing concurrent operations. The library is actively maintained, with its latest version being 4.2.0, and has a consistent release cadence.
Warnings
- gotcha Python's Global Interpreter Lock (GIL) means that standard CPython threads cannot execute CPU-bound tasks in parallel across multiple cores. This library, by using `threading` and `concurrent.futures.ThreadPoolExecutor`, primarily benefits I/O-bound tasks where threads can yield the GIL while waiting. For CPU-bound parallelism, consider `multiprocessing`.
- gotcha When using `ThreadPooled` (which leverages `ThreadPoolExecutor`), it's important to explicitly call `ThreadPooled.shutdown()` when your application is exiting or when the pool is no longer needed. Failure to do so can lead to resource leaks, prevent the program from exiting cleanly, or cause issues during application shutdown.
- gotcha Like any concurrency mechanism, using `threaded` with shared mutable state (e.g., global variables, class attributes, shared objects) can lead to race conditions if access is not properly synchronized. This can result in unpredictable behavior or corrupted data.
Install
-
pip install threaded
Imports
- ThreadPooled
from threaded import ThreadPooled
- threadpooled
from threaded import threadpooled
- Threaded
from threaded import Threaded
- threaded
from threaded import threaded
- AsyncIOTask
from threaded import AsyncIOTask
- asynciotask
from threaded import asynciotask
Quickstart
import time
import concurrent.futures
from threaded import ThreadPooled
# Configure the thread pool (optional, defaults to CPU_COUNT * 5 workers)
ThreadPooled.configure(max_workers=3)
@ThreadPooled
def process_item(item_id):
print(f"Processing item {item_id} in a thread...")
time.sleep(1) # Simulate I/O-bound work
return f"Item {item_id} processed."
if __name__ == "__main__":
print("Submitting tasks to the thread pool...")
futures = [process_item(i) for i in range(5)]
# Wait for all tasks to complete and retrieve results
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Result: {result}")
except Exception as exc:
print(f'Task generated an exception: {exc}')
print("All tasks submitted and results collected.")
# It's crucial to explicitly shut down the thread pool for graceful exit
ThreadPooled.shutdown()
print("Thread pool shut down.")