Pebble
Pebble is a Python library that enhances `concurrent.futures` with features like timeouts, remote process tracebacks, and cleaner pool management, making threading and multiprocessing more robust and user-friendly. It is currently at version 5.2.0 and maintains an active release cadence with frequent minor updates.
Warnings
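- gotcha When handling timeouts with `ProcessPool` or `ThreadPool`, the `TimeoutError` exception comes from `concurrent.futures`, not from `pebble`. Import it explicitly: `from concurrent.futures import TimeoutError`. On Python 3.11 and later this name is an alias of the built-in `TimeoutError`; on older versions it is a separate class, so the explicit import is required there.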
- gotcha When using `ProcessPool`, especially on Windows and macOS (where `multiprocessing` defaults to the `spawn` start method), the code that creates the pool must be guarded by `if __name__ == '__main__':`. Without the guard, each child process re-imports the main script and tries to create the pool again, which can lead to a `RuntimeError` or infinite recursion.
- gotcha Pebble's `schedule()` method handles timeouts directly and returns futures that can expose the remote traceback of an exception via `future.exception().traceback`. If you submit work through a plain `concurrent.futures` executor (`Executor.submit()`) instead, you won't get these enhanced features.
- breaking In version 5.1.0, Pebble changed its default `SIGTERM` handling: the signal is reset to the default handler in child processes. Custom `SIGTERM` handlers in child processes that relied on Pebble not interfering may no longer behave as before.
Install
pip install pebble
Imports
- ProcessPool
from pebble import ProcessPool
- ThreadPool
from pebble import ThreadPool
Quickstart
from pebble import ProcessPool
from concurrent.futures import TimeoutError
import os
import time

def my_task(data, delay):
    # Simulate some work
    time.sleep(delay)
    return f"Processed {data} after {delay}s on PID {os.getpid()}"

if __name__ == "__main__":  # Essential for ProcessPool on Windows/macOS
    print("--- Pebble ProcessPool Quickstart ---")
    with ProcessPool(max_workers=2) as pool:
        print("Submitting tasks...")
        # schedule returns a future object, allowing timeout directly on the task
        future1 = pool.schedule(my_task, args=("task A", 1), timeout=2)
        future2 = pool.schedule(my_task, args=("task B", 3), timeout=2)  # This task will intentionally timeout

        print("\nGetting results for task 1 (should succeed):")
        try:
            result1 = future1.result()  # blocks until result is ready or timeout/exception
            print(f"Result 1: {result1}")
        except TimeoutError:
            print("Task 1 timed out!")
        except Exception as e:
            print(f"Task 1 raised an unexpected exception: {e}")
            # For remote exceptions, e.traceback can provide the remote stack trace

        print("\nGetting results for task 2 (should timeout):")
        try:
            result2 = future2.result()
            print(f"Result 2: {result2}")
        except TimeoutError:
            print("Task 2 timed out as expected!")
        except Exception as e:
            print(f"Task 2 raised an unexpected exception: {e}")
            # For remote exceptions, e.traceback can provide the remote stack trace

    print("\nAll tasks completed or processed in pool.")