aioprocessing
2.0.1 | verified Thu Apr 16 | auth: no | python
aioprocessing is a Python 3.5+ library that provides asynchronous, `asyncio`-compatible versions of many of the blocking instance methods in Python's standard `multiprocessing` module, so that multiprocessing objects can be used inside `asyncio` coroutines without blocking the event loop. The library is currently at version 2.0.1 and generally follows an active release cadence; the last major update (2.0.0) introduced `dill` support and internal `async`/`await` usage.
pip install aioprocessing
Common errors
error RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.
cause This error typically occurs on Windows and macOS (Python 3.8+) where the default multiprocessing start method is 'spawn'. It happens when process creation code is placed directly at the top level of a script, leading to re-import issues in child processes.
fix Wrap all code that spawns new processes in an `if __name__ == '__main__':` block. This ensures the code runs only in the main process and not in newly spawned child processes during bootstrapping. Example: `if __name__ == '__main__': asyncio.run(main())`.
error AttributeError: Can't pickle local object 'example.<locals>.worker_func'
cause When using the 'spawn' start method (the default on Windows and macOS) or 'forkserver', objects (including functions) passed to child processes must be importable from the child's context. Nested or locally defined functions cannot be pickled and sent to other processes.
fix Ensure that any functions or classes passed as `target` to `AioProcess` or used within `AioPool.map` are defined at the top level of a module, making them globally accessible and therefore picklable. Avoid lambda functions or nested function definitions for multiprocessing targets.
error BlockingIOError: [Errno 11] Resource temporarily unavailable
cause Attempting to use a `multiprocessing` object directly in an `asyncio` coroutine without its `aioprocessing` (coroutine-friendly) wrapper will block the event loop, as the underlying `multiprocessing` methods are synchronous.
fix Always use the `coro_`-prefixed methods provided by aioprocessing (e.g., `queue.coro_put()`, `event.coro_wait()`, `lock.coro_acquire()`) when interacting with aioprocessing objects from within asyncio coroutines. The non-`coro_` methods are blocking.
Warnings
gotcha Mixing threads with forked processes can lead to unexpected behavior, because `aioprocessing` often uses `ThreadPoolExecutor` internally to make blocking `multiprocessing` calls asynchronous. This caveat applies particularly if the underlying `multiprocessing.Pool` (used by `AioPool`) is employing threads.
fix Be mindful of your `multiprocessing` start method, especially on Unix-like systems where 'fork' is the default. If you encounter issues, consider explicitly setting the start method to 'spawn' with `multiprocessing.set_start_method('spawn')` at the beginning of your program, although this has implications of its own (see the RuntimeError entry under Common errors). Ensure any objects passed to child processes are picklable.
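The two suggestions in this fix, choosing the start method explicitly and checking picklability, can be sketched with the standard library alone. The helper names `pick_start_method`, `is_picklable`, `top_level_worker`, and `make_nested_worker` are hypothetical and not part of `aioprocessing`. The check uses the standard `pickle` module, so it may be stricter than what `dill` accepts.

```python
import multiprocessing
import pickle


def pick_start_method(preferred="spawn"):
    """Return `preferred` if this platform supports it, else the platform default."""
    available = multiprocessing.get_all_start_methods()  # the default comes first
    return preferred if preferred in available else available[0]


def is_picklable(obj):
    """Best-effort check: can the standard pickle module serialize `obj`?"""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def top_level_worker(x):
    """Defined at module level, so child processes can import it by name."""
    return x + 5


def make_nested_worker():
    """Return a locally defined function; the standard pickle module rejects these."""
    def nested_worker(x):
        return x + 5
    return nested_worker


if __name__ == "__main__":
    # Set the start method once, early, and only in the main process.
    # force=True (an assumption made for this sketch) tolerates a method
    # that was already set elsewhere.
    multiprocessing.set_start_method(pick_start_method("spawn"), force=True)
    print("top-level function picklable:", is_picklable(top_level_worker))
    print("nested function picklable:", is_picklable(make_nested_worker()))
    print("lambda picklable:", is_picklable(lambda x: x))
```

Running a check like `is_picklable` on a process target before starting it can surface the pickling problem early, in the parent process, rather than as a failure during child start-up.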
breaking Version 2.0.0 introduced support for universal pickling using `dill` and also moved to using `async/await` internally. This might affect applications relying on specific pickling behaviors or internal event loop interactions, though the external API remains largely consistent with `coro_` prefixes.
fix If experiencing pickling issues, ensure `dill` is installed (`pip install aioprocessing[dill]`). If you need to force standard library `multiprocessing` pickling behavior, set the environment variable `AIOPROCESSING_DILL_DISABLED=1`.
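Where exporting the variable in the shell is inconvenient, it can also be set from Python. The sketch below assumes the variable is read when `aioprocessing` is first imported, which is why it is set before the import. The `try`/`except` only keeps the snippet runnable where the package is not installed, and `HAVE_AIOPROCESSING` is an illustrative name.

```python
import os

# Assumption: aioprocessing checks this variable at import time,
# so it has to be in the environment before the import below runs.
os.environ["AIOPROCESSING_DILL_DISABLED"] = "1"

try:
    import aioprocessing  # noqa: F401  (imported only after the variable is set)
    HAVE_AIOPROCESSING = True
except ImportError:
    HAVE_AIOPROCESSING = False

if __name__ == "__main__":
    print("dill disabled:", os.environ["AIOPROCESSING_DILL_DISABLED"])
    print("aioprocessing importable:", HAVE_AIOPROCESSING)
```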
gotcha Using `AioPool` with `maxtasksperchild` can sometimes lead to `OSError: [Errno 24] Too many open files` if worker processes are not properly cleaned up and new ones are spawned, especially under heavy load.
fix Investigate the lifecycle of tasks within `maxtasksperchild`. Ensure all resources (e.g., file handles, network connections) acquired by a worker are properly closed before the task completes or the worker process is terminated. Review resource management within your worker functions.
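One way to keep workers from leaking descriptors is to scope every resource to a `with` block inside the worker function, so that it is closed when the task ends whether or not an exception is raised. The sketch below uses only the standard library and runs the worker in-process. `count_lines` and `demo` are hypothetical names, and `count_lines` is defined at module level so that it stays picklable.

```python
import os
import tempfile


def count_lines(path):
    """Hypothetical worker: count the lines in a text file.

    The with-block closes the file handle when the task finishes,
    even if reading raises, so the worker does not accumulate open files.
    """
    with open(path, "r", encoding="utf-8") as handle:
        return sum(1 for _ in handle)


def demo():
    """Run the worker once, in-process, against a temporary file."""
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        # os.fdopen takes ownership of fd and closes it when the block exits.
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write("a\nb\nc\n")
        return count_lines(path)
    finally:
        os.remove(path)


if __name__ == "__main__":
    print(demo())
```

The same pattern applies to sockets and database connections: acquire inside the task, release before it returns.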
Install
pip install aioprocessing[dill]
Imports
- AioProcess
  from aioprocessing import AioProcess
- AioPool
  from aioprocessing import AioPool
- AioQueue
  from aioprocessing import AioQueue
- AioLock
  from aioprocessing import AioLock
- AioEvent
  from aioprocessing import AioEvent
- AioManager
  wrong: from aioprocessing.manager import AioManager
  correct: from aioprocessing import AioManager
Quickstart
import asyncio
import time

import aioprocessing


def worker_func(queue, event, lock, items):
    """ Demo worker function for a separate process. """
    with lock:
        event.set()
    for item in items:
        time.sleep(0.1)  # Simulate work
        queue.put(item + 5)
    queue.put(None)  # Signal completion


async def example():
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    items_to_process = [1, 2, 3, 4, 5]

    print("Starting worker process...")
    p = aioprocessing.AioProcess(target=worker_func, args=(queue, event, lock, items_to_process))
    p.start()

    # Wait for the worker to signal it's ready
    await event.coro_wait()
    print("Worker is ready.")

    async with lock:  # Acquire lock in async context
        print("Lock acquired in main process (async).")
        # Demonstrating put from main process
        await queue.coro_put(78)
        print("Put 78 into queue from main process.")

    print("Collecting results from queue...")
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print(f"Got result: {result}")

    await p.coro_join()  # Asynchronously wait for process to finish
    print("Worker process finished.")


if __name__ == "__main__":
    asyncio.run(example())
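
For intuition about what a `coro_` call does, the sketch below awaits a blocking `get()` by running it in a thread pool, which is the general technique the warning about `ThreadPoolExecutor` refers to. It is not aioprocessing's implementation: it substitutes `queue.Queue` and a thread for the multiprocessing queue and worker process so that it runs with the standard library alone, and `producer`, `drain`, and `main` are illustrative names.

```python
import asyncio
import queue
import threading
import time


def producer(q, items):
    """Blocking producer, standing in for the worker process."""
    for item in items:
        time.sleep(0.01)  # Simulate work
        q.put(item)
    q.put(None)  # Signal completion


async def drain(q):
    """Collect items until the None sentinel without blocking the event loop."""
    loop = asyncio.get_running_loop()
    results = []
    while True:
        # q.get() blocks, so it runs in the default thread pool executor
        # and the coroutine awaits the resulting future.
        item = await loop.run_in_executor(None, q.get)
        if item is None:
            break
        results.append(item)
    return results


async def main():
    q = queue.Queue()
    worker = threading.Thread(target=producer, args=(q, [1, 2, 3]))
    worker.start()
    results = await drain(q)
    worker.join()  # The producer has already finished once the sentinel arrives
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

While `drain` is waiting on the executor, other coroutines on the same loop keep running, which is the property the `coro_` methods are meant to provide.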