# Celery Once

Celery Once is a Python library that prevents duplicate queuing and execution of Celery tasks, ensuring that only one instance of a given task runs at a time. It is particularly useful for tasks that must not be duplicated or run concurrently. The current version is 3.0.1, and the project has an active release cadence, with significant changes often introduced in major versions.
## Common errors
- **OSError: [Errno 36] File name too long**
  - Cause: With the `File` backend in `celery-once` versions prior to 3.0.0, long or complex task arguments could produce lock filenames exceeding the operating system's filename length limit.
  - Fix: Upgrade `celery-once` to 3.0.0 or newer, which hashes and truncates file-backend keys, resolving the filename length issue.
- **Too many open files**
  - Cause: With the `File` backend in `celery-once` versions prior to 2.1.1, file descriptors were not always properly closed, leading to resource exhaustion over time.
  - Fix: Upgrade `celery-once` to 2.1.1 or newer, which ensures file descriptors are closed after use.
- **Tasks are still running multiple times / Race condition when acquiring lock**
  - Cause: Prior to `celery-once` 2.0.0, the Redis backend had known race conditions that could allow duplicate task execution under specific circumstances.
  - Fix: Upgrade `celery-once` to 2.0.0 or newer, which refactors the Redis backend to use a robust SETNX-based RedLock mechanism that mitigates these race conditions. Remember to clear old Redis keys when upgrading.
- **Task always reports AlreadyQueued but never executes or completes**
  - Cause: A task may acquire a lock but fail to release it (for example, an unhandled exception before lock release, or a worker crash), leaving the task locked until the timeout expires.
  - Fix: Ensure your task code handles exceptions gracefully. If a lock is genuinely stuck, manually clear the lock key from your chosen backend (delete the specific key from Redis, or the file from the filesystem path configured for `celery-once`). Consider a shorter `default_timeout` if your tasks are generally short-lived.
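The filename fix introduced in 3.0.0 can be sketched in plain Python: if a raw lock key would exceed the filesystem's filename limit, fall back to a fixed-length hash. This is a hypothetical helper for illustration (`safe_lock_filename` and the `qo_` prefix are assumptions, not celery-once's actual key scheme):

```python
import hashlib

def safe_lock_filename(raw_key: str, max_len: int = 255) -> str:
    """Illustrative sketch: bound a lock filename's length by hashing.

    Not celery-once's real implementation; the library's key format may differ.
    """
    if len(raw_key) <= max_len:
        return raw_key  # short keys can be used verbatim
    # A SHA-256 hex digest is always 64 characters, well under any filename limit
    return hashlib.sha256(raw_key.encode("utf-8")).hexdigest()

print(len(safe_lock_filename("qo_tasks.sync_" + "x" * 500)))  # 64
```

The same idea also makes keys filesystem-safe, since a hex digest contains no path separators or special characters.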
## Warnings
- breaking: Version 3.0.0 changed the file backend's key generation by hashing and limiting key length. This makes it incompatible with existing file-backend locks created by previous versions.
- breaking: Version 2.0.0 fundamentally changed the Redis backend to use a SETNX-based lock (RedLock) to address race conditions. This change is not backward compatible with keys stored in Redis by 1.x.x versions; clear them when upgrading.
- gotcha: Prior to version 2.1.1, the `File` backend could trigger a 'Too many open files' error due to unclosed file descriptors, especially under heavy load or in long-running processes.
- gotcha: If the `ONCE` configuration is missing or incorrect, `celery-once` may be left without a configured backend and silently fail to prevent duplicate execution.
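The SETNX ("set if not exists") idea behind the 2.0.0 Redis lock can be illustrated with a toy in-memory model. This is a simplified stand-in, not the library's Redis code, and unlike real Redis (where `SET key value NX EX ttl` is a single atomic command) this toy check-then-set is not safe across processes:

```python
import time

class ToySetnxLock:
    """In-memory stand-in for Redis SETNX with expiry; illustrative only."""

    def __init__(self):
        self._expiries = {}  # key -> expiry timestamp

    def acquire(self, key: str, timeout: float) -> bool:
        now = time.time()
        expiry = self._expiries.get(key)
        if expiry is not None and expiry > now:
            return False  # an unexpired lock exists, like SETNX returning 0
        self._expiries[key] = now + timeout  # "set if not exists" succeeds
        return True

    def release(self, key: str) -> None:
        self._expiries.pop(key, None)

lock = ToySetnxLock()
print(lock.acquire("qo_my_task", timeout=60))  # True: lock acquired
print(lock.acquire("qo_my_task", timeout=60))  # False: already held
lock.release("qo_my_task")
print(lock.acquire("qo_my_task", timeout=60))  # True: acquirable again
```

The expiry timestamp is what makes a crashed worker's stale lock eventually recoverable, which is also why `default_timeout` matters in the stuck-lock scenario above.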
## Install

- `pip install celery-once`
- `pip install celery-once[redis]` (includes the Redis backend dependencies)
## Imports

- `QueueOnce`: `from celery_once import QueueOnce`
- `AlreadyQueued`: `from celery_once import AlreadyQueued` (raised when a duplicate task is rejected)
## Quickstart

```python
from celery import Celery
from celery_once import QueueOnce

# Configure the Celery app
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

# Configure the celery-once backend via the ONCE setting
# (Redis here; 'celery_once.backends.File' is also available)
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60,  # 1 hour default lock timeout
    },
}

@app.task(base=QueueOnce)
def my_unique_task(arg1, arg2):
    """This task can only be queued/run once at a time globally."""
    print(f"Executing my_unique_task with {arg1}, {arg2}")
    return f"Task completed for {arg1}, {arg2}"

# Ensure Redis is running, then start a worker:
#   celery -A your_module_name worker -l info
# Then call the task from another Python shell:
#   from your_module_name import my_unique_task
#   my_unique_task.delay('value1', 'value2')
#   my_unique_task.delay('value1', 'value2')  # raises AlreadyQueued while the first holds the lock
```
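`QueueOnce` also accepts per-task options via `once=` on the decorator, such as `once={'graceful': True}` (skip silently instead of raising `AlreadyQueued`) and `once={'keys': [...]}` (restrict which arguments contribute to the lock key). How a `keys` restriction shapes the lock key can be sketched with a toy model; the real celery-once key format may differ from this `build_lock_key` helper:

```python
def build_lock_key(task_name, kwargs, keys=None):
    """Toy model of deriving a lock key from selected keyword arguments.

    Illustrative only; not celery-once's actual key-building code.
    """
    # With keys=None, every kwarg distinguishes locks; otherwise only the listed ones do
    selected = keys if keys is not None else sorted(kwargs)
    parts = "_".join(f"{k}-{kwargs[k]}" for k in sorted(selected))
    return f"qo_{task_name}_{parts}" if parts else f"qo_{task_name}"

# With keys=['user_id'], calls that differ only in 'retry' share one lock:
print(build_lock_key("tasks.sync", {"user_id": 1, "retry": 3}, keys=["user_id"]))
# qo_tasks.sync_user_id-1
```

Restricting the key this way means two calls with the same `user_id` but different other arguments are treated as duplicates of each other.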