{"id":8891,"library":"celery-batches","title":"Celery Batches","description":"Celery Batches is an experimental task class that buffers messages and processes them as a list within Celery workers. Task requests are held in memory until either a configured flush count or flush interval is reached. This library is actively maintained, currently at version 0.11, and provides continuous support for recent Python and Celery versions.","status":"active","version":"0.11","language":"en","source_language":"en","source_url":"https://github.com/clokep/celery-batches","tags":["celery","batch processing","task queue","async"],"install":[{"cmd":"pip install celery-batches","lang":"bash","label":"Install stable version"}],"dependencies":[{"reason":"This library is an extension for Celery and requires Celery to function.","package":"celery","optional":false}],"imports":[{"note":"The original `celery.contrib.batches` was removed in Celery 4.0. For Celery 4.0+ you must import from `celery_batches`.","wrong":"from celery.contrib.batches import Batches","symbol":"Batches","correct":"from celery_batches import Batches"},{"note":"Used for type hinting or understanding the structure of individual requests within a batch.","symbol":"SimpleRequest","correct":"from celery_batches import SimpleRequest"}],"quickstart":{"code":"from celery import Celery\nfrom celery_batches import Batches\nimport os\n\n# Configure Celery app (replace with your actual broker/backend)\napp = Celery('my_app', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'))\n\n@app.task(base=Batches, flush_every=10, flush_interval=5) # Flush after 10 messages or 5 seconds\ndef process_batch_task(requests):\n    print(f\"Processing a batch of {len(requests)} requests.\")\n    for request in requests:\n        data = request.kwargs.get('data')\n        task_id = request.id\n        print(f\"  - Task ID: {task_id}, Data: {data}\")\n  
      # If you need to return results for individual tasks, do it here:\n        # app.backend.mark_as_done(task_id, f\"Processed: {data}\", request=request)\n\nif __name__ == '__main__':\n    # Example usage: send individual tasks\n    print(\"Sending 15 tasks...\")\n    for i in range(15):\n        process_batch_task.delay(data=f'item_{i}')\n\n    print(\"Waiting for tasks to be processed by a Celery worker...\")\n    # To run a worker: celery -A your_module_name worker -l info --pool=solo --prefetch-multiplier=0\n    # Note: --pool=solo is for simple testing. For production, use a proper pool and keep --prefetch-multiplier=0.\n","lang":"python","description":"This example defines a Celery task that uses `celery-batches` to flush a batch once 10 messages are buffered or 5 seconds have elapsed, whichever comes first. Individual tasks are sent using `.delay()`, and `process_batch_task` receives a list of `SimpleRequest` objects. Remember to configure your Celery app and run a worker with `--prefetch-multiplier=0` for `celery-batches` to operate correctly."},"warnings":[{"fix":"Consult the `celery-batches` GitHub README or PyPI page for the supported Celery and Python versions for your desired `celery-batches` version. Upgrade/downgrade `celery-batches`, Celery, or Python as needed.","message":"Celery Batches has strict compatibility requirements with Celery and Python versions. New releases often drop support for older Celery and Python versions. Always check the Changelog or PyPI for compatibility with your specific environment.","severity":"breaking","affected_versions":"<= 0.10"},{"fix":"Configure your Celery worker to use `--prefetch-multiplier=0` (or a value whose product with the worker concurrency exceeds `flush_every`). Be mindful of memory usage in production environments with `--prefetch-multiplier=0`.","message":"For Batches tasks to work correctly, Celery's `worker_prefetch_multiplier` must be set to `0` or to a value whose product with the worker concurrency is higher than `flush_every`. 
Setting `worker_prefetch_multiplier` to `0` can cause excessive memory consumption on workers and the broker, especially with deep queues.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Inside your batched task, iterate through `requests` and call `app.backend.mark_as_done(request.id, individual_result, request=request)` for each `SimpleRequest` object to store individual results.","message":"Returning a value directly from a `Batches` task only provides values to signals and *does not* populate the results backend for individual task requests. If you need results for each item in the batch, you must explicitly call `app.backend.mark_as_done` for each `SimpleRequest`.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Upgrade to `celery-batches` version `0.10` or newer to resolve the bug where `flush_interval` could cause tasks to be stuck in the buffer.","message":"Versions of `celery-batches` prior to `0.10` contained a bug where the count of pending tasks could be incorrectly reset when a flush was triggered by `flush_interval`, potentially causing tasks to wait indefinitely in the buffer.","severity":"gotcha","affected_versions":"< 0.10"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"For Celery versions 4.0 and higher, use `from celery_batches import Batches`. If using Celery < 4.0, you would use `from celery.contrib.batches import Batches`.","cause":"The `celery.contrib.batches` module was removed in Celery 4.0, so the old import path no longer exists. `celery-batches` is a separate, standalone library for newer Celery versions.","error":"ModuleNotFoundError: No module named 'celery.contrib.batches'"},{"fix":"Ensure your Celery worker is started with `--prefetch-multiplier=0` (or a value whose product with the worker concurrency exceeds your task's `flush_every`). 
Review `flush_every` (number of tasks) and `flush_interval` (time in seconds) on your `@app.task(base=Batches, ...)` decorator to match your desired batching behavior.","cause":"This is often due to an incorrect `worker_prefetch_multiplier` setting on the Celery worker, or misconfigured `flush_every` / `flush_interval` parameters on the Batches task.","error":"Celery tasks sent using .delay() are not processed, or batches are flushed too slowly/quickly."},{"fix":"Access positional arguments via `request.args` (e.g., `request.args[0]`) and keyword arguments via `request.kwargs` (e.g., `request.kwargs['key']`).","cause":"The `request` object passed to your batched task is a `SimpleRequest` instance, which exposes arguments as attributes (`request.args` for positional, `request.kwargs` for keyword) rather than direct dictionary or list access.","error":"TypeError: 'SimpleRequest' object is not subscriptable (or similar AttributeError when accessing request arguments)"}]}