Celery RedBeat

2.3.3 · active · verified Sat Apr 11

Celery RedBeat is a custom Celery Beat scheduler that stores scheduled tasks and their runtime metadata in Redis. Periodic tasks can be created, modified, and deleted at runtime without restarting the Celery Beat service. This design gives fast startup times, even with a large number of tasks, and a distributed lock prevents multiple Beat instances from running at the same time. The current version is 2.3.3, and the project is actively maintained.
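The distributed lock can be pictured as a single key with an expiry: one Beat instance takes it and keeps refreshing it while it runs, and any other instance has to wait until the key expires. The sketch below is a toy in-memory model of that idea, not RedBeat's implementation. `ToyLock`, `FakeClock`, and the instance names are hypothetical, and the 300-second timeout is only an example value.

```python
import time


class FakeClock:
    """Manually advanced clock so the example does not need to sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class ToyLock:
    """In-memory stand-in for a lock key with a time-to-live (illustration only)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._owner = None
        self._expires_at = 0.0

    def acquire(self, owner, timeout):
        """Take the lock if it is free or has expired; return True on success."""
        now = self._clock()
        if self._owner is None or now >= self._expires_at:
            self._owner = owner
            self._expires_at = now + timeout
            return True
        return False

    def extend(self, owner, timeout):
        """Push the expiry forward, but only for the current, unexpired holder."""
        now = self._clock()
        if self._owner == owner and now < self._expires_at:
            self._expires_at = now + timeout
            return True
        return False


clock = FakeClock()
lock = ToyLock(clock)
first = lock.acquire("beat-a", timeout=300)   # first instance becomes the holder
second = lock.acquire("beat-b", timeout=300)  # second instance is refused
clock.now = 301                               # holder stopped refreshing; lock expired
takeover = lock.acquire("beat-b", timeout=300)
```

In this model a standby instance takes over only after the holder stops refreshing the lock for a full timeout, which is the trade-off a lock timeout setting controls: shorter values mean faster failover, longer values tolerate longer pauses in the holder.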

Warnings

Install

Imports

Quickstart

To get started, set `redbeat_redis_url` in your Celery configuration and pass `-S redbeat.RedBeatScheduler` when running `celery beat`. Tasks can then be scheduled dynamically by creating `RedBeatSchedulerEntry` instances and saving them to Redis. Using a different Redis database for RedBeat than for the Celery broker is recommended.

import os
from celery import Celery
from celery.schedules import crontab, schedule
from redbeat import RedBeatSchedulerEntry

# Configure Celery app with RedBeat
app = Celery('my_app', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
app.conf.update(
    redbeat_redis_url=os.environ.get('REDBEAT_REDIS_URL', 'redis://localhost:6379/1'),  # Use a different DB than broker
    redbeat_lock_timeout=300,  # 5 minutes
    timezone='UTC',
    enable_utc=True,
    # Other Celery settings
)

# Define a Celery task
@app.task
def my_periodic_task(arg1, arg2, some_kwarg=None):
    # some_kwarg is accepted so that entries can pass keyword arguments
    print(f"Executing task with {arg1} and {arg2}")
    return arg1 + arg2

# Example of dynamically scheduling a task
# This code would typically run in an application context, not directly in the Beat process
# To make it runnable for quickstart, wrap it in a function
def schedule_task():
    # Ensure RedBeat is properly configured and Redis is running
    # A new entry for an interval task
    entry_interval = RedBeatSchedulerEntry(
        'my-interval-task',
        'my_app.my_periodic_task',  # Registered task name (assumes this module is my_app.py)
        schedule(run_every=10),  # Run every 10 seconds
        args=[10, 20],
        kwargs={'some_kwarg': 'value'},
        app=app
    )
    entry_interval.save()
    print(f"Scheduled interval task: {entry_interval.key}")

    # A new entry for a crontab task (every minute)
    entry_crontab = RedBeatSchedulerEntry(
        'my-crontab-task',
        'my_app.my_periodic_task', # Task path
        crontab(minute='*', hour='*', day_of_week='*'), # Run every minute
        args=['hello', 'world'],
        app=app
    )
    entry_crontab.save()
    print(f"Scheduled crontab task: {entry_crontab.key}")

# To run Celery Beat with RedBeat:
# celery -A my_app beat -S redbeat.RedBeatScheduler --loglevel=info

# To run Celery Worker:
# celery -A my_app worker --loglevel=info

# Example of how to call schedule_task() (for demonstration purposes)
if __name__ == '__main__':
    # In a real application, you'd trigger this via API or startup logic
    # For this quickstart, we'll just demonstrate saving an entry
    # and assume you'll run Beat and Worker separately.
    print("To run, ensure Redis is active, then execute:")
    print("1. For Celery Beat: celery -A my_app beat -S redbeat.RedBeatScheduler --loglevel=info")
    print("2. For Celery Worker: celery -A my_app worker --loglevel=info")
    print("3. Then, from an interactive shell or a script, call schedule_task()")
    # Example of manual scheduling if app context is setup:
    # from my_app import app, my_periodic_task, schedule_task
    # schedule_task()
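The quickstart only creates entries, while the overview also mentions modifying and deleting them at runtime. The following is a minimal sketch, assuming `RedBeatSchedulerEntry.from_key(key, app=app)` loads a saved entry and `entry.delete()` removes it, as described in the RedBeat README. The helper names `reschedule_entry` and `remove_entry` are hypothetical, and the entry class is passed in as a parameter only so the helpers can be tried against a stand-in class without a live Redis.

```python
def reschedule_entry(entry_cls, key, new_schedule, app):
    """Load the entry stored under `key`, replace its schedule, and save it back."""
    entry = entry_cls.from_key(key, app=app)
    entry.schedule = new_schedule
    entry.save()
    return entry


def remove_entry(entry_cls, key, app):
    """Load the entry stored under `key` and delete it from Redis."""
    entry = entry_cls.from_key(key, app=app)
    entry.delete()


# Intended use with the quickstart's objects (needs Redis, so left commented out).
# `key` is the value printed as entry_interval.key by schedule_task().
# from celery.schedules import schedule
# from redbeat import RedBeatSchedulerEntry
# from my_app import app
# reschedule_entry(RedBeatSchedulerEntry, key, schedule(run_every=30), app)
# remove_entry(RedBeatSchedulerEntry, key, app)
```

Because the changes are written to Redis rather than to the Beat process, they are meant to take effect without restarting Beat.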
