aiojobs

raw JSON →
1.4.0 verified Thu Apr 16 auth: no python

aiojobs is an asyncio-based job scheduler for managing background tasks. It allows spawning and tracking long-running coroutines, handling their cancellation, and ensuring graceful shutdown. The current version is 1.4.0, and it follows an active release cadence with minor updates and bug fixes.

pip install aiojobs
error AttributeError: module 'aiojobs' has no attribute 'create_scheduler'
cause Attempting to use the old API function `aiojobs.create_scheduler()` which was removed.
fix
Instantiate Scheduler directly: scheduler = Scheduler().
error Task was destroyed but it is pending!
cause The asyncio event loop was closed or the application exited while `aiojobs` tasks were still running or pending, without proper shutdown handling.
fix
Ensure graceful shutdown. Use async with Scheduler() as scheduler: or explicitly call await scheduler.wait_and_close() before closing your application.
error TypeError: 'Scheduler' object is not awaitable
cause Attempting to `await` the `Scheduler` instantiation (e.g., `await Scheduler()`) which is not necessary and incorrect for direct instantiation.
fix
Instantiate Scheduler without await: scheduler = Scheduler().
breaking The `aiojobs.create_scheduler()` function was removed. `Scheduler` objects must now be instantiated directly.
fix Replace `await aiojobs.create_scheduler()` with `Scheduler()`.
breaking Python 3.7 support was dropped.
fix Upgrade your Python environment to 3.9 or higher (current minimum requirement for v1.4.0).
gotcha Tasks spawned by `Scheduler` may be abruptly cancelled during application shutdown if `Scheduler.wait_and_close()` is not called, or if the `Scheduler` is not used within an `async with` block.
fix Use `async with Scheduler() as scheduler:` or manually call `await scheduler.wait_and_close()` before your application exits.
gotcha While `Scheduler` creation (v1.4.0+) no longer strictly requires a *running* event loop, tasks spawned by it still need an event loop to *execute*. Running `Scheduler` operations outside an `asyncio.run` context or a running event loop will lead to `RuntimeError` when tasks attempt to start.
fix Always ensure your `aiojobs` operations are executed within an `async` function called by `asyncio.run()` or within an existing `asyncio` event loop.

This quickstart demonstrates how to create a `Scheduler` using the `async with` context (recommended for graceful shutdown since v1.3.0), spawn background tasks using `scheduler.spawn()`, and optionally wait for individual jobs to complete with `job.wait()`.

import asyncio
from aiojobs import Scheduler

async def worker(task_id: int):
    print(f"Worker {task_id}: Starting...")
    try:
        await asyncio.sleep(2) # Simulate work
        print(f"Worker {task_id}: Finished.")
    except asyncio.CancelledError:
        print(f"Worker {task_id}: Cancelled during sleep.")

async def main():
    # Use async with for graceful shutdown (introduced in v1.3.0)
    async with Scheduler() as scheduler:
        print("Scheduler created.")

        job1 = await scheduler.spawn(worker(1))
        job2 = await scheduler.spawn(worker(2))

        print(f"Spawned jobs: {scheduler.pending_count}")

        # Wait for job1 to complete (optional)
        await job1.wait()
        print(f"Job 1 status: {job1.closed}")

    print("Scheduler closed. All pending tasks should be done or cancelled.")

if __name__ == "__main__":
    asyncio.run(main())