PySpark DataFrame Testing Utility

0.2.0 · active · verified Thu Apr 16

pyspark-test is a Python library designed to simplify unit testing for PySpark DataFrames. It provides a function, `assert_pyspark_df_equal`, inspired by the pandas testing module, which allows users to compare two Spark DataFrames and identify any differences. The library is currently at version 0.2.0 and has a stable, albeit infrequent, release cadence, focusing on its core DataFrame comparison functionality.

Common errors

Warnings

Install
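The package is published on PyPI; assuming the distribution name matches the project name `pyspark-test`, a standard pip install should work (note that `pyspark` itself must also be available in the environment):

```shell
pip install pyspark-test
```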

Imports

Quickstart

This quickstart demonstrates how to use `assert_pyspark_df_equal` to compare two PySpark DataFrames. It includes the necessary setup for a local SparkSession and shows both a successful assertion and an intentionally failing one to illustrate usage and error reporting. The `check_dtype`, `check_column_names`, `check_columns_in_order`, and `order_by` parameters enforce a strict comparison: matching dtypes, identical column names in the same order, and rows normalized by sorting on `col_a` and `col_b` before comparing.

import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType, LongType
from pyspark_test import assert_pyspark_df_equal

# Initialize a local SparkSession for testing
spark = SparkSession.builder.master('local[1]').getOrCreate()

# Create two identical DataFrames
df_1 = spark.createDataFrame(
    data=[
        [datetime.date(2020, 1, 1), 'apple', 1.123, 10],
        [None, 'banana', 2.345, 20],
    ],
    schema=StructType([
        StructField('col_a', DateType(), True),
        StructField('col_b', StringType(), True),
        StructField('col_c', DoubleType(), True),
        StructField('col_d', LongType(), True),
    ]),
)
df_2 = spark.createDataFrame(
    data=[
        [datetime.date(2020, 1, 1), 'apple', 1.123, 10],
        [None, 'banana', 2.345, 20],
    ],
    schema=StructType([
        StructField('col_a', DateType(), True),
        StructField('col_b', StringType(), True),
        StructField('col_c', DoubleType(), True),
        StructField('col_d', LongType(), True),
    ]),
)

# Assert that the two DataFrames are equal
print("Asserting identical DataFrames...")
assert_pyspark_df_equal(df_1, df_2, check_dtype=True, check_column_names=True, check_columns_in_order=True, order_by=['col_a', 'col_b'])
print("Assertion successful: DataFrames are equal.")

# Example of intentionally different DataFrames to demonstrate failure
df_3 = spark.createDataFrame(
    data=[
        [datetime.date(2020, 1, 1), 'apple', 1.123, 10],
        [None, 'orange', 99.999, 20], # Changed data
    ],
    schema=StructType([
        StructField('col_a', DateType(), True),
        StructField('col_b', StringType(), True),
        StructField('col_c', DoubleType(), True),
        StructField('col_d', LongType(), True),
    ]),
)

print("\nAsserting different DataFrames (expected to fail)...")
try:
    assert_pyspark_df_equal(df_1, df_3, check_dtype=True, check_column_names=True, check_columns_in_order=True, order_by=['col_a', 'col_b'])
except AssertionError as e:
    print(f"Caught expected error: {e}")

# Stop SparkSession
spark.stop()
