Apache Airflow Livy Provider

4.5.5 verified Tue Apr 14 auth: no python

The Apache Airflow Livy Provider package enables Apache Airflow to interact with Apache Livy, an open-source REST service for submitting and managing Spark jobs on a cluster over HTTP. It includes operators and hooks to facilitate the orchestration of Spark applications within Airflow DAGs. The current version is 4.5.5, with provider packages often updated independently of, but in alignment with, core Airflow releases.

pip install apache-airflow-providers-apache-livy
error ModuleNotFoundError: No module named 'airflow_livy'
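cause The provider does not ship a top-level 'airflow_livy' module; its code lives under 'airflow.providers.apache.livy'. The error means the import path is wrong, or that code written for a different package providing 'airflow_livy' is running without that package installed.
fix
Install the provider with 'pip install apache-airflow-providers-apache-livy' and import from its namespace, for example 'from airflow.providers.apache.livy.operators.livy import LivyOperator'.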
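For the ModuleNotFoundError case above, it helps to separate "the provider is not installed" from "the import path is wrong". The following is a minimal sketch using only the standard library; the helper name `provider_status` is hypothetical and not part of Airflow.

```python
from importlib import metadata, util


def provider_status(
    dist_name="apache-airflow-providers-apache-livy",
    module_name="airflow.providers.apache.livy",
):
    """Return (installed_version_or_None, module_is_importable)."""
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        version = None
    try:
        # find_spec imports parent packages, so a missing 'airflow' raises here.
        importable = util.find_spec(module_name) is not None
    except ImportError:
        importable = False
    return version, importable


if __name__ == "__main__":
    version, importable = provider_status()
    print(f"provider version: {version}, importable: {importable}")
```

A result of `(None, False)` points to a missing installation; a version string together with a failing import in your DAG points to a wrong import path.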
error AirflowException: Batch {batch_id} did not succeed
cause The Livy batch job did not complete successfully, possibly due to issues in the Spark application or cluster.
fix
Check the Livy and Spark logs to identify the cause of failure and address any issues in the Spark application or cluster configuration.
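When a batch fails, the Livy REST API can be queried directly for the batch state and its log lines. This is a minimal sketch, assuming Livy is reachable without authentication at a base URL such as http://localhost:8998; the helper names are hypothetical, and the endpoints used are Livy's `GET /batches/{batchId}/state` and `GET /batches/{batchId}/log`.

```python
import json
from urllib.request import urlopen

# Terminal batch states other than "success".
FAILED_STATES = {"dead", "killed", "error"}


def batch_url(base_url, batch_id, resource):
    """Build a URL such as http://host:8998/batches/7/state."""
    return f"{base_url.rstrip('/')}/batches/{batch_id}/{resource}"


def is_failed(state):
    """True when a batch state means the job ended unsuccessfully."""
    return state.lower() in FAILED_STATES


def fetch_json(url, timeout=10):
    """GET a Livy endpoint and decode its JSON body (performs a network call)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def diagnose(base_url, batch_id, log_lines=100):
    """Print the batch state and, on failure, the first lines of its log."""
    state = fetch_json(batch_url(base_url, batch_id, "state"))["state"]
    print(f"batch {batch_id} state: {state}")
    if is_failed(state):
        log_url = batch_url(base_url, batch_id, f"log?from=0&size={log_lines}")
        for line in fetch_json(log_url).get("log", []):
            print(line)
```

Calling `diagnose("http://localhost:8998", 7)` would print the state of batch 7. The Spark driver and YARN logs usually hold more detail than the lines Livy returns.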
error AttributeError: module 'airflow.providers.apache.livy.operators.livy' has no attribute 'LivyBatchOperator'
cause The module 'airflow.providers.apache.livy.operators.livy' defines 'LivyOperator'; it has no class named 'LivyBatchOperator' in the current version of the 'apache-airflow-providers-apache-livy' package.
fix
Import and use 'LivyOperator' instead of 'LivyBatchOperator', as described in the provider documentation.
breaking Provider versions have specific minimum Airflow core versions. For `apache-airflow-providers-apache-livy` version 4.5.x, Airflow 2.11.0 or higher is required. Using older Airflow versions with newer providers can lead to incompatibility errors or unexpected behavior.
fix Always check the provider's documentation for the minimum supported Airflow version. Upgrade your Airflow installation if it doesn't meet the provider's requirements. For current version 4.5.5, ensure Airflow >= 2.11.0.
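The minimum-version check can be automated before installing or upgrading the provider. Below is a minimal sketch, assuming the 2.11.0 minimum stated above; the helper names are hypothetical, and pre-release suffixes such as `rc1` are ignored for simplicity.

```python
import re


def version_tuple(version):
    """Parse the leading 'major.minor[.patch]' digits of a version string."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part or 0) for part in match.groups())


def meets_minimum(installed, minimum="2.11.0"):
    """True when the installed version is at least the required minimum."""
    return version_tuple(installed) >= version_tuple(minimum)


if __name__ == "__main__":
    from importlib import metadata

    try:
        installed = metadata.version("apache-airflow")
    except metadata.PackageNotFoundError:
        print("apache-airflow is not installed")
    else:
        print(f"Airflow {installed} meets minimum: {meets_minimum(installed)}")
```

For stricter comparisons that distinguish pre-releases, `packaging.version.Version` is a more complete alternative.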
breaking Older provider versions (e.g., 3.0.0 and earlier) introduced breaking changes due to the removal of the `apply_defaults` decorator. If you install such a version on an Airflow installation older than 2.1.0, pip may upgrade Airflow itself to satisfy the dependency, after which the metadata database must be migrated manually with `airflow db upgrade`.
fix When upgrading providers, especially on older Airflow installations, check whether the upgrade will also pull in a newer Airflow core. After any Airflow upgrade, run `airflow db upgrade` (`airflow db migrate` on Airflow 2.7 and later) so the database schema matches the installed version. It is recommended to upgrade Airflow to a modern version (>=2.11.0) before upgrading to recent Livy provider versions.
gotcha The `LivyOperator` requires a Livy connection to exist in Airflow, typically created in the Airflow UI (Admin -> Connections). The default `livy_conn_id` is 'livy_default'. If this connection is missing or points to the wrong host or port, tasks fail when they try to reach Livy.
fix Before running a DAG with `LivyOperator`, navigate to Airflow UI -> Admin -> Connections. Create or verify a connection named 'livy_default' (or your specified `livy_conn_id`) with the correct host and port for your Apache Livy server.
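Connections can also be supplied without the UI. Airflow reads connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`, and since Airflow 2.3 the value may be JSON. The sketch below builds such a variable for a Livy server assumed to run at localhost:8998; the helper name is hypothetical.

```python
import json


def livy_connection_env(conn_id="livy_default", host="localhost", port=8998):
    """Return the (name, value) pair of an environment variable defining the connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": "livy", "host": host, "port": port})
    return name, value


name, value = livy_connection_env()
# Print a shell line that can be pasted into the scheduler/worker environment.
print(f"export {name}='{value}'")
```

Connections defined this way are not listed under Admin -> Connections, so a missing entry in the UI does not by itself mean the connection is absent.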

This quickstart demonstrates a basic Airflow DAG using the `LivyOperator` to submit a Spark Pi calculation job to a Livy server. Ensure you have a 'livy_default' connection configured in your Airflow UI pointing to your Livy instance. The `file` parameter should point to the Spark example JAR on your cluster's accessible path.

from __future__ import annotations

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator


with DAG(
    dag_id='livy_spark_pi_example',
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['livy', 'spark', 'example'],
) as dag:
    # Ensure 'livy_default' connection is configured in Airflow UI (Admin -> Connections)
    # with appropriate host and port for your Livy server.
    # Example: livy_default, Host: localhost, Port: 8998

    submit_spark_pi_job = LivyOperator(
        task_id='submit_spark_pi_job',
        file=os.getenv('LIVY_SPARK_PI_JAR', '/opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar'),
        class_name='org.apache.spark.examples.SparkPi',
        args=['10'],  # Number of slices (partitions) SparkPi splits the work into
        livy_conn_id='livy_default',
        driver_memory='1g',
        executor_memory='1g',
        num_executors=1
    )
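
For orientation, the operator arguments in the quickstart correspond to fields of the JSON body accepted by Livy's `POST /batches` endpoint. The sketch below approximates that mapping and is not the provider's own code; the function name is hypothetical, and the field names (`file`, `className`, `args`, `driverMemory`, `executorMemory`, `numExecutors`) follow Livy's REST API.

```python
import json


def build_batch_body(file, class_name=None, args=None, driver_memory=None,
                     executor_memory=None, num_executors=None):
    """Map operator-style arguments to a Livy batch request body, skipping unset ones."""
    candidates = {
        "file": file,
        "className": class_name,
        "args": [str(a) for a in args] if args is not None else None,
        "driverMemory": driver_memory,
        "executorMemory": executor_memory,
        "numExecutors": num_executors,
    }
    return {key: value for key, value in candidates.items() if value is not None}


body = build_batch_body(
    file="/opt/spark/examples/jars/spark-examples_2.12-3.2.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=["10"],
    driver_memory="1g",
    executor_memory="1g",
    num_executors=1,
)
print(json.dumps(body, indent=2))
```

Comparing this body with what Livy shows for a submitted batch can help confirm that memory and executor settings arrive as intended.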