aioprocessing
aioprocessing is a Python 3.5+ library that provides asyncio-compatible versions of the blocking methods on objects in Python's standard `multiprocessing` module, letting you use multiprocessing primitives inside `asyncio` coroutines without blocking the event loop. The library is currently at version 2.0.1; the 2.0.0 release added universal pickling via `dill` and moved the internals to `async`/`await`.
Common errors
- RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.
  - cause: Typically occurs on Windows and on macOS (Python 3.8+), where the default multiprocessing start method is 'spawn'. Process-creation code placed at the top level of a script is re-executed when child processes re-import the module.
  - fix: Wrap all code that spawns new processes in an `if __name__ == '__main__':` block. This ensures the code runs only in the main process, not in newly spawned children during bootstrapping. Example: `if __name__ == '__main__': asyncio.run(main())`.
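A minimal sketch of the guard, using the stdlib `multiprocessing` module directly (the same rule applies to `AioProcess`):

```python
import multiprocessing

def work(q):
    # Defined at module level so 'spawn' children can import it.
    q.put("done")

if __name__ == "__main__":
    # Under 'spawn' (the default on Windows and macOS), the child
    # re-imports this module; the guard keeps the spawning code from
    # running again inside the child.
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=work, args=(q,))
    p.start()
    print(q.get())  # → done
    p.join()
```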
- AttributeError: Can't pickle local object 'example.<locals>.worker_func'
  - cause: With the 'spawn' or 'forkserver' start methods (default on Windows/macOS), objects (including functions) passed to child processes must be importable from the child's context. Nested or locally defined functions cannot be pickled and sent to other processes.
  - fix: Ensure that any functions or classes passed as `target` to `AioProcess` or used within `AioPool.map` are defined at the top level of a module, making them globally accessible and therefore picklable. Avoid lambdas or nested function definitions as multiprocessing targets.
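A quick way to see the constraint with only the stdlib `pickle` module, which multiprocessing uses to send the target to 'spawn' children (`make_worker` here is a hypothetical illustration, not part of aioprocessing):

```python
import pickle

def add_five(x):
    # Top-level function: pickled by reference (module + name),
    # so a child process can import and call it.
    return x + 5

def make_worker():
    def nested(x):  # local function: has no importable name
        return x + 5
    return nested

# Pickling the nested function fails, just as passing it as a
# process target would under 'spawn'.
try:
    pickle.dumps(make_worker())
    nested_picklable = True
except (pickle.PicklingError, AttributeError):
    nested_picklable = False

print(nested_picklable)  # → False
```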
- BlockingIOError: [Errno 11] Resource temporarily unavailable
  - cause: Using a `multiprocessing` object's synchronous methods directly inside an `asyncio` coroutine blocks the event loop, because the underlying `multiprocessing` calls are synchronous.
  - fix: Always use the `coro_` prefixed methods provided by `aioprocessing` (e.g., `queue.coro_put()`, `event.coro_wait()`, `lock.coro_acquire()`) when interacting with `aioprocessing` objects from within `asyncio` coroutines. The non-`coro_` methods are blocking.
Warnings
- gotcha Mixing threads with forked processes can cause deadlocks and other undefined behavior. `aioprocessing` runs blocking `multiprocessing` calls in a `ThreadPoolExecutor` to make them awaitable, so threads are in play even if your own code never creates any.
- breaking Version 2.0.0 introduced support for universal pickling using `dill` and also moved to using `async/await` internally. This might affect applications relying on specific pickling behaviors or internal event loop interactions, though the external API remains largely consistent with `coro_` prefixes.
- gotcha Using `AioPool` with `maxtasksperchild` can sometimes lead to `OSError: [Errno 24] Too many open files` if worker processes are not properly cleaned up and new ones are spawned, especially under heavy load.
Install
- pip install aioprocessing
- pip install aioprocessing[dill]
Imports
- AioProcess
from aioprocessing import AioProcess
- AioPool
from aioprocessing import AioPool
- AioQueue
from aioprocessing import AioQueue
- AioLock
from aioprocessing import AioLock
- AioEvent
from aioprocessing import AioEvent
- AioManager
from aioprocessing import AioManager
Quickstart
```python
import asyncio
import time

import aioprocessing


def worker_func(queue, event, lock, items):
    """Demo worker function for a separate process."""
    with lock:
        event.set()
        for item in items:
            time.sleep(0.1)  # Simulate work
            queue.put(item + 5)
    queue.put(None)  # Signal completion


async def example():
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    items_to_process = [1, 2, 3, 4, 5]

    print("Starting worker process...")
    p = aioprocessing.AioProcess(
        target=worker_func, args=(queue, event, lock, items_to_process)
    )
    p.start()

    # Wait for the worker to signal it's ready
    await event.coro_wait()
    print("Worker is ready.")

    async with lock:  # Acquire lock in async context
        print("Lock acquired in main process (async).")
        # Demonstrating put from main process
        await queue.coro_put(78)
        print("Put 78 into queue from main process.")

    print("Collecting results from queue...")
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print(f"Got result: {result}")

    await p.coro_join()  # Asynchronously wait for process to finish
    print("Worker process finished.")


if __name__ == "__main__":
    asyncio.run(example())
```