{"id":5170,"library":"dagster-pyspark","title":"Dagster PySpark","description":"Dagster-pyspark provides components for integrating PySpark with Dagster, enabling data engineers to define, orchestrate, and observe Spark-based data pipelines. It is currently at version 0.29.0, and its releases track the Dagster core library's cadence: a matching version is published alongside each Dagster core release.","status":"active","version":"0.29.0","language":"en","source_language":"en","source_url":"https://github.com/dagster-io/dagster/tree/master/python_modules/libraries/dagster-pyspark","tags":["data processing","spark","pyspark","etl","dagster","orchestration","resource"],"install":[{"cmd":"pip install dagster-pyspark","lang":"bash","label":"Install base package"},{"cmd":"pip install dagster-databricks","lang":"bash","label":"Install Databricks step launcher support (separate companion package)"},{"cmd":"pip install dagster-aws","lang":"bash","label":"Install EMR step launcher support (separate companion package)"}],"dependencies":[{"reason":"Core Dagster library, required for defining assets, jobs, and resources.","package":"dagster","optional":false},{"reason":"PySpark library itself, required to interact with Spark clusters. The base `dagster-pyspark` package pulls in a compatible version.","package":"pyspark","optional":false}],"imports":[{"note":"`PySparkResource` is the Pythonic, recommended resource class. The package does not export a `build_pyspark_resource` factory.","wrong":"from dagster_pyspark import build_pyspark_resource","symbol":"PySparkResource","correct":"from dagster_pyspark import PySparkResource"},{"note":"Legacy function-style resource; still importable for older, config-dict-based code.","symbol":"pyspark_resource","correct":"from dagster_pyspark import pyspark_resource"},{"note":"A Delta Lake IO manager for PySpark DataFrames ships in the separate `dagster-deltalake-pyspark` package, not in `dagster-pyspark`.","wrong":"from dagster_pyspark import pyspark_delta_lake_io_manager","symbol":"DeltaLakePySparkIOManager","correct":"from dagster_deltalake_pyspark import DeltaLakePySparkIOManager"},{"note":"Step launchers are cluster-specific and live in companion packages (`dagster-databricks`, `dagster-aws`), not in `dagster-pyspark` itself.","wrong":"from dagster_pyspark import pyspark_step_launcher","symbol":"emr_pyspark_step_launcher","correct":"from dagster_aws.emr import emr_pyspark_step_launcher"}],"quickstart":{"code":"from dagster import Definitions, asset, materialize\nfrom dagster_pyspark import PySparkResource\n\n# 1. Define the PySpark resource.\n# PySparkResource manages the lifecycle of a SparkSession for the run.\n# Pass custom Spark properties via spark_config.\npyspark = PySparkResource(\n    spark_config={\n        \"spark.app.name\": \"MyDagsterSparkApp\",\n        \"spark.master\": \"local[*]\",  # local master for development\n    }\n)\n\n# 2. Define an asset that uses the SparkSession from the resource.\n@asset\ndef my_spark_asset(pyspark: PySparkResource) -> None:\n    spark = pyspark.spark_session  # the configured SparkSession\n    data = [(\"Alice\", 1), (\"Bob\", 2), (\"Charlie\", 3)]\n    df = spark.createDataFrame(data, [\"name\", \"id\"])\n    df.show()\n    # In a real pipeline you would transform data and read/write\n    # external systems here.\n\n# 3. Expose your definitions so Dagster tooling (e.g. `dagster dev`) can load them.\ndefs = Definitions(\n    assets=[my_spark_asset],\n    resources={\"pyspark\": pyspark},\n)\n\n# To run locally for testing (optional; usually done via `dagster dev`):\nif __name__ == \"__main__\":\n    result = materialize([my_spark_asset], resources={\"pyspark\": pyspark})\n    assert result.success\n","lang":"python","description":"This quickstart defines a `PySparkResource` that manages a `SparkSession` and uses that session inside a Dagster asset via the resource's `spark_session` property. Binding the resource under the key `pyspark` in `Definitions` makes it available to any asset that declares a matching `pyspark` parameter."},"warnings":[{"fix":"Always install `dagster` and all `dagster-*` libraries from matching release lines. For example, `dagster==1.13.0` pairs with `dagster-pyspark==0.29.0`.","message":"Dagster integration libraries, including `dagster-pyspark`, are version-coupled with the core `dagster` library. Installing a mismatched pair (e.g., `dagster-pyspark` 0.29.0 with `dagster` 1.12.0) can lead to runtime errors or unexpected behavior.","severity":"breaking","affected_versions":"All versions"},{"fix":"Ensure your environment meets PySpark's requirements. For local development, install a supported JDK. For cluster execution, set `spark_config` on `PySparkResource`, or use the cluster-specific step launcher from the companion package (`dagster-databricks` for Databricks, `dagster-aws` for EMR).","message":"While `dagster-pyspark` lists `pyspark` as a dependency, proper PySpark environment setup (Java Development Kit, `SPARK_HOME`, or cluster-specific configuration) falls outside the scope of this library. Misconfiguration surfaces as Spark errors rather than Dagster errors.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Use the `SparkSession` exposed by the resource, e.g. `@asset def my_asset(pyspark: PySparkResource): spark = pyspark.spark_session`.","message":"`PySparkResource` creates and manages a single `SparkSession` for the run. Avoid calling `SparkSession.builder.getOrCreate()` directly inside assets unless you have specific, advanced requirements, as this can bypass the resource's configuration or cause resource contention.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Prefer the Pythonic `PySparkResource(...)` over function-style `pyspark_resource` definitions, and note that `build_pyspark_resource` is not part of this package's API.","message":"The function-style `pyspark_resource` is the legacy API, superseded by the Pythonic `PySparkResource` introduced alongside Dagster's Pythonic resources.","severity":"deprecated","affected_versions":"Legacy API remains importable in all versions"}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}