Apache Airflow Apache Spark Provider

6.0.0 · active · verified Sun Apr 12

This provider package enables Apache Airflow to interact with Apache Spark, allowing Spark jobs to be orchestrated and scheduled from Airflow DAGs. It includes operators and hooks for submitting Spark applications, executing Spark SQL queries, and performing data transfers. It is an active provider package, with version 6.0.0 released on March 28, 2026. Airflow providers are released independently of Airflow core, typically on a regular cadence, to deliver new features and bug fixes.

Install
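
The provider is distributed on PyPI as `apache-airflow-providers-apache-spark` and is installed into the same environment as Airflow itself. Pinning the version shown above keeps deployments reproducible:

```shell
# Install the Spark provider alongside an existing Airflow installation.
pip install "apache-airflow-providers-apache-spark==6.0.0"
```

Note that `SparkSubmitOperator` shells out to the `spark-submit` binary, so a Spark client installation must also be available on the Airflow worker.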

Imports

Quickstart

This example demonstrates a basic Airflow DAG using the `SparkSubmitOperator` to submit a PySpark application to a Spark cluster. Before running it, ensure a Spark connection (e.g. `spark_default`) is configured in the Airflow UI with the appropriate Spark master URL. The `application` parameter should point to a PySpark script accessible to the Airflow worker.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# For local testing, ensure a Spark Connection 'spark_default' is configured in Airflow UI.
# Example: Host: spark://localhost:7077 (or similar Spark Master URL)
# For a PySpark job, you might need a local 'pyspark_job.py' file.
# Example pyspark_job.py content:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('SimpleSparkApp').getOrCreate()
# data = [('Alice', 1), ('Bob', 2), ('Charlie', 3)]
# df = spark.createDataFrame(data, ['Name', 'Age'])
# df.show()
# spark.stop()

with DAG(
    dag_id="spark_submit_example_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["spark", "example"],
) as dag:
    submit_pyspark_job = SparkSubmitOperator(
        task_id="submit_pyspark_job",
        conn_id="spark_default", # Ensure this Spark connection is configured in Airflow UI
        application="/opt/airflow/dags/pyspark_job.py", # Path to your PySpark script
        name="airflow_pyspark_job",
        conf={
            "spark.executor.memory": "2g",
            "spark.driver.memory": "1g"
        },
        verbose=True,
        # For more options, see SparkSubmitOperator documentation
        # application_args=["--input", "/path/to/input.csv", "--output", "/path/to/output.csv"]
    )
