{"id":5491,"library":"spark-expectations","title":"Spark Expectations","description":"Spark Expectations is a Python library by Nike-Inc that facilitates in-flight data quality (DQ) checks within Apache Spark jobs. It enables validation of data against defined rules (row-level, aggregate, and query-based) as data is processed, ensuring only quality data reaches its destination. Erroneous records are quarantined into a separate error table, and aggregated metrics are provided. The library is actively maintained with regular updates; the current version is 2.9.1.","status":"active","version":"2.9.1","language":"en","source_language":"en","source_url":"https://github.com/Nike-Inc/spark-expectations","tags":["spark","data quality","pyspark","data validation","etl","elt","data governance"],"install":[{"cmd":"pip install spark-expectations","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"Core functionality relies on Apache Spark DataFrames.","package":"pyspark"}],"imports":[{"symbol":"SparkExpectations","correct":"from spark_expectations.core.expectations import SparkExpectations"},{"symbol":"WrappedDataFrameWriter","correct":"from spark_expectations.core.expectations import WrappedDataFrameWriter"},{"symbol":"Constants","correct":"from spark_expectations.config.user_config import Constants as user_config"},{"note":"Used for streaming DataFrame writes, often via `target_and_error_table_writer`.","symbol":"WrappedDataFrameStreamWriter","correct":"from spark_expectations.sinks.utils.writer import WrappedDataFrameStreamWriter"}],"quickstart":{"code":"from pyspark.sql import SparkSession, DataFrame\nfrom spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter\nfrom spark_expectations.config.user_config import Constants as user_config\n\n# Initialize Spark session (example for local execution)\nspark = SparkSession.builder \\\n    .appName(\"SparkExpectationsQuickstart\") \\\n    
.config(\"spark.sql.warehouse.dir\", \"file:///tmp/spark-warehouse\") \\\n    .enableHiveSupport() \\\n    .getOrCreate()\n\n# Example source DataFrame; the temp view name is referenced by target_table_view below\ndata = [(1, \"Alice\", 30), (2, \"Bob\", 25), (3, \"Charlie\", None), (4, \"David\", 35), (5, \"Eve\", 130)]\ndf = spark.createDataFrame(data, [\"id\", \"name\", \"age\"])\ndf.createOrReplaceTempView(\"order\")\n\n# Rules DataFrame (in production, usually loaded from a managed table).\n# Columns follow the rules-table schema documented by spark-expectations.\nrules_schema = [\"product_id\", \"table_name\", \"rule_type\", \"rule\", \"column_name\", \"expectation\", \"action_if_failed\", \"tag\", \"description\", \"enable_for_source_dq_validation\", \"enable_for_target_dq_validation\", \"is_active\", \"enable_error_drop_alert\", \"error_drop_threshold\"]\nrules_data = [\n    (\"my_product\", \"dq_spark_local.customer_order\", \"row_dq\", \"age_is_not_null\", \"age\", \"age is not null\", \"drop\", \"validity\", \"age must be present\", True, True, True, False, 0),\n    (\"my_product\", \"dq_spark_local.customer_order\", \"row_dq\", \"age_in_range\", \"age\", \"age between 1 and 100\", \"drop\", \"validity\", \"age must be between 1 and 100\", True, True, True, False, 0),\n    (\"my_product\", \"dq_spark_local.customer_order\", \"agg_dq\", \"row_count_check\", \"id\", \"count(id) > 0\", \"fail\", \"completeness\", \"table must not be empty\", True, True, True, False, 0)\n]\nrules_df = spark.createDataFrame(rules_data, rules_schema)\n\nwriter = WrappedDataFrameWriter().mode(\"overwrite\").format(\"delta\")  # format can be \"delta\", \"parquet\", etc.\n\nse = SparkExpectations(\n    product_id=\"my_product\",\n    rules_df=rules_df,\n    stats_table=\"dq_spark_local.dq_stats\",\n    
stats_table_writer=writer,\n    target_and_error_table_writer=writer,\n    debugger=False\n)\n\n# Optional runtime settings; notifications are disabled for this example\nconf = {\n    user_config.se_notifications_enable_email: False,\n    user_config.se_notifications_enable_slack: False\n}\n\n@se.with_expectations(\n    target_table=\"dq_spark_local.customer_order\",\n    write_to_table=True,  # write validated records to the target table\n    user_conf=conf,\n    target_table_view=\"order\"\n)\ndef build_order() -> DataFrame:\n    # Your data processing logic here; the returned DataFrame is validated\n    return spark.table(\"order\")\n\n# Run the data quality job\nvalidated_df = build_order()\n\nprint(\"\\nValidated DataFrame (valid records only):\")\nvalidated_df.show()\n\n# Failed rows are quarantined in <target_table>_error; aggregated metrics land in the stats table\nprint(\"\\nError records table:\")\nspark.table(\"dq_spark_local.customer_order_error\").show()\n\nprint(\"\\nDQ stats table:\")\nspark.table(\"dq_spark_local.dq_stats\").show()\n\n# Stop Spark session\nspark.stop()\n","lang":"python","description":"This quickstart demonstrates how to set up `SparkExpectations`, define data quality rules, and apply them to a Spark DataFrame using the `@se.with_expectations` decorator. It covers initializing Spark, creating a sample DataFrame, defining sample rules against the documented rules-table schema, configuring the `SparkExpectations` instance, and running the decorated function to process and validate data."},"warnings":[{"fix":"Review Databricks Serverless documentation and `spark-expectations` specific guidance for serverless deployments. Consider using alternative deployment models if conflicts persist. 
If possible, ensure the `pyspark` version required by `spark-expectations` is compatible with the Databricks runtime.","message":"Databricks Serverless Compute environments may encounter issues with `pyspark` dependency installation, as `spark-expectations`'s `pyspark` requirement can conflict with the pre-installed optimized `pyspark` on Databricks Serverless. This can lead to job failures.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Always include `option(\"checkpointLocation\", \"path/to/checkpoint\")` in your `WrappedDataFrameStreamWriter` configuration for streaming target and error tables.","message":"When using `spark-expectations` with streaming DataFrames, it is crucial to configure a dedicated `checkpointLocation` in your streaming write options. Failure to do so can lead to production issues related to fault tolerance, exactly-once processing, and recovery after failures.","severity":"gotcha","affected_versions":"All versions supporting streaming"},{"fix":"Ensure `rules_df` and `stats_table` are provided when instantiating `SparkExpectations`, and pass `target_table` to the `@se.with_expectations` decorator; failed records are quarantined in an error table created alongside the target table (by default named `<target_table>_error`).","message":"The library requires the setup of specific tables: a `rules_df` (DataFrame containing DQ rule definitions), a `stats_table` (for aggregated metrics), and an `_error` table (to quarantine failed records). Incorrect or missing configuration of these tables will prevent the library from functioning correctly.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Carefully review the `spark-expectations` documentation on `action_if_failed` for each rule type to align with desired data handling and job failure policies.","message":"The `action_if_failed` setting for data quality rules behaves differently based on the `rule_type` (row-level, aggregate, query-based) and the configured action ('fail', 'ignore', 'drop'). 
Misunderstanding these behaviors can lead to unexpected job failures or data loss. For instance, 'drop' on row-level rules quarantines failing rows in the error table and excludes them from the target, while 'fail' can abort the entire job.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Thoroughly test `spark-expectations` jobs when migrating to new major Apache Spark versions. Consult Spark migration guides for details on compatibility issues and necessary code adjustments, especially concerning SQL syntax and NULL handling.","message":"While `spark-expectations` itself aims for backward compatibility, its reliance on Apache Spark means that major Spark upgrades (e.g., Spark 3.x to 4.x) can introduce breaking changes due to external factors like ANSI SQL mode becoming default in Spark 4.0.","severity":"breaking","affected_versions":"Spark 4.0 and newer (indirectly affects `spark-expectations` users)"},{"fix":"For notifications in Databricks Serverless, prefer webhook-based channels such as Slack or Microsoft Teams, which are generally more reliable in such environments.","message":"Email notifications from `spark-expectations` might not function as expected in Databricks Serverless environments due to network restrictions.","severity":"gotcha","affected_versions":"All versions in Databricks Serverless environments"},{"fix":"When dealing with evolving schemas in source data, plan for how the `_error` table schema will be managed to accommodate changes and prevent write failures. This may involve custom schema handling or specific table format configurations.","message":"Explicit schema evolution support for error tables in the event of data quality rule failures may require manual configuration or handling.","severity":"gotcha","affected_versions":"All versions"}],"env_vars":null,"last_verified":"2026-04-13T00:00:00.000Z","next_check":"2026-07-12T00:00:00.000Z"}