{"library":"saq","title":"SAQ: Distributed Async Job Queue","description":"SAQ is a distributed Python job queue designed for asynchronous applications, leveraging `asyncio` and Redis. It provides a simple, fast, and reliable way to manage background tasks with features like job retries, timeouts, and scheduled execution. The library is actively maintained with frequent updates, though without a strict release cadence.","language":"python","status":"active","last_verified":"Thu Apr 16","install":{"commands":["pip install saq"],"cli":null},"imports":["from saq import Queue","from saq import Worker","from saq import Job","from saq.utils import job"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"import asyncio\nfrom saq import Queue, Worker\nfrom saq.utils import job\nimport os\n\n# Define a task function\n@job\nasync def my_task(ctx, a, b):\n    \"\"\"A sample task that adds two numbers.\"\"\"\n    print(f\"[{ctx.job.id}] Running my_task with {a} + {b}\")\n    await asyncio.sleep(0.5) # Simulate async work\n    result = a + b\n    print(f\"[{ctx.job.id}] Task finished, result: {result}\")\n    return result\n\nasync def producer():\n    \"\"\"Enqueues jobs.\"\"\"\n    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')\n    q = Queue.from_url(redis_url, name=\"default\")\n\n    print(\"Enqueuing jobs...\")\n    job1 = await q.enqueue(\"my_task\", a=1, b=2)\n    job2 = await q.enqueue(\"my_task\", a=10, b=20, op=\"add\") # 'op' will be stored in job.meta\n    await asyncio.sleep(0.1) # Give time for jobs to be pushed to Redis\n    print(f\"Enqueued job1 ID: {job1.id}, job2 ID: {job2.id}\")\n    # Retrieving result blocks until job is done. For quickstart, it's illustrative.\n    # In a real app, you might check results later or not block.\n    if job1:\n        try:\n            job1_result = await job1.result(timeout=2)\n            print(f\"Job1 result (producer-side): {job1_result}\")\n        except asyncio.TimeoutError:\n            print(\"Job1 result timed out on producer side.\")\n\nasync def consumer():\n    \"\"\"Runs the worker to process jobs.\"\"\"\n    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')\n    q = Queue.from_url(redis_url, name=\"default\")\n    worker = Worker(\n        queue=q,\n        functions=[my_task], # Register the task function\n        concurrency=1 # For simple quickstart, use 1 concurrent task\n    )\n    print(\"Starting worker for 5 seconds...\")\n    try:\n        # In a real application, worker.start() would run indefinitely.\n        # For a quickstart, we'll run it briefly and then stop.\n        await asyncio.wait_for(worker.start(), timeout=5)\n    except asyncio.TimeoutError:\n        print(\"Worker stopped due to timeout (expected for quickstart).\")\n    except asyncio.CancelledError:\n        print(\"Worker cancelled.\")\n    finally:\n        await worker.stop() # Ensure clean shutdown\n\nasync def main():\n    print(\"This quickstart demonstrates SAQ client (producer) and worker (consumer) in a single script.\")\n    print(\"In a real scenario, these would typically run in separate processes.\")\n    print(\"Ensure a Redis server is running at redis://localhost:6379 or set the REDIS_URL environment variable.\")\n    print(\"Example: `docker run -p 6379:6379 --name saq-redis -d redis/redis-stack:latest`\")\n    await asyncio.gather(producer(), consumer())\n    print(\"\\nQuickstart finished. Check Redis for any remaining jobs if the worker didn't process them all.\")\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates how to define an asynchronous task, enqueue it using a SAQ client (producer), and process it with a SAQ worker (consumer). It's designed to run in a single script for demonstration purposes, but in production, the producer and consumer typically run in separate processes. Ensure a Redis server is running and accessible at the specified URL (default: `redis://localhost:6379`). You can run a Redis server quickly using Docker.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}