aiojobs
aiojobs is an asyncio-based job scheduler for managing background tasks. It allows spawning and tracking long-running coroutines, handling their cancellation, and ensuring graceful shutdown. The current version is 1.4.0, and it follows an active release cadence with minor updates and bug fixes.
Common errors
-
AttributeError: module 'aiojobs' has no attribute 'create_scheduler'
cause Attempting to use the old API function `aiojobs.create_scheduler()` which was removed.fixInstantiate `Scheduler` directly: `scheduler = Scheduler()`. -
Task was destroyed but it is pending!
cause The asyncio event loop was closed or the application exited while `aiojobs` tasks were still running or pending, without proper shutdown handling.fixEnsure graceful shutdown. Use `async with Scheduler() as scheduler:` or explicitly call `await scheduler.wait_and_close()` before closing your application. -
TypeError: 'Scheduler' object is not awaitable
cause Attempting to `await` the `Scheduler` instantiation (e.g., `await Scheduler()`) which is not necessary and incorrect for direct instantiation.fixInstantiate `Scheduler` without `await`: `scheduler = Scheduler()`.
Warnings
- breaking The `aiojobs.create_scheduler()` function was removed. `Scheduler` objects must now be instantiated directly.
- breaking Python 3.7 support was dropped.
- gotcha Tasks spawned by `Scheduler` may be abruptly cancelled during application shutdown if `Scheduler.wait_and_close()` is not called, or if the `Scheduler` is not used within an `async with` block.
- gotcha While `Scheduler` creation (v1.4.0+) no longer strictly requires a *running* event loop, tasks spawned by it still need an event loop to *execute*. Running `Scheduler` operations outside an `asyncio.run` context or a running event loop will lead to `RuntimeError` when tasks attempt to start.
Install
-
pip install aiojobs
Imports
- Scheduler
import aiojobs; aiojobs.create_scheduler()
from aiojobs import Scheduler
- Job
from aiojobs import Job
Quickstart
import asyncio
from aiojobs import Scheduler
async def worker(task_id: int):
print(f"Worker {task_id}: Starting...")
try:
await asyncio.sleep(2) # Simulate work
print(f"Worker {task_id}: Finished.")
except asyncio.CancelledError:
print(f"Worker {task_id}: Cancelled during sleep.")
async def main():
# Use async with for graceful shutdown (introduced in v1.3.0)
async with Scheduler() as scheduler:
print("Scheduler created.")
job1 = await scheduler.spawn(worker(1))
job2 = await scheduler.spawn(worker(2))
print(f"Spawned jobs: {scheduler.pending_count}")
# Wait for job1 to complete (optional)
await job1.wait()
print(f"Job 1 status: {job1.closed}")
print("Scheduler closed. All pending tasks should be done or cancelled.")
if __name__ == "__main__":
asyncio.run(main())