Apache Airflow Celery Provider
This provider package integrates Apache Airflow with Celery, letting Airflow scale task execution by distributing tasks across a pool of Celery workers via a message broker such as RabbitMQ or Redis. Provider packages are versioned and released independently of Airflow core, typically on a roughly monthly cadence; this document describes version 3.17.2.
Warnings
- breaking Airflow 2.0+ changed the recommended way to enable the Celery Executor. The `[celery]` section in `airflow.cfg` is still used for broker/backend settings, but the executor itself must now be set explicitly via `executor = CeleryExecutor` in the `[core]` section.
- gotcha Incorrect `broker_url` or `result_backend` configuration in `airflow.cfg` is a common source of issues. These must point to your running message broker (e.g., RabbitMQ, Redis) and result store, respectively, and be accessible from all Airflow components (webserver, scheduler, workers).
- gotcha Tasks can be assigned to specific Celery queues using the `queue` parameter. If Celery workers are not configured to listen to these specific queues (e.g., `airflow celery worker -q default,my_custom_queue`), tasks assigned to those queues will remain in a 'queued' state indefinitely.
- deprecated The `celery_queue` parameter for tasks has been deprecated in favor of the `queue` parameter.
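For reference, a minimal `airflow.cfg` fragment that wires the points above together. The hostnames, ports, database numbers, and credentials below are placeholders; adjust them for your deployment:

```ini
[core]
# Airflow 2.0+: the executor is set here, not in [celery]
executor = CeleryExecutor

[celery]
# Broker and result backend must be reachable from the scheduler,
# webserver, and every worker. Example values assume a local Redis
# broker and a local PostgreSQL result backend.
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost:5432/airflow
```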
Install
- pip install apache-airflow-providers-celery
Imports
- CeleryConnectionHook
from airflow.providers.celery.hooks.celery import CeleryConnectionHook
Quickstart
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="celery_executor_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["celery", "example"],
) as dag:
    # This task will run on a worker designated for the 'high_priority' queue
    high_priority_task = BashOperator(
        task_id="run_on_high_priority_queue",
        bash_command="echo 'Running on high priority queue' && sleep 5",
        queue="high_priority",  # Configure a Celery worker to listen to this queue
    )

    # This task will run on a worker designated for the 'default' queue
    default_queue_task = BashOperator(
        task_id="run_on_default_queue",
        bash_command="echo 'Running on default queue' && sleep 2",
        queue="default",  # Or omit, if the default queue is configured
    )

    high_priority_task >> default_queue_task
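The queue behavior in the quickstart can be illustrated with a small plain-Python sketch (no Airflow or Celery required; `dispatch` and its inputs are hypothetical names, not part of either library). It models the semantics from the gotcha above: a task tagged with a queue is only picked up if some worker subscribes to that queue, otherwise it stays queued indefinitely.

```python
def dispatch(tasks, worker_queues):
    """Split tasks into those a worker will pick up and those left queued.

    tasks: list of (task_id, queue) pairs, mirroring the `queue` parameter
           on each operator in the quickstart DAG.
    worker_queues: set of queue names the running workers listen to,
                   i.e. what was passed to `airflow celery worker -q ...`.
    """
    running, stuck = [], []
    for task_id, queue in tasks:
        (running if queue in worker_queues else stuck).append(task_id)
    return running, stuck


tasks = [
    ("run_on_high_priority_queue", "high_priority"),
    ("run_on_default_queue", "default"),
]

# Worker started with only the default queue: airflow celery worker -q default
running, stuck = dispatch(tasks, {"default"})
# No worker listens on 'high_priority', so that task never leaves 'queued'
```

Starting a worker with `-q default,high_priority` instead would clear the stuck task, which is the fix the gotcha in the Warnings section points at.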