Asyncio DataLoader for Python

0.4.3 verified Thu May 14 auth: no python

Asyncio DataLoader is a Python port of the JavaScript DataLoader, a generic utility for efficient data fetching. It provides a consistent API over various data sources. Batching coalesces the individual load requests made within a single event loop tick into one operation, and per-request caching prevents redundant loads of the same key. The current version is 0.4.3; releases appear a few times a year and mostly contain bug fixes and minor features.

pip install aiodataloader
error ModuleNotFoundError: No module named 'aiodataloader'
cause The 'aiodataloader' package is not installed in the Python environment.
fix
Install the package using pip: 'pip install aiodataloader'.
error ImportError: cannot import name 'DataLoader' from 'aiodataloader'
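cause 'from aiodataloader import DataLoader' is already the correct import, so this error usually means Python resolved 'aiodataloader' to something other than the installed package, for example a local file or directory named 'aiodataloader' that shadows it.
fix
Keep the import as 'from aiodataloader import DataLoader' and rename or remove any local module named 'aiodataloader'.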
error TypeError: 'coroutine' object is not iterable
cause Attempting to iterate over a coroutine without awaiting it.
fix
Ensure that the coroutine is awaited: 'result = await dataloader.load(key)'.
error RuntimeError: This event loop is already running
cause Calling 'asyncio.run()' inside an already running event loop, often in interactive environments like Jupyter notebooks.
fix
Use 'await' directly in the interactive environment or manage the event loop appropriately.
error AttributeError: 'DataLoader' object has no attribute 'load_many'
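cause The aiodataloader 'DataLoader' class does provide 'load_many' (it takes a list of keys), so this error usually means the object is not an aiodataloader 'DataLoader', for example a class of the same name from another library or a local class that shadows it.
fix
Check which 'DataLoader' is imported. With aiodataloader, use 'await dataloader.load_many(keys)' for several keys or 'await dataloader.load(key)' for a single key.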
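For the 'This event loop is already running' error above, the following is a minimal sketch of the distinction between starting a loop and awaiting inside one. `load_value` is a hypothetical stand-in for `await loader.load(key)`, and `loop_is_running` is an illustrative helper, not part of aiodataloader.

```python
import asyncio


async def load_value(key: int) -> int:
    """Hypothetical stand-in for `await loader.load(key)`."""
    await asyncio.sleep(0)
    return key * 2


def loop_is_running() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def main() -> int:
    # Inside a coroutine a loop is already running, so await directly
    # instead of nesting another asyncio.run() call.
    assert loop_is_running()
    return await load_value(21)


# In a plain script no loop is running yet, so asyncio.run() is the entry point.
ran_inside_loop_before = loop_is_running()
result = asyncio.run(main())
```

In a notebook cell, where a loop is typically already running, the equivalent would be `result = await main()` rather than `asyncio.run(main())`.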
breaking Python 3.6 is no longer supported: `v0.3.0` and `v0.4.0` require Python 3.7 or newer, so users on Python 3.6 must upgrade before installing these versions.
fix Upgrade your Python environment to 3.7 or higher.
breaking In `v0.4.0`, the `key` argument to `DataLoader.load()` no longer has a default value of `None`. Code explicitly passing `key=None` may now raise a `TypeError`.
fix Ensure a valid, non-None key is always provided to `DataLoader.load()`.
gotcha `aiodataloader` implements per-request, in-memory caching, not an application-wide shared cache. Creating a single, long-lived `DataLoader` instance and sharing it across multiple distinct requests or users can lead to incorrect data being served (stale data, cross-user data leaks). Instances should typically be created per web request or GraphQL execution context.
fix Instantiate a new `DataLoader` (or a factory to provide one) for each incoming request, ensuring its lifecycle is tied to the request.
gotcha The `batch_load_fn` must return a list of values that directly correspond (one-to-one, same order) to the list of keys it received. If a key cannot be resolved to a value, `None` must be returned at that key's corresponding position in the list.
fix Always map the input `keys` list to the output `values` list, maintaining order and using `None` for unresolved keys.
gotcha After a data mutation or update, any existing cached values in `DataLoader` for the modified keys may become stale. To ensure fresh data is loaded, explicitly call `loader.clear(key)` for specific keys or `loader.clear_all()` to invalidate the entire loader's cache.
fix Invalidate relevant cache entries using `loader.clear(key)` or `loader.clear_all()` after operations that modify underlying data.
gotcha If `DataLoader` is instantiated with `cache=False` (disabling memoization caching), the `batch_load_fn` may receive duplicate keys. In this scenario, the batch function is responsible for returning a value for *each instance* of the requested key, not just unique keys.
fix When `cache=False`, ensure your `batch_load_fn` can handle and return values for duplicate keys as they appear in the input list.
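The ordering and duplicate-key gotchas above can be handled by the same pattern, sketched here under stated assumptions: `FAKE_ROWS` and `query_rows` are hypothetical stand-ins for a real data source that returns only the rows it finds, in arbitrary order. The batch function indexes the rows by id and then walks the original `keys` list.

```python
import asyncio
from typing import Dict, List, Optional

# Hypothetical in-memory table standing in for a real data source.
FAKE_ROWS: Dict[int, Dict[str, object]] = {
    1: {"id": 1, "name": "Alice"},
    2: {"id": 2, "name": "Bob"},
}


async def query_rows(ids: List[int]) -> List[Dict[str, object]]:
    """Hypothetical backend call: returns found rows only, not in request order."""
    await asyncio.sleep(0)
    return [FAKE_ROWS[i] for i in sorted(set(ids), reverse=True) if i in FAKE_ROWS]


async def batch_load_users(keys: List[int]) -> List[Optional[Dict[str, object]]]:
    # Query each distinct key once, even if the input repeats it.
    unique_keys = list(dict.fromkeys(keys))
    rows = await query_rows(unique_keys)
    by_id = {row["id"]: row for row in rows}
    # One output per input key, in input order; None marks a missing key,
    # and a repeated key gets its value repeated.
    return [by_id.get(key) for key in keys]


result = asyncio.run(batch_load_users([2, 99, 1, 2]))
```

Because the output is built by iterating over `keys` rather than over the rows, its length and order always match the input, whatever the backend returns.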
| python | os / libc | wheel | install | import | disk | mem | side effects |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 3.9 | alpine (musl) | wheel | - | 0.08s | 17.6M | 4.3M | clean |
| 3.9 | alpine (musl) | - | - | 0.09s | 17.6M | 4.3M | - |
| 3.9 | slim (glibc) | wheel | 1.8s | 0.08s | 18M | 4.3M | clean |
| 3.9 | slim (glibc) | - | - | 0.08s | 18M | 4.3M | - |
| 3.10 | alpine (musl) | wheel | - | 0.08s | 18.1M | 4.0M | clean |
| 3.10 | alpine (musl) | - | - | 0.10s | 18.1M | 4.0M | - |
| 3.10 | slim (glibc) | wheel | 1.5s | 0.06s | 19M | 4.0M | clean |
| 3.10 | slim (glibc) | - | - | 0.06s | 19M | 4.0M | - |
| 3.11 | alpine (musl) | wheel | - | 0.16s | 20.0M | 4.9M | clean |
| 3.11 | alpine (musl) | - | - | 0.23s | 20.0M | 4.9M | - |
| 3.11 | slim (glibc) | wheel | 1.6s | 0.14s | 21M | 4.9M | clean |
| 3.11 | slim (glibc) | - | - | 0.14s | 21M | 4.9M | - |
| 3.12 | alpine (musl) | wheel | - | 0.36s | 11.9M | 8.2M | clean |
| 3.12 | alpine (musl) | - | - | 0.56s | 11.9M | 8.2M | - |
| 3.12 | slim (glibc) | wheel | 1.5s | 0.32s | 12M | 8.2M | clean |
| 3.12 | slim (glibc) | - | - | 0.39s | 12M | 8.2M | - |
| 3.13 | alpine (musl) | wheel | - | 0.38s | 11.6M | 8.7M | clean |
| 3.13 | alpine (musl) | - | - | 0.51s | 11.5M | 8.7M | - |
| 3.13 | slim (glibc) | wheel | 1.6s | 0.34s | 12M | 8.7M | clean |
| 3.13 | slim (glibc) | - | - | 0.40s | 12M | 8.7M | - |

Create a `DataLoader` by subclassing it and implementing `batch_load_fn`, which receives a list of keys and must return a list of values in the same order. Individual `load()` calls made within the same event loop tick are automatically batched.

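# Postpones annotation evaluation so `dict | None` also works on Python 3.7-3.9
from __future__ import annotations

import asyncio
from aiodataloader import DataLoader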

# A mock batch loading function for demonstration
async def fetch_users_from_db(user_ids: list[int]) -> list[dict | None]:
    print(f"Fetching users with IDs: {user_ids}")
    # Simulate an async database call
    await asyncio.sleep(0.01)
    # In a real scenario, this would query a database (e.g., ORM, API)
    users_data = {
        1: {"id": 1, "name": "Alice"},
        2: {"id": 2, "name": "Bob"},
        3: {"id": 3, "name": "Charlie"},
    }
    # Important: return values in the same order as keys, with None for missing
    return [users_data.get(uid) for uid in user_ids]

class UserLoader(DataLoader):
    def __init__(self):
        super().__init__(self.batch_load_fn)

    async def batch_load_fn(self, keys: list[int]) -> list[dict | None]:
        return await fetch_users_from_db(keys)

async def main():
    user_loader = UserLoader()

    # Load individual users concurrently
    # These three loads will be coalesced into a single call to fetch_users_from_db
    user1_task = user_loader.load(1)
    user2_task = user_loader.load(2)
    user3_task = user_loader.load(1) # This will be served from cache (for ID 1) from the first load

    user1, user2, user3 = await asyncio.gather(user1_task, user2_task, user3_task)

    print(f"User 1 (from first load): {user1}")
    print(f"User 2: {user2}")
    print(f"User 3 (from cache): {user3}")

    # Example of loading many
    users_many = await user_loader.load_many([2, 3, 4]) # ID 4 will result in None
    print(f"Users (many): {users_many}")

if __name__ == "__main__":
    asyncio.run(main())
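The per-request lifecycle recommended in the gotchas can be sketched as a small context factory. This is an illustration under stated assumptions, not part of aiodataloader: `build_request_context` is a hypothetical helper, and `PlaceholderLoader` stands in for a `DataLoader` subclass such as `UserLoader` so that the sketch runs without third-party imports.

```python
from typing import Any, Callable, Dict


class PlaceholderLoader:
    """Hypothetical stand-in for a DataLoader subclass with its own cache."""

    def __init__(self) -> None:
        self.cache: Dict[Any, Any] = {}


LoaderFactory = Callable[[], Any]


def build_request_context(factories: Dict[str, LoaderFactory]) -> Dict[str, Any]:
    """Create a fresh set of loader instances for one incoming request."""
    return {name: factory() for name, factory in factories.items()}


# Register loader classes once; instantiate them per request.
factories = {"users": PlaceholderLoader}
ctx_a = build_request_context(factories)
ctx_b = build_request_context(factories)

# Data cached while serving request A is invisible to request B.
ctx_a["users"].cache[1] = "Alice"
```

In a web framework, the call to `build_request_context` would typically sit in request middleware or in the GraphQL context builder, so that each loader is discarded together with the request.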