Spark Expectations

2.9.1 · active · verified Mon Apr 13

Spark Expectations is a Python library by Nike-Inc that runs in-flight data quality (DQ) checks within Apache Spark jobs. It validates data against defined rules (row-level, aggregate, and query-based) as the data is processed, so only records that pass the checks reach their destination. Failing records are quarantined in a separate error table, and aggregated run statistics are written to a stats table. The library is actively maintained with regular updates; the current version is 2.9.1.
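
As a sketch of how the three rule types differ, each expectation is a SQL expression evaluated at a different scope; the rule names, expressions, and the view name below are illustrative examples only, not values required by the library:

# Illustrative only: one example expectation per rule type
example_expectations = {
    "row_dq": "age is not null",                            # evaluated per record
    "agg_dq": "count(id) > 0",                              # evaluated on aggregates over the batch
    "query_dq": "(select count(*) from target_view) > 0",   # evaluated as a standalone SQL query against a view
}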

Warnings

Install
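
Spark Expectations is published on PyPI; assuming a standard pip-based environment, it can be installed with:

pip install spark-expectations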

Imports
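
The quickstart below uses the following imports:

from pyspark.sql import SparkSession, DataFrame
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from spark_expectations.config.user_config import Constants as user_config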

Quickstart

This quickstart demonstrates how to set up `SparkExpectations`, define data quality rules, and apply them to a Spark DataFrame using the `@se.with_expectations` decorator. It includes steps for initializing Spark, creating a sample DataFrame, defining mock rules, configuring the `SparkExpectations` instance, and running the decorated function to process and validate data.

from pyspark.sql import SparkSession, DataFrame
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from spark_expectations.config.user_config import Constants as user_config

# Initialize Spark Session (example for local execution)
spark = SparkSession.builder \
    .appName("SparkExpectationsQuickstart") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Example DataFrame (one null age and one out-of-range age, so some rows will fail the rules below)
data = [("1", "Alice", 30), ("2", "Bob", 25), ("3", "Charlie", None), ("4", "David", 35), ("5", "Eve", 150)]
schema = ["id", "name", "age"]
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("my_source_table")

# Define a mock rules DataFrame (in a real scenario, this would be loaded from the DQ rules table).
# Columns follow the rules-table schema documented by the library; rules are keyed by product_id and target table name.
rules_data = [
    ("my_product", "my_target_table", "row_dq", "age_is_not_null", "age", "age is not null",
     "drop", "validity", "age must not be null", True, True, True, False, 0),
    ("my_product", "my_target_table", "row_dq", "age_in_valid_range", "age", "age between 1 and 100",
     "drop", "validity", "age must be between 1 and 100", True, True, True, False, 0),
    ("my_product", "my_target_table", "agg_dq", "table_not_empty", "id", "count(id) > 0",
     "fail", "completeness", "table must not be empty", True, True, True, False, 0)
]
rules_schema = ["product_id", "table_name", "rule_type", "rule", "column_name", "expectation",
                "action_if_failed", "tag", "description", "enable_for_source_dq_validation",
                "enable_for_target_dq_validation", "is_active", "enable_error_drop_alert", "error_drop_threshold"]
rules_df = spark.createDataFrame(rules_data, rules_schema)

# Optional notification settings passed to the decorator via user_conf (all disabled for this example)
se_user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_enable_slack: False
}

writer = WrappedDataFrameWriter().mode("overwrite").format("parquet")  # mode could also be "append"; format could be "delta", "iceberg", etc.

se = SparkExpectations(
    product_id="my_product",
    rules_df=rules_df,
    stats_table="dq_stats_table",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    stats_streaming_options={user_config.se_enable_streaming: False}  # keep stats local; no Kafka streaming
)

@se.with_expectations(
    target_table="my_target_table",
    write_to_table=True,  # write records that pass the checks to the target table
    user_conf=se_user_conf
)
def process_data_with_dq() -> DataFrame:
    # Your data processing logic here. This DataFrame will be validated.
    processed_df = spark.table("my_source_table")
    return processed_df

# Run the data quality job; passing records are written to the target table,
# failing records are quarantined in <target_table>_error
process_data_with_dq()

print("\nTarget table (records that passed the checks):")
spark.table("my_target_table").show()

print("\nError records table (if any):")
spark.table("my_target_table_error").show()

print("\nDQ stats table:")
spark.table("dq_stats_table").show()

# Stop Spark Session
spark.stop()
