arq: Async Redis Job Queue

0.27.0 · active · verified Sat Apr 11

arq is a Python library for asynchronous job queues and RPC, leveraging asyncio and Redis. Conceived as a simple, modern, and performant successor to `rq`, it is designed for highly concurrent, non-blocking task execution. It is particularly well-suited for modern async Python applications like those built with FastAPI. Currently at version 0.27.0, arq is actively maintained with frequent releases.

Warnings

`enqueue_job` returns None (instead of a Job) when a job with the same `_job_id` is already queued, so check the return value when using custom job IDs.
Recent arq versions are built on redis-py rather than aioredis; aioredis-era calls such as `wait_closed()` no longer exist.

Install

pip install arq

Imports

from arq import create_pool
from arq.connections import RedisSettings

Quickstart

This quickstart demonstrates how to enqueue jobs with `arq` and how to define worker settings for processing them. It includes a `download_content` job, plus `startup` and `shutdown` hooks for resource management (here, an `httpx.AsyncClient` session). Run the script to enqueue jobs, then start a separate worker process (using the `arq` CLI) to consume them from Redis. Ensure a Redis server is running and reachable via the `ARQ_REDIS_DSN` environment variable or at the default `redis://localhost:6379`.

import asyncio
import os

from arq import create_pool
from arq.connections import RedisSettings

# httpx provides the async HTTP client used in this example.
# Install it separately: pip install httpx
import httpx

REDIS_DSN = os.environ.get('ARQ_REDIS_DSN', 'redis://localhost:6379')
REDIS_SETTINGS = RedisSettings.from_dsn(REDIS_DSN)

async def download_content(ctx, url: str) -> int:
    """An example job function that downloads content from a URL."""
    session: httpx.AsyncClient = ctx['session']
    response = await session.get(url, timeout=5)  # explicit timeout so a hung request fails fast
    response.raise_for_status()
    print(f"Worker: Downloaded {len(response.text)} bytes from {url[:50]}...")
    return len(response.text)

async def startup(ctx):
    """Worker startup hook to initialize shared resources."""
    print("Worker: Starting up - creating httpx.AsyncClient session.")
    ctx['session'] = httpx.AsyncClient()

async def shutdown(ctx):
    """Worker shutdown hook to clean up shared resources."""
    if 'session' in ctx:
        await ctx['session'].aclose()
        print("Worker: Shutting down - closing httpx.AsyncClient session.")

class MyWorkerSettings:
    """Worker settings for the arq CLI (a plain class; no base class is needed)."""
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = REDIS_SETTINGS
    keep_result = 60 * 60  # keep job results for 1 hour

async def main():
    """Producer: Enqueues jobs."""
    print(f"Producer: Connecting to Redis at {REDIS_DSN}")
    redis = await create_pool(REDIS_SETTINGS)
    
    urls = [
        'https://www.google.com',
        'https://www.bing.com',
        'https://www.yahoo.com',
    ]

    for url in urls:
        job = await redis.enqueue_job('download_content', url)
        print(f"Producer: Enqueued job {job.job_id} for {url}")
    
    # Optional: wait for a job's result (requires a running worker).
    # The Job object returned by enqueue_job can poll for its result:
    #     result = await job.result(timeout=10)  # wait up to 10 seconds
    #     print(f"Producer: Job {job.job_id} finished with result: {result}")

    await redis.aclose()  # redis-py >= 5; use `await redis.close()` on older redis-py
    print("Producer: Redis connection closed.")

if __name__ == '__main__':
    # To run the producer (enqueue jobs):
    # python your_script_name.py
    asyncio.run(main())

    # To run the worker (in a separate terminal):
    # ARQ_REDIS_DSN='redis://localhost:6379' arq your_script_name.MyWorkerSettings --burst
    # (Remove --burst to run continuously)
