Dagster PySpark

0.29.0 · active · verified Mon Apr 13

dagster-pyspark provides components for integrating PySpark with Dagster, enabling data engineers to define, orchestrate, and observe Spark-based data pipelines. It is currently at version 0.29.0; releases track the Dagster core release cadence, with a new version published alongside each core major/minor update.

Warnings

Install
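The package is distributed on PyPI; install it with pip, pinning the version to match your Dagster core release:

pip install dagster-pyspark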

Imports
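The quickstart below relies on these imports (PySparkResource is the pythonic resource exported by dagster_pyspark in recent releases):

from dagster import asset, define_asset_job, Definitions
from dagster_pyspark import PySparkResource
from pyspark.sql import SparkSession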

Quickstart

This quickstart demonstrates how to define a `PySparkResource` that manages a `SparkSession` and then use that session within a Dagster asset. The resource is supplied through `Definitions`, making the `SparkSession` available to any asset that requests the `pyspark` resource.

from dagster import asset, define_asset_job, Definitions
from dagster_pyspark import PySparkResource
from pyspark.sql import SparkSession

# 1. Define the PySpark resource
# This resource manages the lifecycle of a SparkSession.
# spark_config accepts any Spark properties you would normally set on the session.
my_pyspark_resource = PySparkResource(
    spark_config={
        "spark.app.name": "MyDagsterSparkApp",
        "spark.master": "local[*]",  # Use a local master for development
    }
)

# 2. Define an asset that uses the SparkSession from the resource
@asset
def my_spark_asset(pyspark: PySparkResource):
    # The resource exposes the configured SparkSession via .spark_session
    spark: SparkSession = pyspark.spark_session
    data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
    df = spark.createDataFrame(data, ["Name", "ID"])
    df.show()

    # In a real scenario, you'd perform data transformations, 
    # read/write from external systems, etc.
    return df.count() # Return some materialization metadata

# 3. Define a job that materializes the asset
my_spark_job = define_asset_job(name="my_spark_job", selection="my_spark_asset")

# 4. Expose your definitions so tools like `dagster dev` can load them
defs = Definitions(
    assets=[my_spark_asset],
    jobs=[my_spark_job],
    resources={
        "pyspark": my_pyspark_resource,
        # Add other resources like IO managers here
    },
)

# To run locally for testing (optional, usually done via `dagster dev`)
if __name__ == "__main__":
    from dagster import materialize_to_memory
    print("\n--- Executing my_spark_job ---")
    result = materialize_to_memory(my_spark_job)
    assert result.success
    print("Asset succeeded with result:", result.output_for_node("my_spark_asset"))
