Dagster Celery Kubernetes
dagster-celery-k8s is a Dagster integration library that enables scalable, distributed execution of Dagster jobs using Celery as a task queue and Kubernetes for orchestrating Celery workers and individual step execution. It allows users to leverage Celery for concurrency control and task isolation, while Kubernetes handles the underlying infrastructure for distributed execution. The current version is 0.29.0, with releases typically synchronized with the Dagster core library's rapid development cadence.
Warnings
- breaking The `dagster-celery` module was refactored into `dagster-celery`, `dagster-celery-k8s`, and `dagster-celery-docker` around Dagster version 0.9.0. If you are upgrading from older versions or migrating, ensure your Celery worker startup commands use the correct `-A` parameter, e.g., `celery -A dagster_celery_k8s.app worker -l info` for this library.
- gotcha When using `celery_k8s_job_executor`, ops exchange data between potentially different worker processes or nodes. This requires configuring a persistent I/O manager (e.g., S3, GCS, or an NFS mount) that is accessible by all Celery workers and step execution Kubernetes Jobs. Default in-memory or filesystem storage will not work in a distributed setup.
- gotcha In Helm chart configurations for Dagster, the `runLauncher.type` field in `values.yaml` is case-sensitive and must be `CeleryK8sRunLauncher` (with proper capitalization). Incorrect capitalization will lead to validation errors during deployment.
- gotcha The Celery broker and backend URLs (e.g., for Redis or RabbitMQ) configured in your `dagster.yaml` for `CeleryK8sRunLauncher` and `celery_k8s_job_executor` must precisely match the configuration used to start your Celery workers. A mismatch will prevent workers from picking up tasks.
- gotcha Dagster and its libraries are under active development, so internal APIs and deployment configurations can change rapidly, sometimes introducing breaking changes or deprecations between minor or even patch versions. While `dagster-celery-k8s` itself requires Python `>=3.10,<3.15`, the core `dagster` library it depends on has its own evolving requirements (e.g., dropping Python 3.8 support and requiring `pydantic>=2` from Dagster 1.12.x).
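As a concrete illustration of the run-launcher and broker settings described above, a minimal Helm `values.yaml` fragment might look like the following. The nested field names and the in-cluster Redis host are assumptions for this sketch, so check the chart reference for your chart version:

```yaml
# values.yaml fragment for the Dagster Helm chart (illustrative field names)
runLauncher:
  type: CeleryK8sRunLauncher   # case-sensitive; incorrect casing fails validation
  config:
    celeryK8sRunLauncher:
      # These URLs must match exactly what the Celery workers were started with,
      # or workers will never pick up tasks.
      broker: "redis://redis:6379/0"
      backend: "redis://redis:6379/1"
```

The same broker/backend values should then be reused anywhere the executor or workers are configured, rather than duplicated by hand in several places.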
Install
- pip install dagster-celery-k8s
Imports
- CeleryK8sRunLauncher
from dagster_celery_k8s.launcher import CeleryK8sRunLauncher
- celery_k8s_job_executor
from dagster_celery_k8s.executor import celery_k8s_job_executor
Quickstart
import os
from dagster import job, op
from dagster_celery_k8s.executor import celery_k8s_job_executor
@op
def my_k8s_celery_op(context):
    context.log.info(f"Executing op on Celery K8s worker with PID {os.getpid()}")
    return "Hello, Celery K8s!"

@job(executor_def=celery_k8s_job_executor)
def my_k8s_celery_job():
    my_k8s_celery_op()
# To configure this job, you would typically use a dagster.yaml like this:
# run_launcher:
#   module: dagster_celery_k8s
#   class: CeleryK8sRunLauncher
#   config:
#     instance_config_map: "dagster-k8s-instance-config-map"
#     dagster_home: "/opt/dagster/dagster_home"
#     broker: "redis://redis-host:6379/0"   # dagster.yaml is plain YAML; Python expressions won't work here
#     backend: "redis://redis-host:6379/1"
# And ensure Celery workers are running:
# celery -A dagster_celery_k8s.app worker -l info --hostname=celery@%h
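Because dagster.yaml is plain YAML, it cannot embed Python expressions to read environment variables; for env-driven broker URLs, either use Dagster's `env:` substitution where the field supports it, or build the URL in the code that generates your configuration. A small helper sketch (hypothetical, not part of dagster-celery-k8s) assuming Redis on its default port:

```python
import os

def redis_url(host_var: str, default_host: str = "localhost", db: int = 0) -> str:
    # Hypothetical helper: build a Redis URL from an environment variable,
    # falling back to a default host when the variable is unset.
    host = os.environ.get(host_var, default_host)
    return f"redis://{host}:6379/{db}"

# Broker on db 0, result backend on db 1, mirroring the dagster.yaml example above.
broker = redis_url("REDIS_BROKER_HOST", db=0)
backend = redis_url("REDIS_BACKEND_HOST", db=1)
```

Whatever values you compute here must match the `--broker` / app configuration used when starting the Celery workers, per the gotcha above.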