aiometer: Concurrency Scheduler

1.0.0 verified Thu Apr 16 auth: no python

aiometer is a Python concurrency scheduling library compatible with asyncio and trio. It makes it easier to run many tasks concurrently while enforcing concurrency limits and collecting results predictably. The current version is 1.0.0, and releases focus on supporting recent Python and `anyio` versions.

pip install aiometer
error TypeError: object <async_function> is not callable
cause Passing a coroutine object (the result of calling `my_async_func()`) instead of the coroutine function itself (`my_async_func`) to `aiometer` functions such as `run_all` or `amap`.
fix
Pass the coroutine function itself, or a `functools.partial` that can be called, not the result of calling it. For instance, use `aiometer.run_all([my_async_func])` or `aiometer.run_all([functools.partial(my_async_func, arg)])` instead of `aiometer.run_all([my_async_func()])`.
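The difference between a coroutine function and a coroutine object can be checked before anything is scheduled. The sketch below is an illustration only: `fetch`, `ensure_callables`, and `run_sequentially` are hypothetical names, not part of aiometer, and the sequential runner merely stands in for a scheduler.

```python
import asyncio
import functools
import inspect


async def fetch(item: str) -> str:
    """Hypothetical task used only for this illustration."""
    await asyncio.sleep(0)
    return item.upper()


def ensure_callables(tasks):
    """Return tasks as a list, rejecting coroutine objects and other non-callables."""
    checked = []
    for task in tasks:
        if inspect.iscoroutine(task):
            task.close()  # avoid the "coroutine was never awaited" warning
            raise TypeError(
                "got a coroutine object; pass the function or a functools.partial"
            )
        if not callable(task):
            raise TypeError(f"{task!r} is not callable")
        checked.append(task)
    return checked


async def run_sequentially(tasks):
    # Stand-in for a scheduler: call each zero-argument callable and await it.
    return [await task() for task in tasks]


# functools.partial produces zero-argument callables; fetch(name) would not.
tasks = ensure_callables([functools.partial(fetch, name) for name in ("a", "b")])
results = asyncio.run(run_sequentially(tasks))
print(results)
```

A list that passes this check has the shape `aiometer.run_all` expects: a sequence of callables that take no arguments and return an awaitable.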
error TypeError: <function_name> takes N positional arguments but M were given
cause The async function passed to `aiometer.run_on_each` or `aiometer.amap` expects more than one positional argument, but these `aiometer` functions only provide a single argument from the iterable.
fix
Refactor your async function to accept a single argument (e.g., a data object), or use `functools.partial` to bind the additional arguments before passing the partially applied function to `aiometer`.
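As a minimal sketch of the `functools.partial` approach, the example below binds the fixed parameters by keyword so that the single per-item value fills the remaining positional parameter. The function `fetch`, its parameters, and the URLs are hypothetical; the loop in `main` only imitates a caller that supplies one argument per item.

```python
import asyncio
import functools


async def fetch(url: str, timeout: float, retries: int) -> str:
    """Hypothetical three-parameter task; only `url` varies per item."""
    await asyncio.sleep(0)
    return f"{url} (timeout={timeout}, retries={retries})"


# Bind the fixed parameters by keyword so the per-item value fills `url`.
fetch_one = functools.partial(fetch, timeout=2.0, retries=3)


async def main():
    urls = ["https://example.org/a", "https://example.org/b"]
    # Each call receives exactly one positional argument, the current item.
    return [await fetch_one(url) for url in urls]


results = asyncio.run(main())
print(results)
```

A callable like `fetch_one` can then be given as the first argument to `aiometer.run_on_each` or `aiometer.amap`. Binding by keyword matters: positional binding with `functools.partial` fills the leading parameters, so the per-item parameter would have to come last in the signature.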
error RuntimeError: Task <Task ...> got an exception that doesn't inherit from BaseException.
cause An asynchronous task within `aiometer` encountered an exception that does not inherit from Python's `BaseException` hierarchy, or a non-exception object was raised. This can sometimes occur with poorly-behaved libraries or custom error types.
fix
Ensure all errors raised within your asynchronous tasks inherit from Exception (or at least BaseException). Inspect the traceback to identify the source of the non-compliant 'exception' object.
breaking aiometer dropped support for Python 3.7 starting from version 0.5.0, as it reached its End-of-Life. Ensure your environment uses Python 3.8 or newer.
fix Upgrade your Python environment to 3.8 or a later supported version. aiometer 1.0.0 supports Python 3.8 to 3.13.
breaking Upgrading aiometer to version 0.3.0 or higher requires `anyio` v3. Dependency mismatches may occur if your project or other dependencies are still on `anyio` v1 or v2.
fix Ensure all parts of your codebase and its dependencies are compatible with `anyio` v3 (for aiometer 0.3.0+) or `anyio` v4 (for aiometer 0.5.0+). The latest `aiometer` (1.0.0) supports `anyio>=3.0,<5`.
gotcha Functions passed to `aiometer.run_on_each` and `aiometer.amap` must accept exactly one positional argument: the current item of the iterable. If your asynchronous function requires multiple parameters, you need to adapt it.
fix Use `functools.partial` to pre-fill the extra arguments, or bundle the arguments into a single container object (such as a `NamedTuple`) that the function unpacks.
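The container approach might look like the following sketch. `Job` and `process` are hypothetical names chosen for illustration, and the loop in `main` stands in for a caller that passes one item at a time.

```python
import asyncio
from typing import NamedTuple


class Job(NamedTuple):
    """Hypothetical bundle of the arguments one task needs."""
    user_id: int
    region: str


async def process(job: Job) -> str:
    # The function takes one argument and reads its fields by name.
    await asyncio.sleep(0)
    return f"user {job.user_id} in {job.region}"


async def main():
    jobs = [Job(1, "eu"), Job(2, "us")]
    # One Job per call, mirroring a single-argument calling convention.
    return [await process(job) for job in jobs]


results = asyncio.run(main())
print(results)
```

Compared with `functools.partial`, a container suits cases where several values vary per item, since each item carries its own complete set of arguments.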
gotcha With `anyio` 4 (supported by `aiometer` 0.5.0+), native `ExceptionGroup` is used. If your code explicitly catches `anyio.ExceptionGroup` types from `anyio` 3.2+, you might need to adjust for the standard library's `ExceptionGroup`.
fix Review any exception handling that type-checks `ExceptionGroup` instances directly. With `anyio` 4 the group is Python's native `ExceptionGroup` rather than `anyio`'s own type, so `except` clauses and `isinstance` checks written against the `anyio` class may no longer match.

This quickstart uses `aiometer.run_all` to collect results in input order and `aiometer.amap` to consume results as they become available. It also applies concurrency limits (`max_at_once`, `max_per_second`) and uses `functools.partial` to bind arguments, since `run_all` takes a list of zero-argument async callables.

import asyncio
import functools
import aiometer

async def get_greeting(name: str) -> str:
    """Simulates an async operation, returning a greeting."""
    await asyncio.sleep(0.05) # Simulate I/O
    return f"Hello, {name}!"

async def main():
    names = ["Alice", "Bob", "Charlie", "David", "Eve"]

    print("Running tasks with aiometer.run_all (ordered results):")
    # run_all expects zero-argument async callables, so bind each name with functools.partial
    greetings_ordered = await aiometer.run_all(
        [functools.partial(get_greeting, name) for name in names],
        max_at_once=2, # Limit to 2 concurrent tasks
        max_per_second=5 # Limit task spawning rate
    )
    for greeting in greetings_ordered:
        print(greeting)

    print("\nRunning tasks with aiometer.amap (unordered results as they become available):")
    async with aiometer.amap(get_greeting, names, max_at_once=2) as greetings_unordered:
        async for greeting in greetings_unordered:
            print(greeting)

if __name__ == "__main__":
    asyncio.run(main())