Bounded Pool Executor
Bounded Pool Executor is a Python library that provides `BoundedThreadPoolExecutor` and `BoundedProcessPoolExecutor`, which extend the `concurrent.futures` executors with a fixed-size task queue. The standard executors queue submitted tasks without limit, so submitting a very large number of tasks can exhaust memory. The bounded variants instead block `submit` calls while the queue is full. The current version is 0.0.3.
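The blocking behaviour described above can be illustrated with the standard library alone. The following is a minimal sketch, not the library's actual implementation: `SemaphoreBoundedExecutor` and its `max_in_flight` parameter are hypothetical names chosen for this example, and the technique shown (a semaphore acquired in `submit` and released when the future completes) is only one plausible way to bound a queue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SemaphoreBoundedExecutor:
    """Hypothetical illustration: a stdlib thread pool guarded by a slot counter."""

    def __init__(self, max_workers, max_in_flight):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        # One slot per task that is either running or waiting to run.
        self._slots = threading.BoundedSemaphore(max_in_flight)

    def submit(self, fn, *args, **kwargs):
        # Blocks the caller while every slot is taken.
        self._slots.acquire()
        try:
            future = self._pool.submit(fn, *args, **kwargs)
        except BaseException:
            # Submission failed, so give the slot back before re-raising.
            self._slots.release()
            raise
        # Free the slot when the task finishes, fails, or is cancelled.
        future.add_done_callback(lambda _future: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._pool.shutdown(wait=wait)


executor = SemaphoreBoundedExecutor(max_workers=2, max_in_flight=4)
futures = [executor.submit(pow, n, 2) for n in range(6)]
squares = [f.result() for f in futures]
executor.shutdown()
print(squares)  # [0, 1, 4, 9, 16, 25]
```

With this design the producer loop slows to the pace of the workers, so at most `max_in_flight` pending tasks (and their arguments) are held in memory at any time.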
Warnings
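- gotcha Exceptions raised within tasks submitted to the executors do not propagate to the caller on their own. As with the standard `concurrent.futures` executors that these classes extend, an exception is stored on the returned future and surfaces only when `result()` or `exception()` is called, so a task whose future is never inspected fails silently and is difficult to debug.
- gotcha The `max_queue_size` parameter in `BoundedProcessPoolExecutor` (and `BoundedThreadPoolExecutor`) counts currently executing tasks *plus* tasks waiting in the queue. If `max_queue_size` is smaller than `max_workers`, some workers sit idle even though more tasks are ready to be submitted. For example, with `max_workers=4` and `max_queue_size=2`, at most two tasks are in flight at once, so two workers are never used.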
- gotcha When using `BoundedProcessPoolExecutor`, functions and their arguments must be picklable. Submitting unpicklable objects (e.g., lambda functions defined in the main script, or objects containing unpicklable attributes like an executor instance itself) will cause runtime errors. This is a common limitation of Python's `multiprocessing` module.
- gotcha Deadlock can occur if tasks submitted to the bounded pool attempt to submit new tasks to the *same* pool or wait for results from other tasks in the same pool, especially if `max_workers` and `max_queue_size` are small. This is a general concurrency hazard with bounded pools.
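The first warning above (task exceptions going unnoticed) can be handled by inspecting every future. The sketch below uses the standard `ThreadPoolExecutor` as a stand-in so that it runs without the library installed. It assumes the bounded executors return ordinary `concurrent.futures.Future` objects, which is likely given that they extend `concurrent.futures` but is not confirmed here. `flaky` is a hypothetical task written for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def flaky(n):
    """Hypothetical task that fails for one particular input."""
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 10


outcomes = []
# Stand-in for a bounded executor; the Future API used below is the same.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(flaky, n) for n in range(4)]
    for future in futures:
        # exception() waits for the task and returns None if it succeeded.
        error = future.exception()
        if error is not None:
            outcomes.append(f"error: {error}")
        else:
            outcomes.append(future.result())

print(outcomes)  # [0, 10, 'error: task 2 failed', 30]
```

Calling `future.result()` directly would re-raise the stored exception instead, which is often the simpler choice when any failure should abort the run.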
Install
pip install bounded-pool-executor
Imports
- BoundedThreadPoolExecutor
from bounded_pool_executor import BoundedThreadPoolExecutor
- BoundedProcessPoolExecutor
from bounded_pool_executor import BoundedProcessPoolExecutor
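Because `BoundedProcessPoolExecutor` sends functions and arguments to other processes, it can help to check picklability before submitting. The helper below, `is_picklable`, is a hypothetical name for this example and is not part of the library. It is only a rough pre-check: an object that pickles successfully in the parent can still fail to load in a worker if its module is not importable there.

```python
import math
import pickle
import threading


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


checks = {
    # Module-level functions are pickled by reference to their name.
    "math.sqrt": is_picklable(math.sqrt),
    # A lambda has no importable name, so it cannot be pickled.
    "lambda": is_picklable(lambda x: x * 2),
    # Locks wrap OS-level state and refuse to be pickled.
    "threading.Lock()": is_picklable(threading.Lock()),
}
print(checks)
```

Running such a check on a task function and a sample of its arguments before the submit loop turns a runtime failure deep inside the pool into an early, readable error.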
Quickstart
import time
from bounded_pool_executor import BoundedThreadPoolExecutor

def my_task(n):
    time.sleep(0.1)  # Simulate some work
    return f"Task {n} completed"

max_workers = 2
# Per the Warnings section, max_queue_size counts running plus waiting tasks:
# with 2 workers, a value of 4 allows 2 running and 2 waiting.
max_queue_size = 4

print(f"Using BoundedThreadPoolExecutor with {max_workers} workers and queue size {max_queue_size}")
futures = []
with BoundedThreadPoolExecutor(max_workers=max_workers, max_queue_size=max_queue_size) as pool:
    for i in range(1, 10):
        print(f"Submitting task {i}...")
        # submit() blocks while the queue is full
        future = pool.submit(my_task, i)
        futures.append(future)
    print("All tasks submitted. Waiting for results...")
    for future in futures:
        # result() waits for the task and re-raises any exception it raised
        print(future.result())