Celery Batches
Celery Batches is an experimental task class that buffers task messages and processes them as a list inside a Celery worker. Task requests are held in memory until either a configured flush count (`flush_every`) or flush interval (`flush_interval`) is reached. The library is actively maintained (version 0.11 at the time of writing) and tracks recent Python and Celery releases.
Common errors
- AttributeError: module 'celery.contrib.batches' has no attribute 'Batches'
  cause: The `celery.contrib.batches` module was removed in Celery 4.0; `celery-batches` is a separate, standalone library for newer Celery versions.
  fix: On Celery 4.0 and higher, use `from celery_batches import Batches`. On Celery < 4.0, use `from celery.contrib.batches import Batches`.
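A version-tolerant import shim can smooth over the move (a sketch; the `Batches = None` fallback is just a placeholder for environments where neither package is installed):

```python
# Prefer the standalone celery-batches package (Celery >= 4.0), falling back
# to the module bundled with older Celery releases.
try:
    from celery_batches import Batches  # Celery >= 4.0: pip install celery-batches
except ImportError:
    try:
        from celery.contrib.batches import Batches  # Celery < 4.0 only
    except ImportError:
        Batches = None  # Neither is installed; see the Install section.
```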
- Celery tasks sent with `.delay()` are never processed, or batches are flushed too slowly or too quickly.
  cause: This is usually an incorrect `worker_prefetch_multiplier` setting on the Celery worker, or misconfigured `flush_every` / `flush_interval` parameters on the Batches task.
  fix: Start the worker with `--prefetch-multiplier=0` (or a value greater than the task's `flush_every`). Review `flush_every` (number of tasks) and `flush_interval` (time in seconds) on your `@app.task(base=Batches, ...)` decorator to match the batching behavior you want.
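The interaction of `flush_every` and `flush_interval` can be illustrated with a toy buffer (a hypothetical model for intuition only, not the library's actual implementation):

```python
import time

class MiniBatcher:
    """Toy model of Batches' buffering: flush when either threshold is hit."""

    def __init__(self, flush_every=10, flush_interval=5.0):
        self.flush_every = flush_every
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, item):
        # Count-based flush: triggered as soon as flush_every items are buffered.
        self.buffer.append(item)
        if len(self.buffer) >= self.flush_every:
            return self.flush()
        return None

    def tick(self):
        # Time-based flush: called periodically, like the worker's timer.
        if self.buffer and time.monotonic() - self.last_flush >= self.flush_interval:
            return self.flush()
        return None

    def flush(self):
        batch, self.buffer = self.buffer, []
        self.last_flush = time.monotonic()
        return batch

batcher = MiniBatcher(flush_every=3, flush_interval=60.0)
batches = [b for b in (batcher.add(i) for i in range(7)) if b is not None]
print(batches)  # → [[0, 1, 2], [3, 4, 5]]; item 6 waits for the next flush
```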
- TypeError: 'SimpleRequest' object is not subscriptable (or a similar AttributeError when accessing request arguments)
  cause: The request objects passed to a batched task are `SimpleRequest` instances, which expose arguments as attributes (`request.args` for positional, `request.kwargs` for keyword) rather than supporting direct list or dictionary access.
  fix: Access positional arguments via `request.args` (e.g., `request.args[0]`) and keyword arguments via `request.kwargs` (e.g., `request.kwargs['key']`).
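The attribute-access pattern looks like this (using a `SimpleNamespace` stand-in, since real `SimpleRequest` objects are created by the worker; the argument values here are made up):

```python
from types import SimpleNamespace

# Stand-in for a SimpleRequest as delivered to a batched task.
request = SimpleNamespace(
    id="abc123",
    args=("first_positional",),
    kwargs={"key": "value"},
)

first = request.args[0]          # OK: index into the args tuple
value = request.kwargs["key"]    # OK: key into the kwargs dict
# request[0] or request["key"] would raise a TypeError.
print(first, value)
```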
Warnings
- breaking Celery Batches has strict compatibility requirements with Celery and Python versions. Each major release often drops support for older Celery and Python versions. Always check the Changelog or PyPI for compatibility with your specific environment.
- gotcha For Batches tasks to work correctly, Celery's `worker_prefetch_multiplier` must be set to `0` or a value higher than `flush_every`. Setting `worker_prefetch_multiplier` to `0` can cause excessive memory consumption on workers and the broker, especially with deep queues.
- gotcha Returning a value directly from a `Batches` task only provides values to signals and *does not* populate the results backend for individual task requests. If you need results for each item in the batch, you must explicitly call `app.backend.mark_as_done` for each `SimpleRequest`.
- gotcha Versions of `celery-batches` prior to `0.10` contained a bug where the count of pending tasks could be incorrectly reset when a flush was triggered by `flush_interval`, potentially causing tasks to wait indefinitely in the buffer.
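The `mark_as_done` pattern from the results gotcha looks roughly like this; the stub backend below is a hypothetical stand-in so the sketch runs without a broker (in a real task, `app` is your configured Celery app):

```python
from types import SimpleNamespace

class StubBackend:
    """Stand-in for a Celery result backend, recording mark_as_done calls."""
    def __init__(self):
        self.results = {}

    def mark_as_done(self, task_id, result, request=None):
        self.results[task_id] = result

class StubApp:
    backend = StubBackend()

app = StubApp()

def process_batch(requests):
    for request in requests:
        value = request.kwargs["data"].upper()
        # The crucial step: without this call, callers polling
        # AsyncResult(request.id) would never see a result.
        app.backend.mark_as_done(request.id, value, request=request)

# Fake requests standing in for SimpleRequest objects.
reqs = [SimpleNamespace(id=str(i), kwargs={"data": f"item_{i}"}) for i in range(3)]
process_batch(reqs)
print(app.backend.results)  # → {'0': 'ITEM_0', '1': 'ITEM_1', '2': 'ITEM_2'}
```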
Install
-
pip install celery-batches
Imports
- Batches
from celery.contrib.batches import Batches  # Celery < 4.0 only
from celery_batches import Batches  # Celery >= 4.0
- SimpleRequest
from celery_batches import SimpleRequest
Quickstart
from celery import Celery
from celery_batches import Batches
import os

# Configure the Celery app (replace with your actual broker/backend).
app = Celery(
    'my_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)

# Flush after 10 messages or 5 seconds, whichever comes first.
@app.task(base=Batches, flush_every=10, flush_interval=5)
def process_batch_task(requests):
    print(f"Processing a batch of {len(requests)} requests.")
    for request in requests:
        data = request.kwargs.get('data')
        task_id = request.id
        print(f" - Task ID: {task_id}, Data: {data}")
        # If you need to return results for individual tasks, do it here:
        # app.backend.mark_as_done(task_id, f"Processed: {data}", request=request)

if __name__ == '__main__':
    # Example usage: send individual tasks.
    print("Sending 15 tasks...")
    for i in range(15):
        process_batch_task.delay(data=f'item_{i}')
    print("Waiting for tasks to be processed by a Celery worker...")
    # To run a worker:
    #   celery -A your_module_name worker -l info --pool=solo --prefetch-multiplier=0
    # Note: --pool=solo is for simple testing. For production, use a proper
    # pool and keep prefetch-multiplier at 0 (or above flush_every).