Dagster PySpark
Dagster-pyspark provides components for integrating PySpark with Dagster, enabling data engineers to define, orchestrate, and observe Spark-based data pipelines. It is currently at version 0.29.0; its releases are tightly coupled to the Dagster core library, with a new version published alongside each Dagster core major/minor release.
Warnings
- breaking: Dagster library versions, including `dagster-pyspark`, are tightly coupled with the core `dagster` library. Installing a mismatched version (e.g., `dagster-pyspark` 0.29.0 with `dagster` 1.12.0) will lead to runtime errors or unexpected behavior.
- gotcha: While `dagster-pyspark` lists `pyspark` as a dependency, proper PySpark environment setup (e.g., a Java Development Kit, the `SPARK_HOME` environment variable, or cluster-specific configuration) is critical and falls outside the scope of this library. Misconfiguration can lead to Spark errors.
- gotcha: The `build_pyspark_resource` factory function creates and manages a single `SparkSession` instance for your Dagster run within the asset's execution context. Avoid creating additional `SparkSession` instances directly within your assets unless you have specific, advanced requirements, as this can lead to resource contention or incorrect behavior.
- deprecated: Direct instantiation of `pyspark_resource` (e.g., `pyspark_resource()`) is deprecated. The preferred and more flexible approach is the `build_pyspark_resource` factory function.
Install
- pip install dagster-pyspark
- pip install dagster-pyspark[databricks]
- pip install dagster-pyspark[emr]
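Because library and core versions are coupled (see Warnings), it is safest to pin `dagster` and `dagster-pyspark` together. A sketch of a requirements file; the exact pairing below is illustrative, so check the Dagster release notes for the library version that matches your `dagster` version:

```
# requirements.txt (illustrative pins; verify the pairing in the release notes)
dagster==1.13.0
dagster-pyspark==0.29.0
```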
Imports
- build_pyspark_resource
from dagster_pyspark import build_pyspark_resource
- pyspark_delta_lake_io_manager
from dagster_pyspark import pyspark_delta_lake_io_manager
- pyspark_step_launcher
from dagster_pyspark import pyspark_step_launcher
Quickstart
from dagster import asset, job, Definitions
from dagster_pyspark import build_pyspark_resource
from pyspark.sql import SparkSession
# 1. Define the PySpark resource
# This resource manages the lifecycle of a SparkSession.
# You can pass spark_conf for custom Spark properties or a setup_fn for advanced setup.
my_pyspark_resource = build_pyspark_resource(
    # Example: configure Spark properties
    # spark_conf={
    #     "spark.app.name": "MyDagsterSparkApp",
    #     "spark.master": "local[*]",  # use a local master for development
    # }
)
# 2. Define an asset that uses the SparkSession from the resource
@asset(required_resource_keys={"pyspark"})
def my_spark_asset(context):
    # Access the SparkSession that the resource configured for this run
    spark = context.resources.pyspark.spark_session
    data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
    df = spark.createDataFrame(data, ["Name", "ID"])
    df.show()
    # In a real scenario, you'd perform data transformations,
    # read/write from external systems, etc.
    return df.count()  # Return some materialization metadata
# 3. Define a job that materializes the asset
from dagster import define_asset_job

my_spark_job = define_asset_job("my_spark_job", selection=[my_spark_asset])
# When deploying, you expose your definitions:
# defs = Definitions(
#     assets=[my_spark_asset],
#     jobs=[my_spark_job],
#     resources={
#         "pyspark": my_pyspark_resource,
#         # Add other resources like IO managers here
#     },
# )
# To run locally for testing (optional, usually done via `dagster dev`)
if __name__ == "__main__":
    from dagster import materialize_to_memory

    print("\n--- Materializing my_spark_asset ---")
    result = materialize_to_memory(
        [my_spark_asset],
        resources={"pyspark": my_pyspark_resource},
    )
    assert result.success
    print("Asset succeeded with result:", result.output_for_node("my_spark_asset"))
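Spark properties such as those in the quickstart's `spark_conf` comment use flat dotted keys. For larger configurations it can be more readable to write them as a nested dict and flatten them before handing them to Spark. A minimal sketch of such a helper (`flatten_spark_conf` is a hypothetical utility for illustration, not part of `dagster-pyspark`):

```python
# Sketch: flatten a nested Spark config dict into the dotted keys Spark expects.
# This is an illustration of the mapping, not dagster-pyspark's implementation.
def flatten_spark_conf(conf, prefix=""):
    flat = {}
    for key, value in conf.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections, extending the dotted prefix
            flat.update(flatten_spark_conf(value, dotted))
        else:
            flat[dotted] = value
    return flat

nested = {
    "spark": {
        "app": {"name": "MyDagsterSparkApp"},
        "master": "local[*]",
        "executor": {"memory": "2g"},
    }
}
print(flatten_spark_conf(nested))
# → {'spark.app.name': 'MyDagsterSparkApp', 'spark.master': 'local[*]', 'spark.executor.memory': '2g'}
```

The flattened dict can then be passed wherever flat dotted keys are expected, such as the `spark_conf` shown in the quickstart.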