Celery Batches

0.11 · active · verified Thu Apr 16

Celery Batches is an experimental task class that buffers messages and processes them as a list within Celery workers. Task requests are held in memory until either a configured flush count or a configured flush interval is reached. The library is actively maintained, is currently at version 0.11, and supports recent Python and Celery versions.
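
The flush rule described above can be illustrated with a small, library-independent sketch. This is a toy model only, not the implementation used by `celery-batches`: the `BatchBuffer` class, its `tick` method, and the injectable `clock` parameter are hypothetical names introduced here so the behavior can be exercised without a broker or a worker.

```python
import time
from typing import Any, Callable, List


class BatchBuffer:
    """Toy in-memory buffer that flushes on a count or a time threshold.

    Hypothetical illustration only; not part of celery-batches.
    """

    def __init__(
        self,
        flush_every: int,
        flush_interval: float,
        handler: Callable[[List[Any]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.flush_every = flush_every
        self.flush_interval = flush_interval
        self.handler = handler
        self.clock = clock
        self._items: List[Any] = []
        self._last_flush = clock()

    def add(self, item: Any) -> None:
        """Buffer one item and flush once the count threshold is reached."""
        self._items.append(item)
        if len(self._items) >= self.flush_every:
            self.flush()

    def tick(self) -> None:
        """Periodic check: flush pending items once the interval has elapsed."""
        if self._items and self.clock() - self._last_flush >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered items to the handler as one list."""
        batch, self._items = self._items, []
        self._last_flush = self.clock()
        if batch:
            self.handler(batch)


# Drive the buffer with a fake clock so the example is deterministic.
now = [0.0]
batches: List[List[int]] = []
buf = BatchBuffer(10, 5.0, batches.append, clock=lambda: now[0])

for i in range(15):
    buf.add(i)      # the 10th item triggers a count-based flush

now[0] = 6.0        # pretend 6 seconds have passed
buf.tick()          # the remaining 5 items are flushed by the interval

batch_sizes = [len(b) for b in batches]
print(batch_sizes)  # [10, 5]
```

With a flush count of 10 and an interval of 5 seconds, 15 items end up in one batch of 10 (count threshold) and one batch of 5 (interval threshold).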

Quickstart

This example defines a Celery task that uses `celery-batches` to process messages in batches, flushing after 10 messages or after 5 seconds. Individual tasks are sent with `.delay()`, and `process_batch_task` receives a list of `SimpleRequest` objects. Configure your Celery app and start the worker with `--prefetch-multiplier=0` so that `celery-batches` operates correctly.

```python
import os

from celery import Celery
from celery_batches import Batches

# Configure the Celery app (replace with your actual broker/backend).
app = Celery(
    'my_app',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)


# Flush after 10 messages or 5 seconds.
@app.task(base=Batches, flush_every=10, flush_interval=5)
def process_batch_task(requests):
    print(f"Processing a batch of {len(requests)} requests.")
    for request in requests:
        data = request.kwargs.get('data')
        task_id = request.id
        print(f"  - Task ID: {task_id}, Data: {data}")
        # If you need to return results for individual tasks, do it here:
        # app.backend.mark_as_done(task_id, f"Processed: {data}", request=request)


if __name__ == '__main__':
    # Example usage: send individual tasks.
    print("Sending 15 tasks...")
    for i in range(15):
        process_batch_task.delay(data=f'item_{i}')

    print("Waiting for tasks to be processed by a Celery worker...")
    # To run a worker:
    #   celery -A your_module_name worker -l info --pool=solo --prefetch-multiplier=0
    # Note: --pool=solo is for simple testing. For production, use a proper
    # pool and ensure prefetch-multiplier=0.
```
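
Because the task body receives the whole batch as a list, per-batch aggregation can be written as an ordinary function over that list. The sketch below assumes only that each request exposes `id` and `kwargs`, as the quickstart does. `FakeRequest` and `summarize_batch` are hypothetical names used here so the logic can run without Celery installed; inside a real task you would pass the `requests` argument instead.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable


@dataclass
class FakeRequest:
    """Hypothetical stand-in exposing only the attributes used below."""
    id: str
    kwargs: Dict[str, Any] = field(default_factory=dict)


def summarize_batch(requests: Iterable[FakeRequest]) -> Dict[Any, int]:
    """Count how many requests in the batch carry each `data` value."""
    return dict(Counter(r.kwargs.get("data") for r in requests))


# A hand-built batch standing in for what a worker would deliver.
batch = [
    FakeRequest("a", {"data": "item_0"}),
    FakeRequest("b", {"data": "item_1"}),
    FakeRequest("c", {"data": "item_0"}),
]
summary = summarize_batch(batch)
print(summary)  # {'item_0': 2, 'item_1': 1}
```

Keeping the aggregation in a plain function like this makes it testable without a broker or worker; the task itself would only call it and act on the result.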
