Apache Airflow Livy Provider
The Apache Airflow Livy Provider package enables Apache Airflow to interact with Apache Livy, an open-source REST service for submitting and managing Spark jobs on a cluster over HTTP. It includes operators and hooks to facilitate the orchestration of Spark applications within Airflow DAGs. The current version is 4.5.5, with provider packages often updated independently of, but in alignment with, core Airflow releases.
Warnings
- breaking Provider versions have specific minimum Airflow core versions. For `apache-airflow-providers-apache-livy` version 4.5.x, Airflow 2.11.0 or higher is required. Using older Airflow versions with newer providers can lead to incompatibility errors or unexpected behavior.
- breaking Provider version 2.0.0 removed the `apply_default` decorator, a breaking change that requires Airflow 2.1.0 or later. If you upgrade to provider 2.0.0+ on an older Airflow instance, you must first upgrade Airflow core and run `airflow db upgrade` to migrate the metadata database.
- gotcha The `LivyOperator` requires a Livy connection to be configured in the Airflow UI (Admin -> Connections). The default `livy_conn_id` is 'livy_default'. Misconfiguration or absence of this connection will result in task failures.
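The connection can also be created without the UI, via the Airflow CLI or an environment variable. A sketch, assuming a Livy server on localhost:8998 (adjust host and port for your cluster):

```shell
# Option 1: Airflow CLI (the 'livy' connection type is registered by this provider)
airflow connections add livy_default \
    --conn-type livy \
    --conn-host localhost \
    --conn-port 8998

# Option 2: environment variable, read by Airflow at runtime
# (the URI scheme is the connection type)
export AIRFLOW_CONN_LIVY_DEFAULT='livy://localhost:8998'
```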
Install
-
pip install apache-airflow-providers-apache-livy
Imports
- LivyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
- LivyHook
from airflow.providers.apache.livy.hooks.livy import LivyHook
Quickstart
from __future__ import annotations
import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
with DAG(
    dag_id='livy_spark_pi_example',
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['livy', 'spark', 'example'],
) as dag:
    # Ensure 'livy_default' connection is configured in Airflow UI (Admin -> Connections)
    # with appropriate host and port for your Livy server.
    # Example: livy_default, Host: localhost, Port: 8998
    submit_spark_pi_job = LivyOperator(
        task_id='submit_spark_pi_job',
        file=os.getenv('LIVY_SPARK_PI_JAR', '/opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar'),
        class_name='org.apache.spark.examples.SparkPi',
        args=['10'],  # Example: calculate Pi using 10 partitions
        livy_conn_id='livy_default',
        driver_memory='1g',
        executor_memory='1g',
        num_executors=1,
    )
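Under the hood, `LivyOperator` submits the job by POSTing a JSON body to Livy's `/batches` REST endpoint and polling the returned batch until it reaches a terminal state. A minimal sketch of that request body, using only the standard library (the helper name `build_batch_payload` is an illustration, not the provider's API; the field names follow the Livy REST API):

```python
import json


def build_batch_payload(file, class_name=None, args=None,
                        driver_memory=None, executor_memory=None,
                        num_executors=None):
    """Assemble a Livy /batches request body, dropping unset fields."""
    payload = {
        "file": file,                     # required: path to the application jar/py file
        "className": class_name,          # Spark main class
        "args": args,                     # application arguments
        "driverMemory": driver_memory,
        "executorMemory": executor_memory,
        "numExecutors": num_executors,
    }
    return {k: v for k, v in payload.items() if v is not None}


body = build_batch_payload(
    file="/opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=["10"],
    driver_memory="1g",
    executor_memory="1g",
    num_executors=1,
)
print(json.dumps(body, indent=2))
```

A POST of this body to `http://<livy-host>:8998/batches` returns a batch id, which the operator then polls (e.g. `GET /batches/{id}/state`) until the job succeeds or fails.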