RQ (Redis Queue)
Simple Python job queue backed by Redis or Valkey. Current version is 2.7.0. Requires Python >=3.9, Redis >=5 or Valkey >=7.2. Much simpler than Celery — no broker config, just a Redis connection. Key API change: job.result property removed in 1.12, replaced by job.return_value(). job.get_status() now returns a JobStatus enum, not a string.
Warnings
- breaking job.result property removed in rq 1.12.0. Accessing job.result raises AttributeError. All tutorials and LLM-generated code using job.result are broken.
- breaking job.get_status() returns a JobStatus enum (e.g. JobStatus.FINISHED) not a string ('finished'). Comparing with strings like if job.get_status() == 'finished' always evaluates to False.
- breaking Python 3.8 support dropped. Minimum is Python 3.9.
- breaking redis-py 6.0.0 is explicitly blocked — rq will refuse to install with redis==6.0.0 due to critical incompatibilities. Use redis>=4.0.0,!=6.0.0.
- gotcha Enqueued functions must be importable by the worker process. Functions defined in __main__ or interactively (notebooks, scripts run directly) cannot be pickled and will raise PicklingError.
- gotcha Workers must be started in a separate process. Running a worker in the same process that enqueues jobs causes deadlocks. Start one with: rq worker [queue_name]
- gotcha The default Worker uses fork(), which is unavailable on Windows and unreliable on macOS. Use SpawnWorker instead, which uses multiprocessing spawn.
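The status-comparison pitfall above can be reproduced with a plain stdlib Enum standing in as a sketch of rq's JobStatus (assuming, per the warning, that rq's enum behaves the same way):

```python
from enum import Enum

class JobStatus(Enum):  # stand-in for rq.job.JobStatus
    FINISHED = 'finished'

status = JobStatus.FINISHED
print(status == 'finished')          # False: an enum member never equals its string value
print(status == JobStatus.FINISHED)  # True
print(status.value)                  # 'finished'
```

Compare against the enum member (or its .value), never against a bare string.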
Install
- pip install rq
Imports
- Queue
from redis import Redis
from rq import Queue

# Connection required at Queue creation time
redis_conn = Redis(host='localhost', port=6379, db=0)
q = Queue(connection=redis_conn)

# Functions must be importable; define in a module, not __main__
from myapp.tasks import process_data

job = q.enqueue(process_data, arg1, arg2)

# Get result (rq >= 1.12)
result = job.return_value()  # None until job completes

# Check status; returns a JobStatus enum, not a string
from rq.job import JobStatus
status = job.get_status()
if status == JobStatus.FINISHED:
    print(job.return_value())
Quickstart
# tasks.py - functions must live in an importable module
def add(x, y):
    return x + y

def send_email(to, subject, body):
    # ... email logic
    return True
# enqueue.py - enqueue jobs
from redis import Redis
from rq import Queue, Retry
from tasks import add

redis_conn = Redis()
q = Queue(connection=redis_conn)

# Enqueue
job = q.enqueue(add, 4, 6)
print('Job ID:', job.id)

# Enqueue with options
job2 = q.enqueue(
    add, 10, 20,
    job_timeout=300,     # seconds before the job is killed
    result_ttl=500,      # seconds to keep the result in Redis
    retry=Retry(max=3),  # retry up to 3 times on failure
)
# Check result (after worker runs)
import time
time.sleep(1)
print(job.return_value()) # 10
print(job.get_status()) # JobStatus.FINISHED
# --- Start worker in separate terminal ---
# rq worker