aioprocessing

2.0.1 verified Thu Apr 16 auth: no python

aioprocessing is a Python 3.5+ library that provides asynchronous, asyncio-compatible versions of many blocking instance methods found in Python's standard `multiprocessing` module. It lets `multiprocessing` objects be used inside `asyncio` coroutines without blocking the event loop. The current version is 2.0.1; the last major release, 2.0.0, introduced `dill` support and switched to `async/await` internally.

pip install aioprocessing
error RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.
cause This error typically occurs on Windows and macOS (Python 3.8+) where the default multiprocessing start method is 'spawn'. It happens when process creation code is placed directly at the top level of a script, leading to re-import issues in child processes.
fix Wrap all code that spawns new processes in an `if __name__ == '__main__':` block so that it runs only in the main process and not when a spawned child re-imports the module during bootstrapping. Example: `if __name__ == '__main__': asyncio.run(main())`.
error AttributeError: Can't pickle local object 'example.<locals>.worker_func'
cause With the 'spawn' start method (the default on Windows and macOS) or 'forkserver', objects passed to child processes, including functions, are pickled and must be importable by name in the child. Nested or locally defined functions cannot be pickled by the standard `pickle` module, so they cannot be sent to other processes.
fix Define any function or class passed as `target` to `AioProcess`, or used with `AioPool.map`, at the top level of a module so that it is importable by name and therefore picklable. Avoid lambdas and nested function definitions as multiprocessing targets.
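The difference between top-level and nested callables can be checked with the standard `pickle` module before any process is started. The following is a minimal sketch; `top_level_worker`, `make_nested_worker`, and `is_picklable` are hypothetical names introduced for illustration, and the check uses stdlib `pickle`, not `dill`.

```python
import pickle


def top_level_worker(x):
    """Defined at module level, so pickle can reference it by name."""
    return x + 5


def make_nested_worker():
    """Returns a locally defined function, which stdlib pickle rejects."""
    def nested_worker(x):
        return x + 5
    return nested_worker


def is_picklable(obj):
    """Return True if stdlib pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type for local objects varies by Python version.
        return False


if __name__ == "__main__":
    print("nested function picklable:", is_picklable(make_nested_worker()))
    print("lambda picklable:", is_picklable(lambda x: x + 5))
```

Running a check like this on a target before handing it to a process or pool surfaces the problem in the parent, where the traceback is easier to read.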
error BlockingIOError: [Errno 11] Resource temporarily unavailable
cause Attempting to use a `multiprocessing` object directly in an `asyncio` coroutine without its `aioprocessing` (coroutine-friendly) wrapper will block the event loop, as the underlying `multiprocessing` methods are synchronous.
fix Use the `coro_`-prefixed methods provided by aioprocessing (e.g., `queue.coro_put()`, `event.coro_wait()`, `lock.coro_acquire()`) when interacting with aioprocessing objects from within `asyncio` coroutines. The non-`coro_` methods are blocking.
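The reason a blocking call stalls a coroutine, and how handing it to a thread avoids that, can be sketched with the standard library alone. This is an illustration of the general idea, not aioprocessing's implementation: `queue.Queue` stands in for a multiprocessing queue, and `slow_producer`, `get_without_blocking`, and `demo` are hypothetical names.

```python
import asyncio
import queue
import threading
import time


def slow_producer(q, value, delay):
    """Put a value on the queue after a delay, simulating a busy worker."""
    time.sleep(delay)
    q.put(value)


async def get_without_blocking(q):
    """Run the blocking q.get() in the default executor thread pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, q.get)


async def demo():
    q = queue.Queue()
    threading.Thread(
        target=slow_producer, args=(q, 42, 0.2), daemon=True
    ).start()

    ticks = 0

    async def ticker():
        # Keeps counting only while the event loop is free to run it.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(ticker())
    value = await get_without_blocking(q)
    task.cancel()
    return value, ticks


if __name__ == "__main__":
    value, ticks = asyncio.run(demo())
    print(f"received {value}; event loop ticked {ticks} times while waiting")
```

Calling `q.get()` directly inside `demo()` would freeze the ticker until the value arrived, which is the behavior the `coro_` methods are meant to avoid.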
gotcha Mixing threads with forked processes can lead to unexpected behavior and issues, as `aioprocessing` often uses `ThreadPoolExecutor` internally to make blocking `multiprocessing` calls asynchronous. This caveat applies particularly if the underlying `multiprocessing.Pool` (used by `AioPool`) is employing threads.
fix Be mindful of your `multiprocessing` start method, especially on Unix-like systems where 'fork' is the default. If you run into issues, consider explicitly setting the start method to 'spawn' by calling `multiprocessing.set_start_method('spawn')` once, inside the `if __name__ == '__main__':` block, before any process is created. This has implications of its own (see the RuntimeError entry above). Ensure any objects passed to child processes are picklable.
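Because `multiprocessing.set_start_method` raises `RuntimeError` when the method has already been fixed, it can help to wrap the call. A minimal sketch using only the standard library follows; `select_start_method` is a hypothetical helper, and how aioprocessing objects interact with the chosen method is not covered here.

```python
import multiprocessing as mp


def select_start_method(preferred="spawn"):
    """Set the global start method once and return the method in effect."""
    if preferred not in mp.get_all_start_methods():
        raise ValueError(
            f"start method {preferred!r} is not available on this platform"
        )
    try:
        mp.set_start_method(preferred)
    except RuntimeError:
        # The start method was already fixed earlier in this process; keep it.
        pass
    return mp.get_start_method()


if __name__ == "__main__":
    # Call this before creating any process, pool, queue, or lock.
    print("start method in effect:", select_start_method("spawn"))
```

The helper returns the method actually in effect, so the caller can detect the case where an earlier call already chose something else.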
breaking Version 2.0.0 introduced support for universal pickling using `dill` and also moved to using `async/await` internally. This might affect applications relying on specific pickling behaviors or internal event loop interactions, though the external API remains largely consistent with `coro_` prefixes.
fix If experiencing pickling issues, ensure `dill` is installed (`pip install aioprocessing[dill]`). If you need to force standard library `multiprocessing` pickling behavior, set the environment variable `AIOPROCESSING_DILL_DISABLED=1`.
gotcha Using `AioPool` with `maxtasksperchild` can sometimes lead to `OSError: [Errno 24] Too many open files` if worker processes are not properly cleaned up and new ones are spawned, especially under heavy load.
fix Review how your worker functions manage resources when `maxtasksperchild` is set. Ensure every resource a worker acquires (e.g., file handles, network connections) is closed before the task completes or the worker process is terminated.
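One way to keep a worker from leaking descriptors is to scope every resource to the task with a context manager. The sketch below shows only the worker side and uses the standard library; `count_lines` is a hypothetical task function, defined at module level so it could be passed to a pool.

```python
import os
import tempfile


def count_lines(path):
    """Worker task: the file handle is closed even if counting raises."""
    with open(path, "r", encoding="utf-8") as handle:
        return sum(1 for _ in handle)


if __name__ == "__main__":
    # Small local demonstration without any pool.
    with tempfile.TemporaryDirectory() as tmp_dir:
        sample = os.path.join(tmp_dir, "sample.txt")
        with open(sample, "w", encoding="utf-8") as out:
            out.write("a\nb\nc\n")
        print(count_lines(sample))
```

Resources that cannot use `with` can be released in a `try`/`finally` block to the same effect.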
pip install aioprocessing[dill]

This quickstart demonstrates how to use `AioProcess`, `AioQueue`, `AioLock`, and `AioEvent` from `aioprocessing` within an `asyncio` application. A worker function runs in a separate process, interacting with shared `aioprocessing` primitives, while the main `asyncio` loop communicates with it without blocking. Coroutine versions of blocking methods are prefixed with `coro_` (e.g., `coro_get`, `coro_put`, `coro_wait`, `coro_acquire`).

import asyncio
import time
import aioprocessing

def worker_func(queue, event, lock, items):
    """ Demo worker function for a separate process. """
    with lock:
        event.set()
    for item in items:
        time.sleep(0.1)  # Simulate work
        queue.put(item + 5)
    queue.put(None)  # Signal completion

async def example():
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    items_to_process = [1, 2, 3, 4, 5]

    print("Starting worker process...")
    p = aioprocessing.AioProcess(target=worker_func, args=(queue, event, lock, items_to_process))
    p.start()

    # Wait for the worker to signal it's ready
    await event.coro_wait()
    print("Worker is ready.")

    async with lock:  # Acquire the lock without blocking the event loop
        print("Lock acquired in main process (async).")
        # Put from the main process; the collection loop below reads
        # this value back from the same queue.
        await queue.coro_put(78)
        print("Put 78 into queue from main process.")

    print("Collecting results from queue...")
    while True:
        result = await queue.coro_get()
        if result is None:
            break
        print(f"Got result: {result}")

    await p.coro_join()  # Asynchronously wait for the process to finish
    print("Worker process finished.")

if __name__ == "__main__":
    asyncio.run(example())