Celery RedBeat
Celery RedBeat is a custom Celery Beat scheduler that stores scheduled tasks and their runtime metadata in Redis. Periodic tasks can be created, modified, and deleted at runtime without restarting the Celery Beat service. Because the schedule lives in Redis, startup stays fast even with a large number of tasks, and a distributed lock prevents multiple Beat instances from running simultaneously. The current version is 2.3.3, and the project is actively developed and released.
Warnings
- breaking Version 2.1.0 dropped support for Python versions older than 3.8. Ensure your Python environment meets this requirement before upgrading.
- breaking Version 0.10.0 reworked the API and improved Python 3 compatibility, which introduced breaking changes. Users upgrading from pre-0.10.0 versions may need to update their task definitions and configuration.
- gotcha RedBeat uses a distributed lock to prevent multiple Beat instances from running simultaneously. Setting `redbeat_lock_key` to `None` disables the lock, so multiple Beat instances can run at once. Setting `redbeat_lock_timeout` too high extends downtime when a Beat instance fails, because a replacement instance must wait for the lock to expire.
- gotcha Using the same Redis database for Celery's broker/result backend and RedBeat's schedule storage can lead to issues, particularly if idle connections are closed, potentially corrupting the scheduler state and causing tasks to stop running silently.
- gotcha Celery Beat, including RedBeat, can sometimes silently stop dispatching tasks or fail to pick up new tasks, especially if workers are saturated with long-running tasks, if there are underlying Redis connection issues, or if Beat and Worker are run in the same process (which is not recommended for production).
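The lock and shared-database warnings above can be turned into a small start-up check. The following is a minimal sketch, not part of RedBeat: the helper names `redis_db` and `build_redbeat_settings` and the `timeout_factor` parameter are hypothetical, and the default port 6379 is assumed when a URL omits it. Only the setting names `redbeat_redis_url`, `redbeat_lock_timeout`, and `beat_max_loop_interval` come from Celery and RedBeat.

```python
from urllib.parse import urlparse


def redis_db(url):
    """Return the database index from a redis:// URL (0 when omitted)."""
    path = urlparse(url).path.lstrip("/")
    return int(path) if path else 0


def build_redbeat_settings(broker_url, redbeat_url,
                           max_loop_interval=60, timeout_factor=5):
    """Build a settings dict, refusing to share one Redis DB with the broker.

    Hypothetical helper: the lock timeout is derived from the loop interval
    so a crashed Beat instance does not hold the lock for long.
    """
    broker, beat = urlparse(broker_url), urlparse(redbeat_url)
    same_server = (broker.hostname, broker.port or 6379) == (
        beat.hostname, beat.port or 6379)
    if same_server and redis_db(broker_url) == redis_db(redbeat_url):
        raise ValueError("RedBeat and the broker share the same Redis database")
    return {
        "redbeat_redis_url": redbeat_url,
        "beat_max_loop_interval": max_loop_interval,
        "redbeat_lock_timeout": max_loop_interval * timeout_factor,
    }
```

The returned dict could be passed to `app.conf.update(**settings)`. The factor of 5 is only an illustrative choice; pick a timeout that matches how quickly a standby Beat instance should take over.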
Install
pip install celery-redbeat
Imports
- RedBeatScheduler
from redbeat.schedulers import RedBeatScheduler
- RedBeatSchedulerEntry
from redbeat import RedBeatSchedulerEntry
Quickstart
import os

from celery import Celery
from celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry

# Configure the Celery app with RedBeat
app = Celery('my_app', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
app.conf.update(
    redbeat_redis_url=os.environ.get('REDBEAT_REDIS_URL', 'redis://localhost:6379/1'),  # Use a different DB than the broker
    redbeat_lock_timeout=300,  # 5 minutes
    timezone='UTC',
    enable_utc=True,
    # Other Celery settings
)


# Define a Celery task
@app.task
def my_periodic_task(arg1, arg2):
    print(f"Executing task with {arg1} and {arg2}")
    return arg1 + arg2


# Example of dynamically scheduling a task.
# This code would typically run in an application context, not in the Beat process.
# It is wrapped in a function so the quickstart can be imported without side effects.
def schedule_task():
    # Requires RedBeat to be configured and Redis to be running.
    # A new entry for an interval task
    entry_interval = RedBeatSchedulerEntry(
        'my-interval-task',
        'my_app.my_periodic_task',  # Task name
        schedule(run_every=10),  # Run every 10 seconds
        args=[10, 20],
        # kwargs={...} can be passed as well, if the task accepts keyword arguments
        app=app,
    )
    entry_interval.save()
    print(f"Scheduled interval task: {entry_interval.key}")

    # A new entry for a crontab task (every minute)
    entry_crontab = RedBeatSchedulerEntry(
        'my-crontab-task',
        'my_app.my_periodic_task',  # Task name
        crontab(minute='*', hour='*', day_of_week='*'),  # Run every minute
        args=['hello', 'world'],
        app=app,
    )
    entry_crontab.save()
    print(f"Scheduled crontab task: {entry_crontab.key}")


# To run Celery Beat with RedBeat:
#   celery -A my_app beat -S redbeat.RedBeatScheduler --loglevel=info
# To run a Celery worker:
#   celery -A my_app worker --loglevel=info

if __name__ == '__main__':
    # In a real application, scheduling is triggered via an API or startup logic.
    # Here we only print the steps; Beat and the worker run as separate processes.
    print("To run, ensure Redis is active, then execute:")
    print("1. For Celery Beat: celery -A my_app beat -S redbeat.RedBeatScheduler --loglevel=info")
    print("2. For Celery Worker: celery -A my_app worker --loglevel=info")
    print("3. Then, from an interactive shell or a script, call schedule_task()")
    # Example of manual scheduling once the app is importable:
    #   from my_app import schedule_task
    #   schedule_task()
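The quickstart only creates entries. Modifying or deleting one at runtime might look like the sketch below. It relies on the `from_key`, `save`, and `delete` methods of `RedBeatSchedulerEntry` and assumes the default key prefix `redbeat:` (the `redbeat_key_prefix` setting). The helper names `entry_key`, `reschedule`, and `unschedule` are hypothetical, and the entry class is passed in as an argument so the helpers can be exercised without a running Redis.

```python
DEFAULT_KEY_PREFIX = "redbeat:"  # assumed default of redbeat_key_prefix


def entry_key(name, prefix=DEFAULT_KEY_PREFIX):
    """Build the Redis key under which an entry with this name is stored."""
    return f"{prefix}{name}"


def reschedule(entry_cls, app, name, new_schedule):
    """Load a saved entry, replace its schedule, and persist the change."""
    entry = entry_cls.from_key(entry_key(name), app=app)
    entry.schedule = new_schedule
    entry.save()
    return entry


def unschedule(entry_cls, app, name):
    """Load a saved entry and remove it from the schedule."""
    entry = entry_cls.from_key(entry_key(name), app=app)
    entry.delete()
```

With the quickstart's names, a call such as `reschedule(RedBeatSchedulerEntry, app, 'my-interval-task', schedule(run_every=30))` would change the interval to 30 seconds, and `unschedule(RedBeatSchedulerEntry, app, 'my-crontab-task')` would remove the crontab entry. Neither requires restarting Beat.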