Dagster Celery Integration

0.29.0 · active · verified Sat Apr 11

dagster-celery is a library that lets Dagster use Celery as its execution engine, offloading op computations to a distributed Celery cluster. The current version is 0.29.0, which corresponds to Dagster core version 1.13.0. The library typically releases in lockstep with the main Dagster project, so a new dagster-celery version usually accompanies each Dagster core release.
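The version correspondence can be written out as simple arithmetic. This is a minimal sketch, not an official rule: it assumes the offset implied by the pair above (core 1.13.0, library 0.29.0) also holds for other 1.x releases, so the library minor version is the core minor version plus 16. The function name `expected_library_version` is hypothetical; check the release notes before relying on the result.

```python
def expected_library_version(core_version: str, minor_offset: int = 16) -> str:
    """Map a Dagster core version "1.X.Y" to a library version "0.(X+offset).Y".

    Assumption: the offset of 16 is inferred from 1.13.0 <-> 0.29.0.
    """
    major, minor, patch = core_version.split(".")
    if major != "1":
        raise ValueError("this sketch only covers Dagster 1.x core versions")
    return f"0.{int(minor) + minor_offset}.{patch}"


print(expected_library_version("1.13.0"))  # prints 0.29.0
```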

Warnings

Install

Imports

Quickstart

This quickstart shows how to configure a Dagster job to use the Celery executor. It defines a simple op and a job, then configures `celery_executor` with broker and backend URLs read from environment variables. To run it, you need a running Celery broker and result backend (e.g., Redis) and at least one Celery worker, started with `celery -A dagster_celery.app worker -l info`, that can import your Dagster code. Here `dagster_celery.app` is the Celery app shipped with the library, not your own module. When the job runs, each op execution is submitted to the Celery queue and picked up by a worker.

import os
from dagster import Definitions, job, op
from dagster_celery import celery_executor

# Configure Celery broker and backend URLs via environment variables
# Example for local development with Redis: export CELERY_BROKER_URL="redis://localhost:6379/0"
# Example for local development with Redis: export CELERY_RESULT_BACKEND="redis://localhost:6379/0"
celery_broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
celery_result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

# Executor config for dagster-celery: the fields are "broker" and "backend".
# Celery-level settings, if needed, go under an optional "config_source" dict.
# Ops are sent to dagster-celery's default queue; a dedicated queue can be
# selected per op with the "dagster-celery/queue" tag.
celery_config = {
    "broker": celery_broker_url,
    "backend": celery_result_backend,
}

@op
def my_celery_op(context):
    context.log.info("Executing op on Celery worker!")
    return "Hello from Celery!"

@job(executor_def=celery_executor.configured(celery_config))
def my_celery_job():
    my_celery_op()

# To run this, you would need to:
# 1. Start a Redis server (or another message broker/backend).
# 2. Start a Celery worker: celery -A dagster_celery.app worker -l info
#    (dagster_celery.app is the Celery app shipped with dagster-celery, not your
#    own module; the worker must still be able to import this file and must use
#    the same broker and backend as the executor config above.)
# 3. Load the definitions with 'dagster dev' and launch a run for 'my_celery_job'.

defs = Definitions(
    jobs=[my_celery_job]
)
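
The environment-variable handling in the quickstart can be factored into a small helper. The following is a minimal sketch, not part of dagster-celery: `build_celery_executor_config` and `DEFAULT_REDIS_URL` are hypothetical names, and the sketch assumes the executor's config fields are `broker` and `backend`. It uses only the standard library, so it can be exercised without a running broker.

```python
import os
from typing import Mapping, Optional

# Local-development default, matching the quickstart's Redis URL.
DEFAULT_REDIS_URL = "redis://localhost:6379/0"


def build_celery_executor_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build a Celery executor config dict from environment variables.

    Hypothetical helper: the name and the validation rule are illustrative.
    """
    env = os.environ if env is None else env
    broker = env.get("CELERY_BROKER_URL", DEFAULT_REDIS_URL)
    backend = env.get("CELERY_RESULT_BACKEND", DEFAULT_REDIS_URL)
    for name, url in (("broker", broker), ("backend", backend)):
        # Fail early on an obviously malformed URL instead of at run launch.
        if "://" not in url:
            raise ValueError(f"{name} URL {url!r} has no scheme (e.g. redis://)")
    return {"broker": broker, "backend": backend}
```

Under these assumptions, the job decorator could use `celery_executor.configured(build_celery_executor_config())`, which keeps the URL defaults and the sanity check in one place.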
