SAQ: Distributed Async Job Queue

0.26.3 · active · verified Thu Apr 16

SAQ is a distributed Python job queue designed for asynchronous applications, leveraging `asyncio` and Redis. It provides a simple, fast, and reliable way to manage background tasks with features like job retries, timeouts, and scheduled execution. The library is actively maintained with frequent updates, though without a strict release cadence.
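Job-level options such as retries, timeouts, and scheduled execution are passed in the same `enqueue` call as the task's own keyword arguments, and the queue separates the two. Below is a plain-Python sketch of that splitting idea — it is illustrative only, not SAQ's internal code, and the option names in `JOB_OPTIONS` are assumptions:

```python
# Illustration of splitting enqueue kwargs into job options vs. task kwargs.
# NOT SAQ's internal code; the option names below are assumed for the sketch.
JOB_OPTIONS = {"retries", "timeout", "scheduled", "ttl"}

def split_kwargs(**kwargs):
    """Separate job-level options from the task's own keyword arguments."""
    job_opts = {k: v for k, v in kwargs.items() if k in JOB_OPTIONS}
    task_kwargs = {k: v for k, v in kwargs.items() if k not in JOB_OPTIONS}
    return job_opts, task_kwargs

job_opts, task_kwargs = split_kwargs(a=1, b=2, retries=3, timeout=30)
print(job_opts)     # {'retries': 3, 'timeout': 30}
print(task_kwargs)  # {'a': 1, 'b': 2}
```

This is why a call like `queue.enqueue("my_task", a=1, b=2, retries=3)` can carry both the task inputs and the retry policy in one place.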

Install

pip install saq

Imports

import asyncio
from saq import Queue, Worker
Quickstart

This quickstart demonstrates how to define an asynchronous task, enqueue it using a SAQ client (producer), and process it with a SAQ worker (consumer). It's designed to run in a single script for demonstration purposes, but in production, the producer and consumer typically run in separate processes. Ensure a Redis server is running and accessible at the specified URL (default: `redis://localhost:6379`). You can run a Redis server quickly using Docker.

import asyncio
import os

from saq import Queue, Worker

# Define a task function. SAQ tasks are plain async functions that take the
# job context dict as their first argument; no decorator is required.
async def my_task(ctx, *, a, b):
    """A sample task that adds two numbers."""
    print(f"[{ctx['job'].id}] Running my_task with {a} + {b}")
    await asyncio.sleep(0.5) # Simulate async work
    result = a + b
    print(f"[{ctx['job'].id}] Task finished, result: {result}")
    return result

async def producer():
    """Enqueues jobs."""
    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')
    q = Queue.from_url(redis_url, name="default")

    print("Enqueuing jobs...")
    job1 = await q.enqueue("my_task", a=1, b=2)
    job2 = await q.enqueue("my_task", a=10, b=20) # Keyword arguments are forwarded to the task function
    await asyncio.sleep(0.1) # Give the worker a moment to pick up the jobs
    print(f"Enqueued job1 ID: {job1.id}, job2 ID: {job2.id}")
    # Waiting for a result blocks until the job completes; shown here for
    # illustration. In a real app, you might poll later or not wait at all.
    if job1:
        try:
            await job1.refresh(until_complete=2) # Wait up to 2s for completion
            print(f"Job1 result (producer-side): {job1.result}")
        except asyncio.TimeoutError:
            print("Job1 result timed out on producer side.")

async def consumer():
    """Runs the worker to process jobs."""
    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')
    q = Queue.from_url(redis_url, name="default")
    worker = Worker(
        queue=q,
        functions=[my_task], # Register the task function
        concurrency=1 # For simple quickstart, use 1 concurrent task
    )
    print("Starting worker for 5 seconds...")
    try:
        # In a real application, worker.start() would run indefinitely.
        # For a quickstart, we'll run it briefly and then stop.
        await asyncio.wait_for(worker.start(), timeout=5)
    except asyncio.TimeoutError:
        print("Worker stopped due to timeout (expected for quickstart).")
    except asyncio.CancelledError:
        print("Worker cancelled.")
    finally:
        await worker.stop() # Ensure clean shutdown

async def main():
    print("This quickstart demonstrates SAQ client (producer) and worker (consumer) in a single script.")
    print("In a real scenario, these would typically run in separate processes.")
    print("Ensure a Redis server is running at redis://localhost:6379 or set the REDIS_URL environment variable.")
    print("Example: `docker run -p 6379:6379 --name saq-redis -d redis`")
    await asyncio.gather(producer(), consumer())
    print("\nQuickstart finished. Check Redis for any remaining jobs if the worker didn't process them all.")

if __name__ == "__main__":
    asyncio.run(main())
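In production, the consumer side is usually started with SAQ's `saq` command-line entry point rather than by constructing a `Worker` in a script: keep the tasks and a settings mapping in a module and point the CLI at it. A minimal sketch of such a module — the module name `tasks.py` and the queue URL are placeholders, and the keys follow SAQ's worker-settings convention:

```python
# tasks.py: worker settings module for the `saq` CLI (sketch)
from saq import Queue

async def my_task(ctx, *, a, b):
    """Add two numbers."""
    return a + b

settings = {
    "queue": Queue.from_url("redis://localhost:6379"),
    "functions": [my_task],  # tasks this worker can execute
    "concurrency": 10,       # max jobs processed at once
}
```

The worker is then launched with `saq tasks.settings`; producers can enqueue jobs against the same queue from any other process.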
