SAQ: Distributed Async Job Queue
SAQ is a distributed Python job queue designed for asynchronous applications, leveraging `asyncio` and Redis. It provides a simple, fast, and reliable way to manage background tasks with features like job retries, timeouts, and scheduled execution. The library is actively maintained with frequent updates, though without a strict release cadence.
Common errors
- `RuntimeError: Event loop is already running`
  - cause: Calling `asyncio.run()` while an event loop is already active in the current thread (e.g., when integrating SAQ into an existing `asyncio` application such as FastAPI or aiohttp).
  - fix: Instead of calling `asyncio.run()` again, schedule SAQ client/worker coroutines on the existing event loop with `asyncio.create_task()` or the framework's own startup/shutdown hooks.
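The error usually disappears once only one `asyncio.run()` exists, at the program's entry point; everything else is scheduled as a task on the already-running loop. A minimal stdlib-only sketch of that pattern (`run_worker` is a stand-in for a long-lived coroutine like a SAQ worker, not a SAQ API):

```python
import asyncio

async def run_worker(stop: asyncio.Event) -> str:
    # Stand-in for a long-lived coroutine such as a worker's start();
    # a real worker would process jobs until shut down.
    await stop.wait()
    return "worker stopped"

async def main() -> str:
    stop = asyncio.Event()
    # Inside an already-running loop, schedule background work as a task
    # instead of calling asyncio.run() a second time.
    worker_task = asyncio.create_task(run_worker(stop))
    await asyncio.sleep(0)  # ...the application would serve requests here...
    stop.set()              # signal shutdown
    return await worker_task

if __name__ == "__main__":
    print(asyncio.run(main()))  # the one asyncio.run() lives at the top level
```

Frameworks usually give you a natural place for the `create_task` call, e.g. a startup hook or lifespan context.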
- `saq.exceptions.JobNotFound: Job 'my_missing_task' not found`
  - cause: The SAQ worker was started without the task function (`my_missing_task`) registered, so it doesn't know how to execute the job.
  - fix: Include every task function the worker should process in the `functions` list when constructing the `Worker`: `Worker(queue=q, functions=[my_task_1, my_task_2])`.
- `redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused.`
  - cause: The Redis server is not running, not listening on the specified host/port, or unreachable due to network/firewall issues.
  - fix: Verify that Redis is running and accessible, and that the `REDIS_URL` used by SAQ (e.g., `redis://localhost:6379`) is correct. A quick way to run one: `docker run -p 6379:6379 --name saq-redis -d redis/redis-stack:latest`.
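A quick way to distinguish "Redis is down" from a SAQ misconfiguration is a plain TCP probe before wiring up the queue. This is generic stdlib code, not part of SAQ:

```python
import asyncio

async def can_connect(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout
        )
    except (OSError, asyncio.TimeoutError):
        return False
    writer.close()
    await writer.wait_closed()
    return True

if __name__ == "__main__":
    # Probe the default Redis port before starting SAQ producers/workers.
    ok = asyncio.run(can_connect("localhost", 6379))
    print("Redis reachable" if ok else "Connection refused -- is Redis running?")
```

If the probe succeeds but SAQ still fails to connect, the problem is more likely the URL scheme, database index, or authentication in `REDIS_URL`.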
Warnings
- breaking The `Worker` constructor requires task functions to be registered up front via the `functions` argument; there is no post-instantiation registration method.
- gotcha Queues default to the name 'default' (e.g., `Queue.from_url(url)` with no `name` argument). Producers and workers must agree on the queue name, or enqueued jobs can sit unprocessed, or be picked up by unintended workers when multiple queues are in use.
- gotcha SAQ's default serializer (JSON) cannot handle complex Python objects (e.g., dataclasses, Pydantic models, custom classes) as job arguments and will raise serialization errors. Pass custom `dump`/`load` callables to `Queue`, or convert arguments to JSON-safe types before enqueueing.
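One workaround for the serializer gotcha is a pair of JSON hooks that flatten dataclasses to dicts before encoding. A sketch: the `dump`/`load` parameter names on `Queue` should be checked against your installed SAQ version, `EmailPayload` is a made-up example type, and note the worker-side task receives a plain dict, not the original object:

```python
import dataclasses
import json

@dataclasses.dataclass
class EmailPayload:          # example argument type, not part of SAQ
    to: str
    subject: str

def dump(obj) -> str:
    # Fall back to dict form for dataclasses so json can encode them.
    def default(o):
        if dataclasses.is_dataclass(o):
            return dataclasses.asdict(o)
        raise TypeError(f"cannot serialize {type(o).__name__}")
    return json.dumps(obj, default=default)

def load(data):
    return json.loads(data)

# Hypothetical wiring -- verify the parameter names in your SAQ version:
#   q = Queue.from_url("redis://localhost:6379", dump=dump, load=load)

payload = {"kwargs": {"payload": EmailPayload("a@b.co", "hello")}}
print(load(dump(payload)))  # the dataclass arrives worker-side as a plain dict
```

If you need the original type back inside the task, reconstruct it there (e.g., `EmailPayload(**kwargs["payload"])`).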
Install
-
pip install saq
Imports
- Queue
from saq import Queue
- Worker
from saq import Worker
- Job
from saq import Job
- CronJob
from saq import CronJob
Quickstart
import asyncio
import os

from saq import Queue, Worker

# Define a task function: a plain async callable whose first argument is the
# SAQ context dict (ctx["job"] is the current Job).
async def my_task(ctx, a, b):
    """A sample task that adds two numbers."""
    print(f"[{ctx['job'].id}] Running my_task with {a} + {b}")
    await asyncio.sleep(0.5)  # Simulate async work
    result = a + b
    print(f"[{ctx['job'].id}] Task finished, result: {result}")
    return result

async def producer():
    """Enqueues jobs."""
    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")
    q = Queue.from_url(redis_url, name="default")
    print("Enqueuing jobs...")
    job1 = await q.enqueue("my_task", a=1, b=2)
    job2 = await q.enqueue("my_task", a=10, b=20, meta={"op": "add"})  # 'meta' is stored on job.meta
    print(f"Enqueued job1 ID: {job1.id}, job2 ID: {job2.id}")
    # Waiting for a result blocks until the job is done. For a quickstart this
    # is illustrative; a real app might check results later or not block.
    if job1:
        try:
            await job1.refresh(until_complete=2)  # wait up to 2s for completion
            print(f"Job1 result (producer-side): {job1.result}")
        except asyncio.TimeoutError:
            print("Job1 result timed out on producer side.")

async def consumer():
    """Runs the worker to process jobs."""
    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")
    q = Queue.from_url(redis_url, name="default")
    worker = Worker(
        queue=q,
        functions=[my_task],  # Register the task function
        concurrency=1,  # For a simple quickstart, process one job at a time
    )
    print("Starting worker for 5 seconds...")
    try:
        # In a real application, worker.start() runs indefinitely.
        # For a quickstart, run it briefly and then stop.
        await asyncio.wait_for(worker.start(), timeout=5)
    except asyncio.TimeoutError:
        print("Worker stopped due to timeout (expected for quickstart).")
    except asyncio.CancelledError:
        print("Worker cancelled.")
    finally:
        await worker.stop()  # Ensure clean shutdown

async def main():
    print("This quickstart runs the SAQ producer and worker in a single script.")
    print("In a real scenario, these would typically run in separate processes.")
    print("Ensure a Redis server is running at redis://localhost:6379 or set the REDIS_URL environment variable.")
    print("Example: `docker run -p 6379:6379 --name saq-redis -d redis/redis-stack:latest`")
    await asyncio.gather(producer(), consumer())
    print("\nQuickstart finished. Check Redis for any remaining jobs if the worker didn't process them all.")

if __name__ == "__main__":
    asyncio.run(main())
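In production the producer and worker normally run as separate processes; SAQ ships a `saq` CLI that loads a worker settings dictionary by dotted path. A sketch, where the file name, task body, and concurrency value are placeholders:

```python
# settings.py -- consumed by the saq CLI; the file name here is a placeholder.
from saq import Queue

async def my_task(ctx, *, a, b):
    # Trivial example task; real tasks would do I/O-bound work here.
    return a + b

settings = {
    "queue": Queue.from_url("redis://localhost:6379"),
    "functions": [my_task],
    "concurrency": 10,
}

# Start a worker process from the shell:
#   saq settings.settings
```

The producer side then only needs `Queue.from_url(...)` and `enqueue`, with no task code imported.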