{"id":9293,"library":"saq","title":"SAQ: Distributed Async Job Queue","description":"SAQ is a distributed Python job queue designed for asynchronous applications, leveraging `asyncio` and Redis. It provides a simple, fast, and reliable way to manage background tasks with features like job retries, timeouts, and scheduled execution. The library is actively maintained with frequent updates, though without a strict release cadence.","status":"active","version":"0.26.3","language":"en","source_language":"en","source_url":"https://github.com/tobymao/saq","tags":["async","job-queue","redis","worker","distributed","background-tasks","asyncio"],"install":[{"cmd":"pip install saq","lang":"bash","label":"Install SAQ"}],"dependencies":[{"reason":"SAQ uses Redis as its backend for queue management, job storage, and coordination.","package":"redis","optional":false},{"reason":"Optional faster Redis protocol parser; install via the `saq[hiredis]` extra.","package":"hiredis","optional":true},{"reason":"Optional dependency for SAQ's built-in monitoring web UI; install via the `saq[web]` extra.","package":"aiohttp","optional":true}],"imports":[{"symbol":"Queue","correct":"from saq import Queue"},{"symbol":"Worker","correct":"from saq import Worker"},{"symbol":"Job","correct":"from saq import Job"},{"note":"SAQ has no `@job` decorator (and no `saq.utils.job`). Tasks are plain async functions that take a `ctx` dict as their first argument and are registered via the `Worker`'s `functions` argument. `CronJob` is used for scheduled tasks.","wrong":"from saq.utils import job","symbol":"CronJob","correct":"from saq import CronJob"}],"quickstart":{"code":"import asyncio\nimport os\n\nfrom saq import Queue, Worker\n\n# Tasks are plain async functions; the first argument is a context dict.\nasync def my_task(ctx, *, a, b):\n    \"\"\"A sample task that adds two numbers.\"\"\"\n    print(f\"[{ctx['job'].id}] Running my_task with {a} + {b}\")\n    await asyncio.sleep(0.5)  # Simulate async work\n    result = a + b\n    print(f\"[{ctx['job'].id}] Task finished, result: {result}\")\n    return result\n\nasync def producer():\n    \"\"\"Enqueues jobs.\"\"\"\n    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')\n    q = Queue.from_url(redis_url, name='default')\n\n    print('Enqueuing jobs...')\n    job1 = await q.enqueue('my_task', a=1, b=2)\n    # Keyword arguments matching Job fields (e.g. 'retries') become job settings;\n    # everything else is passed to the task function.\n    job2 = await q.enqueue('my_task', a=10, b=20, retries=2)\n    print(f'Enqueued job1 ID: {job1.id}, job2 ID: {job2.id}')\n    # Waiting for the result blocks; this is illustrative for a quickstart.\n    # In a real app, you might check results later or not block at all.\n    if job1:\n        try:\n            await job1.refresh(2)  # wait up to ~2s for the job to complete\n            print(f'Job1 result (producer-side): {job1.result}')\n        except asyncio.TimeoutError:\n            print('Job1 result timed out on producer side.')\n\nasync def consumer():\n    \"\"\"Runs the worker to process jobs.\"\"\"\n    redis_url = os.environ.get('REDIS_URL', 'redis://localhost:6379')\n    q = Queue.from_url(redis_url, name='default')\n    worker = Worker(\n        queue=q,\n        functions=[my_task],  # Register the task function\n        concurrency=1,  # For a simple quickstart, run one task at a time\n    )\n    print('Starting worker for 5 seconds...')\n    try:\n        # In a real application, worker.start() runs indefinitely.\n        # For a quickstart, run it briefly and then stop.\n        await asyncio.wait_for(worker.start(), timeout=5)\n    except asyncio.TimeoutError:\n        print('Worker stopped due to timeout (expected for quickstart).')\n    except asyncio.CancelledError:\n        print('Worker cancelled.')\n    finally:\n        await worker.stop()  # Ensure clean shutdown\n\nasync def main():\n    print('This quickstart demonstrates a SAQ client (producer) and worker (consumer) in a single script.')\n    print('In a real scenario, these would typically run in separate processes.')\n    print('Ensure a Redis server is running at redis://localhost:6379 or set the REDIS_URL environment variable.')\n    print('Example: `docker run -p 6379:6379 --name saq-redis -d redis/redis-stack:latest`')\n    await asyncio.gather(producer(), consumer())\n    print('\\nQuickstart finished. Check Redis for any remaining jobs if the worker did not process them all.')\n\nif __name__ == '__main__':\n    asyncio.run(main())\n","lang":"python","description":"This quickstart demonstrates how to define an asynchronous task, enqueue it with a SAQ client (producer), and process it with a SAQ worker (consumer). It runs both in a single script for demonstration purposes; in production, the producer and consumer typically run in separate processes. Ensure a Redis server is running and accessible at the specified URL (default: `redis://localhost:6379`). You can start one quickly with Docker."},"warnings":[{"fix":"Pass every task the worker may execute in the `functions` list when constructing it: `Worker(queue=q, functions=[my_task_1, my_task_2])`.","message":"A `Worker` only executes functions explicitly registered via its `functions` argument; SAQ has no decorator-based auto-registration, so a job referencing an unregistered function will never run successfully.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Pass the same `name` to `Queue.from_url(url, name=...)` on both the producer and the worker; the name defaults to 'default'.","message":"Jobs are routed by queue name. If the producer enqueues on a queue whose `name` differs from the name the worker listens on, the jobs will sit in Redis unprocessed.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Either pass only JSON-serializable arguments (basic types, lists, dicts), or supply custom `dump`/`load` callables when constructing the `Queue` to plug in a richer serializer.","message":"SAQ serializes job payloads as JSON by default, which cannot handle complex Python objects (e.g., dataclasses, Pydantic models, custom classes) as job arguments. Passing them causes serialization errors.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Instead of `asyncio.run()`, integrate SAQ client/worker logic into the existing event loop, e.g. by creating tasks with `asyncio.create_task()` or using the framework's own lifecycle hooks.","cause":"Attempting to call `asyncio.run()` when an event loop is already active in the current thread (e.g., when integrating SAQ into an existing `asyncio` application such as FastAPI or aiohttp).","error":"RuntimeError: Event loop is already running"},{"fix":"Ensure all task functions the worker is expected to process are included in the `functions` list when initializing the `Worker`: `Worker(queue=q, functions=[my_task_1, my_task_2])`.","cause":"The worker was started without the task function registered, so it cannot look up the function named in the job.","error":"The job fails immediately with an error indicating 'my_missing_task' is not a registered function."},{"fix":"Verify that your Redis server is running and accessible. Check the Redis server's configuration and ensure the `REDIS_URL` used by SAQ (e.g., `redis://localhost:6379`) is correct and reachable. You can quickly run a Redis server with `docker run -p 6379:6379 --name saq-redis -d redis/redis-stack:latest`.","cause":"The Redis server is either not running, not listening on the specified host/port, or is inaccessible due to network/firewall issues.","error":"redis.exceptions.ConnectionError: Error 111 connecting to localhost:6379. Connection refused."}]}