Dagster Celery Integration
dagster-celery is a library that allows Dagster to use Celery as its execution engine, offloading op computations to a distributed Celery cluster. The current version is 0.29.0, which corresponds to Dagster core version 1.13.0. This library typically releases in lockstep with the main Dagster project, following a rapid, iterative release cadence.
Warnings
- gotcha The `dagster-celery` library requires an externally provisioned and managed Celery cluster (broker, result backend, and workers). Dagster itself does not manage the lifecycle of any Celery component. Misconfigured broker or result-backend URLs in the executor config, or forgetting to start Celery workers, are common sources of errors.
- breaking Dagster's execution engine configuration has evolved across major versions. Older patterns involving `config_schema` or directly passing configuration dictionaries to `executor_def` might be deprecated or behave differently. Ensure you are using the `executor_def.configured(config)` pattern.
- gotcha Version compatibility between `dagster` core and `dagster-celery` is critical. Mismatched major versions can lead to runtime errors or unexpected behavior due to API changes in the core execution engine.
- gotcha When using `celery_k8s_job_executor` (from the separate `dagster-celery-k8s` package), the Kubernetes cluster must be properly configured for Celery workers, including appropriate images, resource requests/limits, environment variables, secrets (for broker/backend), and persistent storage if needed. Issues with any of these can prevent ops from running or reporting results.
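Because a malformed broker URL is one of the most common failure modes called out above, it can be worth sanity-checking URLs before they reach the executor config. A minimal sketch; the `resolve_broker_url` helper and its scheme set are illustrative, not part of `dagster-celery`:

```python
import os
from urllib.parse import urlparse

# Schemes Celery commonly supports for brokers; illustrative, not exhaustive.
KNOWN_SCHEMES = {"redis", "rediss", "amqp", "pyamqp", "sqs"}

def resolve_broker_url(default: str = "redis://localhost:6379/0") -> str:
    # Hypothetical helper: read CELERY_BROKER_URL and fail fast on a bad scheme
    # rather than letting a worker error out later.
    url = os.environ.get("CELERY_BROKER_URL", default)
    scheme = urlparse(url).scheme
    if scheme not in KNOWN_SCHEMES:
        raise ValueError(f"Unsupported broker scheme {scheme!r} in {url!r}")
    return url
```

Failing fast at definition-load time gives a clearer error than a Celery connection timeout deep inside a run.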
Install
pip install dagster-celery
# Optional, for the Kubernetes executor:
pip install dagster-celery-k8s
Imports
- celery_executor
from dagster_celery import celery_executor
- celery_k8s_job_executor (from the separate `dagster-celery-k8s` package)
from dagster_celery_k8s import celery_k8s_job_executor
Quickstart
import os
from dagster import Definitions, job, op
from dagster_celery import celery_executor
# Configure Celery broker and backend URLs via environment variables
# Example for local development with Redis: export CELERY_BROKER_URL="redis://localhost:6379/0"
# Example for local development with Redis: export CELERY_RESULT_BACKEND="redis://localhost:6379/0"
celery_broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
celery_result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
celery_config = {
    "broker": celery_broker_url,
    "backend": celery_result_backend,
    # Additional Celery settings go under "config_source"
    "config_source": {"task_default_queue": "dagster_celery_queue"},
}
@op
def my_celery_op(context):
    context.log.info("Executing op on Celery worker!")
    return "Hello from Celery!"
@job(executor_def=celery_executor.configured(celery_config))
def my_celery_job():
    my_celery_op()
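Alternatively, the executor can be attached unconfigured (`executor_def=celery_executor`) and the same values supplied when a run is launched. A sketch of the run config, assuming the executor's standard `broker`/`backend` schema; the URLs are placeholders for a local Redis instance:

```python
# Run config supplying Celery settings at launch time instead of via .configured().
run_config = {
    "execution": {
        "config": {
            "broker": "redis://localhost:6379/0",
            "backend": "redis://localhost:6379/0",
        }
    }
}
# Pass this as run_config when launching the job, or paste the equivalent
# YAML into the Dagster launchpad.
```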
# To run this, you would need to:
# 1. Start a Redis server (or another message broker/result backend).
# 2. Start a Celery worker against the app shipped with dagster-celery:
#    celery -A dagster_celery.app worker -l info -Q dagster_celery_queue
#    (dagster_celery.app is provided by the library; your own Dagster code
#    does not need to live in that module.)
# 3. Load the definitions with `dagster dev` and launch a run of `my_celery_job`.
defs = Definitions(
    jobs=[my_celery_job],
)