{"library":"spark-expectations","title":"Spark Expectations","type":"library","description":"Spark Expectations is a Python library by Nike-Inc that facilitates in-flight data quality (DQ) checks within Apache Spark jobs. It enables validation of data against defined rules (row-level, aggregate, and query-based) as data is processed, ensuring only quality data reaches its destination. Erroneous records are quarantined into a separate error table, and aggregated metrics are provided. The library is actively maintained with regular updates; the current version is 2.10.0.","language":"python","status":"active","last_verified":"Sat May 16","install":{"commands":["pip install spark-expectations"],"cli":null},"imports":["from spark_expectations.core.expectations import SparkExpectations","from spark_expectations.core.expectations import WrappedDataFrameWriter","from spark_expectations.config.user_config import Constants as user_config","from spark_expectations.sinks.utils.writer import WrappedDataFrameStreamWriter"],"auth":{"required":false,"env_vars":[]},"links":{"homepage":null,"github":null,"docs":null,"changelog":null,"pypi":"https://pypi.org/project/spark-expectations/","npm":null,"openapi_spec":null,"status_page":null,"smithery":null},"quickstart":{"code":"from pyspark.sql import SparkSession, DataFrame\nfrom spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter\nfrom spark_expectations.config.user_config import Constants as user_config\n\n# Initialize Spark Session (example for local execution)\nspark = SparkSession.builder \\\n    .appName(\"SparkExpectationsQuickstart\") \\\n    .config(\"spark.sql.warehouse.dir\", \"file:///tmp/spark-warehouse\") \\\n    .enableHiveSupport() \\\n    .getOrCreate()\n\n# Example DataFrame. Keep \"age\" numeric (or None): mixing int and str values\n# in one column makes schema inference fail. Age 150 violates the range rule below.\ndata = [(\"1\", \"Alice\", 30), (\"2\", \"Bob\", 25), (\"3\", \"Charlie\", None), (\"4\", \"David\", 35), (\"5\", \"Eve\", 150)]\nschema = [\"id\", \"name\", \"age\"]\ndf = 
spark.createDataFrame(data, schema)\ndf.createOrReplaceTempView(\"my_source_table\")\n\n# Define mock rules DataFrame (in a real scenario, this would be loaded from a table or config file)\nrules_data = [\n    (\"my_product\", \"my_source_table\", \"row_dq\", \"age is not null\", \"error_records\", \"drop\", \"None\", \"None\", \"Active\"),\n    (\"my_product\", \"my_source_table\", \"row_dq\", \"age between 1 and 100\", \"error_records\", \"drop\", \"None\", \"None\", \"Active\"),\n    (\"my_product\", \"my_source_table\", \"agg_dq\", \"count(id) > 0\", \"error_records\", \"fail\", \"None\", \"None\", \"Active\")\n]\nrules_schema = [\"product_id\", \"table_name\", \"rule_type\", \"rule_column\", \"expectation_failure_criteria\", \"action_if_failed\", \"tag\", \"enable_for_source_dq_validation\", \"active\"]\nrules_df = spark.createDataFrame(rules_data, rules_schema)\nrules_df.createOrReplaceTempView(\"dq_rules_table\")\n\n# Configure Spark Expectations\nse_user_config = {\n    user_config.PRODUCT_ID: \"my_product\",\n    user_config.TABLE_NAME: \"my_source_table\",\n    user_config.RULES_TABLE_NAME: \"dq_rules_table\",\n    user_config.STATS_TABLE_NAME: \"dq_stats_table\",\n    user_config.ERROR_RECORDS_TABLE_NAME: \"my_source_table_error\",\n    user_config.TARGET_TABLE_NAME: \"my_target_table\",\n    user_config.QUERY_METRICS_TABLE_NAME: \"dq_query_metrics_table\"\n}\n\nwriter = WrappedDataFrameWriter().mode(\"overwrite\") # mode may also be \"append\"; the storage format is set separately, e.g. .format(\"delta\")\n\nse = SparkExpectations(\n    product_id=se_user_config[user_config.PRODUCT_ID],\n    rules_df=rules_df,\n    stats_table=se_user_config[user_config.STATS_TABLE_NAME],\n    stats_table_writer=writer,\n    target_and_error_table_writer=writer,\n    dq_rules_api_type=\"sql\",\n    query_metrics_table_name=se_user_config[user_config.QUERY_METRICS_TABLE_NAME]\n)\n\n@se.with_expectations(\n    product_id=se_user_config[user_config.PRODUCT_ID],\n    table_name=se_user_config[user_config.TABLE_NAME],\n    
target_table=se_user_config[user_config.TARGET_TABLE_NAME],\n    write_to_table=True, # Set to True to write valid data to target table\n    user_conf=se_user_config\n)\ndef process_data_with_dq() -> DataFrame:\n    # Your data processing logic here. This DataFrame will be validated.\n    processed_df = spark.table(\"my_source_table\")\n    return processed_df\n\n# Run the data quality job\nvalidated_df = process_data_with_dq()\n\nprint(\"\\nValidated DataFrame (valid records only):\")\nvalidated_df.show()\n\nprint(\"\\nError records table (if any):\")\nspark.table(se_user_config[user_config.ERROR_RECORDS_TABLE_NAME]).show()\n\nprint(\"\\nDQ Stats table:\")\nspark.table(se_user_config[user_config.STATS_TABLE_NAME]).show()\n\n# Stop Spark Session\nspark.stop()\n","lang":"python","description":"This quickstart demonstrates how to set up `SparkExpectations`, define data quality rules, and apply them to a Spark DataFrame using the `@se.with_expectations` decorator. It includes steps for initializing Spark, creating a sample DataFrame, defining mock rules, configuring the `SparkExpectations` instance, and running the decorated function to process and validate data.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":{"tag":null,"tag_description":null,"last_tested":"2026-05-16","installed_version":"2.10.0","pypi_latest":"2.10.0","is_stale":false,"summary":{"python_range":"3.9–3.13","success_rate":100,"avg_install_s":6.3,"avg_import_s":null,"wheel_type":"wheel"},"results":[{"runtime":"python:3.10-alpine","python_version":"3.10","os_libc":"alpine (musl)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":"27.6M"},{"runtime":"python:3.10-slim","python_version":"3.10","os_libc":"slim 
(glibc)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":2.9,"import_time_s":null,"mem_mb":null,"disk_size":"29M"},{"runtime":"python:3.11-alpine","python_version":"3.11","os_libc":"alpine (musl)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":"31.1M"},{"runtime":"python:3.11-slim","python_version":"3.11","os_libc":"slim (glibc)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":2.8,"import_time_s":null,"mem_mb":null,"disk_size":"32M"},{"runtime":"python:3.12-alpine","python_version":"3.12","os_libc":"alpine (musl)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":"22.7M"},{"runtime":"python:3.12-slim","python_version":"3.12","os_libc":"slim (glibc)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":2.6,"import_time_s":null,"mem_mb":null,"disk_size":"24M"},{"runtime":"python:3.13-alpine","python_version":"3.13","os_libc":"alpine (musl)","variant":"spark-expectations","exit_code":0,"wheel_type":"sdist","failure_reason":null,"import_side_effects":"broken","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":"356.2M"},{"runtime":"python:3.13-slim","python_version":"3.13","os_libc":"slim (glibc)","variant":"spark-expectations","exit_code":0,"wheel_type":"sdist","failure_reason":null,"import_side_effects":"broken","install_time_s":19.8,"import_time_s":null,"mem_mb":null,"disk_size":"357M"},{"runtime":"python:3.9-alpine","python_version":"3.9","os_libc":"alpine 
(musl)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":null,"import_time_s":null,"mem_mb":null,"disk_size":"26.9M"},{"runtime":"python:3.9-slim","python_version":"3.9","os_libc":"slim (glibc)","variant":"spark-expectations","exit_code":0,"wheel_type":"wheel","failure_reason":null,"import_side_effects":"broken","install_time_s":3.2,"import_time_s":null,"mem_mb":null,"disk_size":"28M"}]}}