Spark Expectations
Spark Expectations is a Python library from Nike-Inc that runs in-flight data quality (DQ) checks inside Apache Spark jobs. It validates data against defined rules (row-level, aggregate, and query-based) as the data is processed, so that only records meeting expectations reach the destination table. Erroneous records are quarantined in a separate error table, and aggregated DQ metrics are recorded. The library is actively maintained with regular updates; the current version is 2.9.1.
Warnings
- gotcha Databricks Serverless Compute environments may encounter issues installing the `pyspark` dependency: `spark-expectations` declares a `pyspark` requirement that can conflict with the pre-installed, optimized `pyspark` on Databricks Serverless, leading to job failures.
- gotcha When using `spark-expectations` with streaming DataFrames, it is crucial to configure a dedicated `checkpointLocation` in your streaming write options. Failure to do so can lead to production issues related to fault tolerance, exactly-once processing, and recovery after failures.
- gotcha The library requires three supporting tables: a rules table (a DataFrame of DQ rule definitions, passed as `rules_df`), a stats table (for aggregated metrics), and an `_error` table (to quarantine failed records). Incorrect or missing configuration of any of these prevents the library from functioning correctly.
- gotcha The `action_if_failed` setting behaves differently depending on the `rule_type` (row-level, aggregate, query-based) and the configured action (`fail`, `ignore`, `drop`). Misunderstanding these behaviors can lead to unexpected job failures or data loss. For instance, `drop` on row-level rules removes bad rows from the target, while `fail` on aggregate/query rules can fail the entire job.
- breaking While `spark-expectations` itself aims for backward compatibility, its reliance on Apache Spark means that major Spark upgrades (e.g., Spark 3.x to 4.x) can introduce breaking changes due to external factors like ANSI SQL mode becoming default in Spark 4.0.
- gotcha Email notifications from `spark-expectations` might not function as expected in Databricks Serverless environments due to network restrictions.
- gotcha Schema evolution for error tables is not handled automatically: if the source schema changes after a data quality rule failure has populated the error table, the error table may require manual configuration or migration.
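For the streaming gotcha above, a minimal sketch of wiring a dedicated `checkpointLocation` into a streaming write. The helper name and paths are hypothetical; the `writeStream` usage is standard PySpark and is shown only in comments so the sketch stays runnable without a cluster:

```python
# Build per-table streaming write options with a dedicated checkpoint
# directory. CHECKPOINT_ROOT is an assumption; adjust for your environment.
CHECKPOINT_ROOT = "/tmp/checkpoints"

def streaming_write_options(target_table: str) -> dict:
    """Return write options with a checkpoint directory unique to the target table."""
    return {
        # A dedicated checkpointLocation per sink is what enables fault
        # tolerance, exactly-once processing, and recovery after failures.
        "checkpointLocation": f"{CHECKPOINT_ROOT}/{target_table}",
    }

# Typical usage with a streaming DataFrame (requires an active SparkSession):
# (streaming_df.writeStream
#     .options(**streaming_write_options("my_target_table"))
#     .toTable("my_target_table"))
```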
Install
pip install spark-expectations
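To reproduce the version referenced in the introduction, the install can be pinned (check PyPI for the latest release):

```shell
pip install "spark-expectations==2.9.1"
```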
Imports
- SparkExpectations
from spark_expectations.core.expectations import SparkExpectations
- WrappedDataFrameWriter
from spark_expectations.core.expectations import WrappedDataFrameWriter
- Constants
from spark_expectations.config.user_config import Constants as user_config
- WrappedDataFrameStreamWriter
from spark_expectations.sinks.utils.writer import WrappedDataFrameStreamWriter
Quickstart
from pyspark.sql import SparkSession, DataFrame
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from spark_expectations.config.user_config import Constants as user_config
import os
# Initialize Spark Session (example for local execution)
spark = SparkSession.builder \
    .appName("SparkExpectationsQuickstart") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
# Example DataFrame. `age` is kept as a string so the invalid value can be
# represented; mixing ints and strings in one column would break schema inference.
data = [("1", "Alice", "30"), ("2", "Bob", "25"), ("3", "Charlie", None), ("4", "David", "35"), ("5", "Eve", "invalid_age")]
schema = ["id", "name", "age"]
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("my_source_table")
# Define mock rules DataFrame (in a real scenario, this would be loaded from a table or config file)
rules_data = [
    ("my_product", "my_source_table", "row_dq", "age is not null", "error_records", "drop", "None", "None", "Active"),
    ("my_product", "my_source_table", "row_dq", "age between 1 and 100", "error_records", "drop", "None", "None", "Active"),
    ("my_product", "my_source_table", "agg_dq", "count(id) > 0", "error_records", "fail", "None", "None", "Active"),
]
rules_schema = ["product_id", "table_name", "rule_type", "rule_column", "expectation_failure_criteria", "action_if_failed", "tag", "enable_for_source_dq_validation", "active"]
rules_df = spark.createDataFrame(rules_data, rules_schema)
rules_df.createOrReplaceTempView("dq_rules_table")
# Configure Spark Expectations
se_user_config = {
    user_config.PRODUCT_ID: "my_product",
    user_config.TABLE_NAME: "my_source_table",
    user_config.RULES_TABLE_NAME: "dq_rules_table",
    user_config.STATS_TABLE_NAME: "dq_stats_table",
    user_config.ERROR_RECORDS_TABLE_NAME: "my_source_table_error",
    user_config.TARGET_TABLE_NAME: "my_target_table",
    user_config.QUERY_METRICS_TABLE_NAME: "dq_query_metrics_table",
}
writer = WrappedDataFrameWriter().mode("overwrite").format("delta")  # save mode ("overwrite", "append", ...) and sink format
se = SparkExpectations(
    product_id=se_user_config[user_config.PRODUCT_ID],
    rules_df=rules_df,
    stats_table=se_user_config[user_config.STATS_TABLE_NAME],
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    dq_rules_api_type="sql",
    query_metrics_table_name=se_user_config[user_config.QUERY_METRICS_TABLE_NAME]
)
@se.with_expectations(
    product_id=se_user_config[user_config.PRODUCT_ID],
    table_name=se_user_config[user_config.TABLE_NAME],
    target_table=se_user_config[user_config.TARGET_TABLE_NAME],
    write_to_table=True,  # set to True to write valid data to the target table
    user_conf=se_user_config
)
def process_data_with_dq() -> DataFrame:
    # Your data processing logic goes here; the returned DataFrame is validated.
    processed_df = spark.table("my_source_table")
    return processed_df
# Run the data quality job
validated_df = process_data_with_dq()
print("\nValidated DataFrame (valid records only):")
validated_df.show()
print("\nError records table (if any):")
spark.table(se_user_config[user_config.ERROR_RECORDS_TABLE_NAME]).show()
print("\nDQ Stats table:")
spark.table(se_user_config[user_config.STATS_TABLE_NAME]).show()
# Stop Spark Session
spark.stop()