arq: Async Redis Job Queue
arq is a Python library for asynchronous job queues and RPC, leveraging asyncio and Redis. Conceived as a simple, modern, and performant successor to `rq`, it is designed for highly concurrent, non-blocking task execution. It is particularly well-suited for modern async Python applications like those built with FastAPI. Currently at version 0.27.0, arq is actively maintained with frequent releases.
Warnings
- breaking arq underwent a complete rewrite in v0.16, fundamentally changing how workers are registered and how jobs are enqueued and processed. Code written for v0.15 or earlier is incompatible.
- breaking Support for Python 3.8 was officially removed, while support for Python 3.13 was added.
- gotcha All job functions defined for arq workers must accept `ctx` as their first argument, which is a dictionary for passing shared resources (e.g., database connections, HTTP sessions) to jobs.
- gotcha arq uses 'pessimistic execution': jobs are only removed from the queue upon successful completion or final failure. If a worker process shuts down unexpectedly, in-progress jobs will remain in the queue to be rerun by another worker or when the worker restarts.
- gotcha For Python 3.14 and newer, `asyncio.get_event_loop()` no longer implicitly creates an event loop and will raise a `RuntimeError` if no loop is running. While `arq` v0.27.1+ includes internal fixes, custom asyncio setups might be affected.
- gotcha The `ArqRedis` methods `get_all_job_results()` and `queued_jobs()` use inefficient Redis commands (`KEYS` or multiple `GET` operations) internally, making them unsuitable for production environments with a large number of keys or jobs.
Install
- pip install arq httpx
Imports
- create_pool
from arq import create_pool
- RedisSettings
from arq.connections import RedisSettings
- Worker (note: arq has no importable `WorkerSettings` class; worker settings are a plain class you define yourself, see Quickstart)
from arq.worker import Worker
- func
from arq.worker import func
- cron
from arq.cron import cron
Quickstart
import asyncio
import os
from arq import create_pool
from arq.connections import RedisSettings
# For demonstration; typically use an AsyncClient from httpx or aiohttp
# Install 'httpx' for this example: pip install httpx
import httpx

REDIS_DSN = os.environ.get('ARQ_REDIS_DSN', 'redis://localhost:6379')
REDIS_SETTINGS = RedisSettings.from_dsn(REDIS_DSN)

async def download_content(ctx, url: str) -> int:
    """An example job function that downloads content from a URL."""
    session: httpx.AsyncClient = ctx['session']
    response = await session.get(url, timeout=5)  # explicit timeout for robustness
    response.raise_for_status()
    print(f"Worker: Downloaded {len(response.text)} bytes from {url[:50]}...")
    return len(response.text)

async def startup(ctx):
    """Worker startup hook to initialize shared resources."""
    print("Worker: Starting up - creating httpx.AsyncClient session.")
    ctx['session'] = httpx.AsyncClient()

async def shutdown(ctx):
    """Worker shutdown hook to clean up shared resources."""
    if 'session' in ctx:
        await ctx['session'].aclose()
        print("Worker: Shutting down - closing httpx.AsyncClient session.")

class MyWorkerSettings:
    """Worker settings: a plain class referenced by the arq CLI."""
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = REDIS_SETTINGS
    keep_result = 60 * 60  # keep job results for 1 hour
async def main():
    """Producer: enqueues jobs."""
    print(f"Producer: Connecting to Redis at {REDIS_DSN}")
    redis = await create_pool(REDIS_SETTINGS)
    urls = [
        'https://www.google.com',
        'https://www.bing.com',
        'https://www.yahoo.com',
    ]
    for url in urls:
        job = await redis.enqueue_job('download_content', url)
        print(f"Producer: Enqueued job {job.job_id} for {url}")
    # Optional: wait for the last job's result (requires a running worker)
    # result = await job.result(timeout=10)  # poll up to 10 seconds
    # print(f"Producer: Job {job.job_id} finished with result: {result}")
    await redis.aclose()  # redis-py >= 5; use close() on older versions
    print("Producer: Redis connection closed.")
if __name__ == '__main__':
    # To run the producer (enqueue jobs):
    #   python your_script_name.py
    asyncio.run(main())

# To run the worker (in a separate terminal):
#   ARQ_REDIS_DSN='redis://localhost:6379' arq your_script_name.MyWorkerSettings --burst
# (remove --burst to run continuously)