pytest-spark
pytest-spark is a pytest plugin that simplifies testing PySpark applications by automatically providing session-scoped `spark_context` and `spark_session` fixtures. It lets users configure the Spark environment, including `SPARK_HOME` and custom `spark_options`, directly in `pytest.ini`. The current version is 0.8.0.
Warnings
- gotcha `SPARK_HOME` can be configured in several ways (environment variable, `pytest.ini`, `--spark_home` CLI option), which are read in a specific order (CLI > `pytest.ini` > ENV). If `pyspark` is installed via pip, setting `SPARK_HOME` is usually unnecessary, and a mismatched value can cause confusing or unexpected behavior.
- breaking The `spark_context` fixture is not supported when using Spark Connect functionality. If you're working with Spark 3.4+ and Spark Connect, you must use the `spark_session` fixture instead.
- gotcha By default, the `spark_session` fixture creates a SparkSession with Hive support enabled. If Hive jars are not desired or cause conflicts, you can explicitly disable Hive support by adding `spark_options = spark.sql.catalogImplementation: in-memory` to your `pytest.ini`.
- gotcha Comparing Spark DataFrames for equality can be challenging directly due to potential differences in row order, column order, or schema. Direct `==` comparison often fails even for logically identical DataFrames.
- gotcha When running tests in parallel with `pytest-xdist`, session-scoped Spark fixtures (like `spark_session`) can interfere with each other if not properly isolated. Each parallel process might try to use the same temporary directories or resources, leading to data races or failures.
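The `SPARK_HOME` and Hive gotchas above both involve `pytest.ini`. A minimal configuration might look like this (the `spark_home` path and memory value are placeholder examples):

```ini
[pytest]
spark_home = /opt/spark
spark_options =
    spark.sql.catalogImplementation: in-memory
    spark.driver.memory: 1g
```

`spark_home` can be omitted entirely when `pyspark` is installed via pip.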
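Since direct `==` comparison fails on row order, a common workaround is to compare collected rows after sorting. The sketch below is plain Python over the output of `df.collect()` (PySpark `Row` objects behave like tuples); the helper name is illustrative, not part of the plugin:

```python
def rows_equal(rows_a, rows_b):
    """Compare two collections of rows, ignoring row order.

    Sorting by repr gives a canonical order even when columns hold
    mixed or None values that Python cannot order directly.
    """
    return sorted(map(tuple, rows_a), key=repr) == sorted(map(tuple, rows_b), key=repr)
```

Usage: `assert rows_equal(actual_df.collect(), expected_df.collect())`. Recent Spark releases (3.5+) also ship `pyspark.testing.assertDataFrameEqual`, which handles unordered comparison and schema checks out of the box.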
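One way to isolate parallel workers is to derive per-worker scratch directories from the `PYTEST_XDIST_WORKER` environment variable that `pytest-xdist` sets in each worker process. The helper below is a sketch (its name and the base-directory choice are assumptions; the two option keys are standard Spark settings):

```python
import os
import tempfile

def worker_scoped_spark_conf(base_dir=None):
    """Build Spark options with scratch paths unique to this xdist worker."""
    # pytest-xdist sets PYTEST_XDIST_WORKER to "gw0", "gw1", ...;
    # it is unset in a plain (non-parallel) pytest run.
    worker = os.environ.get("PYTEST_XDIST_WORKER", "main")
    root = os.path.join(base_dir or tempfile.gettempdir(), f"spark-{worker}")
    return {
        "spark.sql.warehouse.dir": os.path.join(root, "warehouse"),
        "spark.local.dir": os.path.join(root, "local"),
    }
```

Feeding these values into the session builder via `.config(key, value)` before `getOrCreate()` keeps each worker's warehouse and shuffle directories separate.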
Install
- pip install pytest-spark
Imports
- spark_session
def test_example(spark_session):
- spark_context
def test_example(spark_context):
Quickstart
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# conftest.py (placed in your project's root or tests directory)
# Note: defining spark_session here overrides the fixture that the
# pytest-spark plugin provides under the same name.
@pytest.fixture(scope="session")
def spark_session():
    """
    Fixture for creating a SparkSession for testing.
    This SparkSession is reused across all tests in the session.
    """
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("pytest-spark-session") \
        .config("spark.driver.memory", "2g") \
        .getOrCreate()
    yield spark
    spark.stop()

# test_example.py (a sample test file)
def test_data_frame_creation(spark_session):
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
    ])
    data = [("Alice", 1), ("Bob", 2)]
    df = spark_session.createDataFrame(data, schema)
    assert df.count() == 2
    assert df.columns == ["name", "age"]
    assert df.collect()[0].name == "Alice"