Zthreading: Thread and Async Task Management
Zthreading is a Python library offering a unified interface for managing both traditional Python threads and `asyncio` tasks. It provides wrapper classes for event broadcasting, task management, and decorators for common concurrency patterns. The library is actively maintained, with minor releases addressing bug fixes and new features every few months.
Warnings
- breaking The signature for the `on_event` argument changed in version 0.1.11 from `on_event(name, *args, **kwargs)` to `on_event(evnt: Event)`. Event handlers now receive an `Event` object.
- breaking The `predict` method signature for `wait_for_events` (and `wait_for`) was changed in version 0.1.13. It now takes `predict(handler, event)` instead of `predict(handler, name: str, *args, **kwargs)`.
- gotcha Older versions of `zthreading` (prior to 0.1.18) used `threading.currentThread` internally, which was deprecated in Python 3.10 and removed in Python 3.12. Running these older versions on Python 3.10+ will result in deprecation warnings or `AttributeError`.
- gotcha When using `ThreadManager` or `AsyncioManager` with tasks, it's crucial to call `manager.wait_for_tasks()` or `manager.join_all()` to ensure all tasks complete and resources are cleaned up. Forgetting this can lead to unhandled tasks or processes remaining active.
Install
-
pip install zthreading
Imports
- ThreadManager
from zthreading.thread_manager import ThreadManager
- AsyncioManager
from zthreading.asyncio_manager import AsyncioManager
- as_task
from zthreading.decorators import as_task
- Event
from zthreading.events import Event
Quickstart
import time
from zthreading.thread_manager import ThreadManager
from zthreading.decorators import as_task
def my_task(name):
print(f"Task {name} started")
time.sleep(1) # Simulate work
print(f"Task {name} finished")
return f"Result for {name}"
# Initialize ThreadManager
manager = ThreadManager()
# Add tasks
task1_handle = manager.add_task(my_task, args=("Task 1",))
task2_handle = manager.add_task(my_task, args=("Task 2",))
# Use decorator for another task
@as_task(manager=manager)
def decorated_task(name):
print(f"Decorated task {name} started")
time.sleep(0.5)
print(f"Decorated task {name} finished")
return f"Decorated result for {name}"
decorated_task_handle = decorated_task("Decorated Task 3")
# Wait for all tasks to complete
manager.wait_for_tasks()
print(f"\nTask 1 result: {task1_handle.get_result()}")
print(f"Task 2 result: {task2_handle.get_result()}")
print(f"Decorated task result: {decorated_task_handle.get_result()}")
# Ensure all threads are properly joined/terminated
manager.join_all()