Celery Once

3.0.1 · active · verified Thu Apr 16

Celery Once is a Python library that prevents duplicate queuing and execution of Celery tasks: while one instance of a task with a given set of arguments holds its lock, further calls with the same arguments are rejected. It is particularly useful for tasks that should not be duplicated. The current version is 3.0.1; significant changes are typically introduced in major versions.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to integrate `celery-once` with a Celery task using the `QueueOnce` base class. It configures the Redis backend for `celery-once` and defines a task for which only one instance per set of arguments can be queued or running at a time. A Redis server and a Celery worker must be running to see it in action.

from celery import Celery
from celery_once import QueueOnce

# Configure the Celery app (broker and result backend both on a local Redis)
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

# Configure the celery-once lock backend under the ONCE key of the Celery config
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',  # or 'celery_once.backends.File' (uses 'location' instead of 'url')
    'settings': {
        'url': 'redis://localhost:6379/0',  # Redis instance that stores the locks
        'default_timeout': 60 * 60,  # locks expire after 1 hour by default
    },
}

@app.task(base=QueueOnce)
def my_unique_task(arg1, arg2):
    """Only one instance per (arg1, arg2) combination may be queued or running at a time."""
    print(f"Executing my_unique_task with {arg1}, {arg2}")
    return f"Task completed for {arg1}, {arg2}"

if __name__ == '__main__':
    # Tasks are executed by a Celery worker, started with for example:
    #   celery -A your_module_name worker -l info
    # Then queue the task from another Python script or shell:
    #   from your_module_name import my_unique_task
    #   my_unique_task.delay('value1', 'value2')
    #   my_unique_task.delay('value1', 'value2')  # raises AlreadyQueued while the first call holds the lock
    # Declare the task with once={'graceful': True} to have the duplicate call return None instead of raising.

    # This block only prints instructions; it does not queue anything.
    print("To run tasks, ensure Redis is running and start a Celery worker.\n")
    print("Example: my_unique_task.delay('hello', 123) will queue the task.\n")
    print("If queued twice quickly with the same arguments, the second call is rejected.")
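
The locking behaviour described above can be illustrated without Celery or Redis. The following is a minimal in-memory sketch of the idea, not the library's actual implementation: `InMemoryLocks`, `DuplicateTaskError`, and `lock_key` are hypothetical names invented for this example, and the key format is an assumption chosen for readability.

```python
import time


class DuplicateTaskError(Exception):
    """Hypothetical stand-in for the error raised on a duplicate call."""


class InMemoryLocks:
    """Toy lock store: maps a key to the time at which its lock expires."""

    def __init__(self, default_timeout=3600):
        self.default_timeout = default_timeout
        self._expires_at = {}

    def acquire(self, key, now=None):
        # `now` can be injected so the example is deterministic.
        now = time.monotonic() if now is None else now
        expiry = self._expires_at.get(key)
        if expiry is not None and expiry > now:
            raise DuplicateTaskError(key)
        self._expires_at[key] = now + self.default_timeout

    def release(self, key):
        self._expires_at.pop(key, None)


def lock_key(task_name, kwargs):
    """Build a deterministic key from the task name and sorted arguments."""
    parts = [f"{name}-{value}" for name, value in sorted(kwargs.items())]
    return "_".join([task_name] + parts)


locks = InMemoryLocks(default_timeout=60)
key = lock_key("my_unique_task", {"arg1": "hello", "arg2": 123})

locks.acquire(key, now=0)          # first call takes the lock
try:
    locks.acquire(key, now=10)     # duplicate while the lock is still held
    duplicate_rejected = False
except DuplicateTaskError:
    duplicate_rejected = True

locks.acquire(key, now=61)         # timeout elapsed, so the lock can be retaken
locks.release(key)                 # a finished task releases its lock
```

Because the key is derived from the task name and its arguments, a call with different arguments would get a different key and would not be blocked. The timeout acts as a safety net so that a lock left behind by a crashed task eventually expires.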
