PySpark DataFrame Testing Utility
pyspark-test is a Python library designed to simplify unit testing for PySpark DataFrames. It provides a function, `assert_pyspark_df_equal`, inspired by the pandas testing module, which allows users to compare two Spark DataFrames and identify any differences. The library is currently at version 0.2.0 and has a stable, albeit infrequent, release cadence, focusing on its core DataFrame comparison functionality.
Common errors
- `AssertionError: DataFrames are not equal. ...`
  - cause: The actual data within the DataFrames differs, either in individual cell values or in the presence/absence of rows.
  - fix: Examine the detailed error output from `assert_pyspark_df_equal`, which highlights the differing rows and columns. Verify your transformation logic or expected input/output data, and set `order_by` if row order is non-deterministic.
- `AssertionError: Schema are not equal. ...`
  - cause: The schemas (column names, data types, nullability) of the compared DataFrames do not match, and `check_dtype=True` was used.
  - fix: Review the schema definitions of both DataFrames and ensure that all column names, their exact data types, and nullability properties are identical. Set `check_dtype=False` if you only care about data values and not type strictness.
- `AssertionError: Column names are not equal. ...`
  - cause: With `check_column_names=True`, the DataFrames have different column names, or the columns are in a different order and `check_columns_in_order=True` was used.
  - fix: Ensure that both DataFrames have exactly the same column names in the same order. If column order is not important, set `check_columns_in_order=False`.
Warnings
- gotcha By default, `assert_pyspark_df_equal` does not check for column name equality (`check_column_names=False`) or column order (`check_columns_in_order=False`). This can lead to false positives if DataFrames have identical data but different column metadata or ordering. Always explicitly set comparison strictness.
- gotcha Managing SparkSession setup and teardown in a test suite can be complex, leading to resource leaks or slow tests if not handled properly. Excessive logging from `py4j` (Spark's Java gateway) can also obscure relevant test output.
Install
- pip install pyspark-test
Imports
- assert_pyspark_df_equal
from pyspark_test import assert_pyspark_df_equal
Quickstart
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType, LongType
from pyspark_test import assert_pyspark_df_equal
# Initialize a local SparkSession for testing
spark = SparkSession.builder.master('local[1]').appName('pyspark-test-quickstart').getOrCreate()
# Create two identical DataFrames
df_1 = spark.createDataFrame(
data=[
[datetime.date(2020, 1, 1), 'apple', 1.123, 10],
[None, 'banana', 2.345, 20],
],
schema=StructType([
StructField('col_a', DateType(), True),
StructField('col_b', StringType(), True),
StructField('col_c', DoubleType(), True),
StructField('col_d', LongType(), True),
]),
)
df_2 = spark.createDataFrame(
data=[
[datetime.date(2020, 1, 1), 'apple', 1.123, 10],
[None, 'banana', 2.345, 20],
],
schema=StructType([
StructField('col_a', DateType(), True),
StructField('col_b', StringType(), True),
StructField('col_c', DoubleType(), True),
StructField('col_d', LongType(), True),
]),
)
# Assert that the two DataFrames are equal
print("Asserting identical DataFrames...")
assert_pyspark_df_equal(df_1, df_2, check_dtype=True, check_column_names=True, check_columns_in_order=True, order_by=['col_a', 'col_b'])
print("Assertion successful: DataFrames are equal.")
# Example of intentionally different DataFrames to demonstrate failure
df_3 = spark.createDataFrame(
data=[
[datetime.date(2020, 1, 1), 'apple', 1.123, 10],
[None, 'orange', 99.999, 20], # Changed data
],
schema=StructType([
StructField('col_a', DateType(), True),
StructField('col_b', StringType(), True),
StructField('col_c', DoubleType(), True),
StructField('col_d', LongType(), True),
]),
)
print("\nAsserting different DataFrames (expected to fail)...")
try:
assert_pyspark_df_equal(df_1, df_3, check_dtype=True, check_column_names=True, check_columns_in_order=True, order_by=['col_a', 'col_b'])
except AssertionError as e:
print(f"Caught expected error: {e}")
# Stop SparkSession
spark.stop()