Celery Singleton
Celery Singleton is a Python library that provides a base class for Celery tasks, ensuring that only one instance of a given task can be queued or running at any time. It achieves this with distributed locks in Redis, keyed on the task's name and arguments to determine uniqueness. The current version is 0.3.1, released in January 2021; development appears to be in maintenance mode, with occasional activity on the project's GitHub issues.
Common errors

- TypeError: Object of type <YourObject> is not JSON serializable
  Cause: You are passing a task argument that cannot be converted to JSON. celery-singleton needs JSON-serializable arguments to build a consistent lock key.
  Fix: Refactor the task to accept only JSON-serializable data (e.g., IDs, strings, numbers, lists, dictionaries). If you need to work with a complex object, pass its unique identifier and load the full object inside the task body.

- celery.exceptions.DuplicateTaskError: Task <task_name> with args <args> and kwargs <kwargs> is already running or queued.
  Cause: You called a singleton task that is already running or queued, and the task is configured with `raise_on_duplicate=True`.
  Fix: If you would rather receive the `AsyncResult` of the existing task than an error, remove `raise_on_duplicate=True` from the task decorator. If you deliberately want duplicates rejected, make sure the calling code catches `DuplicateTaskError`.

- Task remains in the 'PENDING' state and never executes, even though no other instance is running.
  Cause: The Redis lock for the task was not released, usually because a worker crashed or was hard-terminated, leaving a stale lock. A misconfigured `singleton_backend_url` can also prevent celery-singleton from finding or creating locks.
  Fix: Set `lock_expiry` on all singleton tasks so stale locks cannot persist forever. Check your Celery and celery-singleton Redis backend configuration. If necessary, clear stale locks manually with `DEL <lock_key>`, where `<lock_key>` typically includes the task name and a hash of its arguments.
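Both the serialization error and the stale-lock fix come back to how the lock key is built. As a hedged sketch (the exact key format is an internal detail of celery-singleton, and `make_lock_key` is a hypothetical helper, not the library's API), the key is derived from a hash of the task name and the JSON-serialized arguments — which is also why a non-serializable argument fails before the task is ever queued:

```python
import hashlib
import json

def make_lock_key(task_name, args, kwargs, prefix="SINGLETON_LOCK"):
    """Hypothetical approximation of celery-singleton's lock key:
    hash the task name together with the JSON form of its arguments,
    so two calls with identical arguments map to the same key."""
    str_args = json.dumps(list(args or []))
    str_kwargs = json.dumps(kwargs or {}, sort_keys=True)  # raises TypeError on non-JSON args
    digest = hashlib.md5((task_name + str_args + str_kwargs).encode()).hexdigest()
    return f"{prefix}_{digest}"

# Identical task name + arguments -> identical key (hence one queued instance)...
k1 = make_lock_key("do_stuff", [1, 2, 3], {"a": "b"})
k2 = make_lock_key("do_stuff", [1, 2, 3], {"a": "b"})
# ...while any change in the arguments produces a different key.
k3 = make_lock_key("do_stuff", [1, 2, 3], {"a": "c"})
```

Since the key is deterministic, a stale lock left by a crashed worker can be located and deleted by recomputing the same key from the task name and arguments.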
Warnings

- Gotcha: All arguments passed to a singleton task must be JSON serializable, because celery-singleton uses their JSON representation to generate a unique lock key in Redis. Non-serializable arguments will cause task queuing to fail.
- Gotcha: Celery Singleton relies on Redis for distributed locking. If your Celery app does not use Redis as its result backend or broker, you must explicitly configure a Redis URL for celery-singleton using the `singleton_backend_url` setting in your Celery config.
- Gotcha: If a Celery worker is forcefully terminated (e.g., via a hard time limit or an unexpected crash) while executing a singleton task, the Redis lock for that task might not be released, leading to a 'deadlock' where subsequent identical tasks are permanently blocked.
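The last two gotchas map to two settings. A minimal configuration sketch (the Redis URL and task body are placeholders; `singleton_backend_url` and the per-task `lock_expiry` option are celery-singleton settings, shown here under the assumption of a non-Redis broker):

```python
from celery import Celery
from celery_singleton import Singleton

app = Celery('my_app', broker='amqp://localhost')  # non-Redis broker

# Point celery-singleton at a Redis instance for its locks,
# since neither the broker nor the result backend is Redis here.
app.conf.singleton_backend_url = 'redis://localhost:6379/2'

# lock_expiry (seconds) guards against stale locks after a worker crash:
# Redis drops the lock automatically once it expires.
@app.task(base=Singleton, lock_expiry=60)
def sync_account(account_id):
    ...
```

Pick a `lock_expiry` longer than the task's worst-case runtime; otherwise the lock can expire while the task is still executing, allowing a duplicate to start.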
Install

```shell
pip install celery-singleton
```
Imports

- Singleton

```python
from celery_singleton import Singleton
```

- DuplicateTaskError

```python
from celery_singleton import Singleton, DuplicateTaskError
```
Quickstart

```python
import os
import time

from celery import Celery
from celery_singleton import Singleton

# Assuming a local Redis for the Celery broker and result backend
celery_app = Celery(
    'my_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)

@celery_app.task(base=Singleton)
def do_stuff(*args, **kwargs):
    time.sleep(4)
    return 'I just woke up'

if __name__ == '__main__':
    print('Calling do_stuff(1, 2, 3, a="b") the first time...')
    async_result = do_stuff.delay(1, 2, 3, a='b')
    print(f'First call result ID: {async_result.id}')

    print('Calling do_stuff(1, 2, 3, a="b") the second time (should return the existing task)...')
    async_result2 = do_stuff.delay(1, 2, 3, a='b')
    print(f'Second call result ID: {async_result2.id}')

    # Duplicate calls return the AsyncResult of the already-queued task.
    assert async_result.id == async_result2.id
    print('Assertion successful: duplicate task call returned the same AsyncResult ID.')

    print(f'Task 1 status: {async_result.status}')
    print('Waiting for task 1 to complete...')
    print(f'Task 1 result: {async_result.get()}')
```