Dagster Spark
Dagster Spark is a Python library that provides integration components for orchestrating Apache Spark jobs within the Dagster data platform. It lets users define, run, and monitor Spark-based data pipelines using Dagster's declarative programming model, with capabilities for data management, lineage, and observability. The library is actively maintained and typically releases in sync with the core Dagster library.
Warnings
- breaking The `SparkSolidDefinition` has been removed. Users should migrate to `create_spark_op` for defining Spark-based operations.
- deprecated Spark Step Launchers are superseded by Dagster Pipes and are no longer the recommended method for launching external code from Dagster ops and assets. While still available, they will not receive new features or active development.
- gotcha The Spark Declarative Pipeline (SDP) integration components (`SparkDeclarativePipelineComponent`, `SparkPipelinesResource`) are in feature preview. This API may have breaking changes in patch version releases and is not considered ready for production use.
- gotcha Ensure compatibility between your `dagster-spark` version, the `pyspark` library version (if used), and your Apache Spark cluster version. Specific Hadoop/AWS Java SDK versions might also be critical for integrations like S3.
- breaking Dagster core (a dependency of `dagster-spark`) no longer supports Python 3.8 and requires `pydantic>=2`.
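Since step launchers are deprecated in favor of Dagster Pipes, external Spark code is typically launched by invoking `spark-submit` from orchestration code. Below is a minimal, stdlib-only sketch of assembling such a command line; the helper name `build_spark_submit` and all paths are illustrative, not part of the dagster-spark API:

```python
import shlex


def build_spark_submit(main_class, application_jar, master_url, conf=None, args=()):
    """Assemble a spark-submit command line; conf maps Spark keys to values."""
    cmd = ["spark-submit", "--class", main_class, "--master", master_url]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(application_jar)  # application JAR comes after all options
    cmd += list(args)            # trailing arguments are passed to main()
    return cmd


cmd = build_spark_submit(
    main_class="org.apache.spark.examples.SparkPi",
    application_jar="path/to/spark-examples.jar",  # replace with your JAR
    master_url="local[*]",
    conf={"spark.app.name": "dagster-spark-example"},
    args=["100"],  # SparkPi partition count
)
print(shlex.join(cmd))
```

A command assembled this way can be handed to `PipesSubprocessClient` (from core Dagster) so stdout, stderr, and metadata stream back into the Dagster UI.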
Install
pip install dagster dagster-spark
Imports
- create_spark_op
from dagster_spark import create_spark_op
- define_spark_config
from dagster_spark import define_spark_config
- SparkDeclarativePipelineComponent
from dagster_spark.components.spark_declarative_pipeline import SparkDeclarativePipelineComponent
Quickstart
from dagster import Definitions, job
from dagster_spark import create_spark_op, spark_resource

# create_spark_op builds an op that shells out to `spark-submit`.
# Its config schema comes from define_spark_config(); runtime settings such
# as the application JAR and master URL are supplied via run config when the
# job is launched, not as arguments to create_spark_op.
spark_pi_op = create_spark_op(
    name="spark_pi",
    main_class="org.apache.spark.examples.SparkPi",
)

@job(resource_defs={"spark": spark_resource})
def spark_pi_job():
    spark_pi_op()

# For asset-based workflows, prefer Dagster Pipes (or PySparkResource from
# dagster-pyspark when you need a SparkSession directly) over wrapping this op.
defs = Definitions(jobs=[spark_pi_job])
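An op created with `create_spark_op` reads its settings from run config using the schema returned by `define_spark_config()`. A launch might supply something like the following sketch; the field names shown (`master_url`, `application_jar`, `spark_home`, `application_arguments`, `spark_conf`) and the nested `spark_conf` shape are assumptions based on recent releases, so verify them against your installed `dagster-spark` version:

```python
# Hypothetical run config for the spark_pi_job quickstart above.
run_config = {
    "ops": {
        "spark_pi": {
            "config": {
                "master_url": "local[*]",
                "application_jar": "path/to/spark-examples.jar",  # your job JAR
                "spark_home": "/opt/spark",  # often falls back to $SPARK_HOME
                "application_arguments": "100",
                # spark_conf mirrors Spark option names as nested keys
                "spark_conf": {
                    "spark": {"app": {"name": "dagster-spark-example"}}
                },
            }
        }
    }
}
```

This dict can be passed to `spark_pi_job.execute_in_process(run_config=run_config)` in a script or test, or expressed as equivalent YAML in the Dagster launchpad.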