{"id":3403,"library":"arq","title":"arq: Async Redis Job Queue","description":"arq is a Python library for asynchronous job queues and RPC, leveraging asyncio and Redis. Conceived as a simple, modern, and performant successor to `rq`, it is designed for highly concurrent, non-blocking task execution. It is particularly well-suited for modern async Python applications like those built with FastAPI. Currently at version 0.27.0, arq is actively maintained with frequent releases.","status":"active","version":"0.27.0","language":"en","source_language":"en","source_url":"https://github.com/python-arq/arq","tags":["async","redis","job-queue","worker","asyncio","background-tasks","fastapi"],"install":[{"cmd":"pip install arq httpx","lang":"bash","label":"Install arq, plus httpx for the quickstart example (httpx is not an arq dependency)"}],"dependencies":[{"reason":"arq uses Redis as its sole message broker and state backend.","package":"redis","optional":false},{"reason":"Commonly used for HTTP requests within async jobs, as shown in quickstart examples.","package":"httpx","optional":true}],"imports":[{"symbol":"create_pool","correct":"from arq import create_pool"},{"symbol":"RedisSettings","correct":"from arq.connections import RedisSettings"},{"note":"arq does not export a WorkerSettings class. The settings object passed to the arq CLI is a plain user-defined class (or namespace) whose attributes configure the worker.","symbol":"WorkerSettings","correct":"class WorkerSettings: ...  # defined in your own code; no import needed"},{"note":"Used to wrap job functions for more advanced settings (e.g., `keep_result`, `timeout`).","symbol":"func","correct":"from arq.worker import func"},{"note":"Used to define recurring jobs with cron-like schedules.","symbol":"cron","correct":"from arq.cron import cron"}],"quickstart":{"code":"import asyncio\nimport os\n\nfrom arq import create_pool\nfrom arq.connections import RedisSettings\n\n# The example job downloads URLs with httpx (aiohttp would work similarly).\n# Install 'httpx' for this example: pip install httpx\nimport httpx\n\nREDIS_DSN = os.environ.get('ARQ_REDIS_DSN', 'redis://localhost:6379')\nREDIS_SETTINGS = RedisSettings.from_dsn(REDIS_DSN)\n\nasync def download_content(ctx, url: str) -> int:\n    \"\"\"An example job function that downloads content from a URL.\"\"\"\n    session: httpx.AsyncClient = ctx['session']\n    response = await session.get(url, timeout=5)  # timeout added for robustness\n    response.raise_for_status()\n    print(f\"Worker: Downloaded {len(response.text)} bytes from {url[:50]}...\")\n    return len(response.text)\n\nasync def startup(ctx):\n    \"\"\"Worker startup hook to initialize shared resources.\"\"\"\n    print(\"Worker: Starting up - creating httpx.AsyncClient session.\")\n    ctx['session'] = httpx.AsyncClient()\n\nasync def shutdown(ctx):\n    \"\"\"Worker shutdown hook to clean up shared resources.\"\"\"\n    if 'session' in ctx:\n        await ctx['session'].aclose()\n        print(\"Worker: Shutting down - closing httpx.AsyncClient session.\")\n\nclass MyWorkerSettings:\n    \"\"\"Worker settings consumed by the arq CLI; a plain class, no base class needed.\"\"\"\n    functions = [download_content]\n    on_startup = startup\n    on_shutdown = shutdown\n    redis_settings = REDIS_SETTINGS\n    keep_result = 60 * 60  # Keep job results for 1 hour\n\nasync def main():\n    \"\"\"Producer: Enqueues jobs.\"\"\"\n    print(f\"Producer: Connecting to Redis at {REDIS_DSN}\")\n    redis = await create_pool(REDIS_SETTINGS)\n\n    urls = [\n        'https://www.google.com',\n        'https://www.bing.com',\n        'https://www.yahoo.com',\n    ]\n\n    for url in urls:\n        job = await redis.enqueue_job('download_content', url)\n        print(f\"Producer: Enqueued job {job.job_id} for {url}\")\n\n    # Optional: wait for the last enqueued job's result (for demonstration).\n    # enqueue_job returns a Job instance, so `job` from the loop above can be used directly:\n    #     print(f\"Producer: Waiting for job {job.job_id} result...\")\n    #     result = await job.result(timeout=10) # Wait up to 10 
seconds\n    #     print(f\"Producer: Job {job.job_id} finished with result: {result}\")\n\n    await redis.aclose()  # on redis-py < 5, use: await redis.close()\n    print(\"Producer: Redis connection closed.\")\n\nif __name__ == '__main__':\n    # To run the producer (enqueue jobs):\n    # python your_script_name.py\n    asyncio.run(main())\n\n    # To run the worker (in a separate terminal):\n    # ARQ_REDIS_DSN='redis://localhost:6379' arq your_script_name.MyWorkerSettings --burst\n    # (Remove --burst to run continuously)\n","lang":"python","description":"This quickstart demonstrates how to enqueue jobs with `arq` and how to define a `WorkerSettings` class for processing them. It includes a `download_content` job, along with `startup` and `shutdown` hooks for resource management (e.g., an `httpx.AsyncClient` session). Run the script to enqueue jobs, then start a separate worker process (using the `arq` CLI) to consume them from Redis. Ensure a Redis server is running and accessible via the `ARQ_REDIS_DSN` environment variable or the default `redis://localhost:6379`."},"warnings":[{"fix":"Users migrating from v0.15 or older must entirely rewrite their arq integration following the new API. Refer to the v0.16 documentation for the new patterns.","message":"arq underwent a complete rewrite in v0.16, fundamentally changing how workers are registered and how jobs are enqueued and processed. Code written for v0.15 or earlier is incompatible.","severity":"breaking","affected_versions":"<=0.15 (when upgrading to >=0.16)"},{"fix":"Ensure your project runs on Python 3.9 or newer. Upgrade your Python environment if you are currently using 3.8.","message":"Support for Python 3.8 was removed, and support for Python 3.13 was added.","severity":"breaking","affected_versions":"0.27.0"},{"fix":"Modify your job function signatures to `async def my_job(ctx, *args, **kwargs):`. 
Access shared resources via `ctx['key']`.","message":"All job functions defined for arq workers must accept `ctx` as their first argument, which is a dictionary for passing shared resources (e.g., database connections, HTTP sessions) to jobs.","severity":"gotcha","affected_versions":"All versions >=0.16"},{"fix":"Design your jobs to be idempotent or handle potential duplicate executions gracefully. Be aware that job execution might not be 'exactly once' in failure scenarios.","message":"arq uses 'pessimistic execution': jobs are only removed from the queue upon successful completion or final failure. If a worker process shuts down unexpectedly, in-progress jobs will remain in the queue to be rerun by another worker or when the worker restarts.","severity":"gotcha","affected_versions":"All versions >=0.16"},{"fix":"Ensure you are using `arq` v0.27.1 or newer for Python 3.14+ compatibility. If manually managing event loops, use `asyncio.new_event_loop()` explicitly or ensure a loop is set before calling `arq` components.","message":"For Python 3.14 and newer, `asyncio.get_event_loop()` no longer implicitly creates an event loop and will raise a `RuntimeError` if no loop is running. While `arq` v0.27.1+ includes internal fixes, custom asyncio setups might be affected.","severity":"gotcha","affected_versions":"0.27.0 and older versions when used with Python 3.14+"},{"fix":"Avoid using these methods for retrieving large sets of job data in production. 
If you need to monitor or manage many jobs, track the relevant job IDs yourself (e.g., in a Redis set maintained at enqueue time) and fetch their status or results individually by ID instead of scanning all keys.","message":"The `ArqRedis` methods `all_job_results()` and `queued_jobs()` use inefficient Redis commands (`KEYS` or multiple `GET` operations) internally, making them unsuitable for production environments with a large number of keys or jobs.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-11T00:00:00.000Z","next_check":"2026-07-10T00:00:00.000Z"}