Soda Core Spark DataFrame Connector
Soda Core Spark DF is a Soda Core package that connects Soda to Spark DataFrames as a data source. It lets users define and run data quality checks directly on Spark DataFrames, making it well suited to data pipelines that run inside a Spark environment. The current version is 3.5.6; releases follow Soda Core's cadence, typically monthly or bi-monthly for minor versions.
Warnings
- gotcha PySpark Installation and Environment: Users often encounter issues due to `pyspark` not being installed or having version mismatches with their underlying Spark environment, leading to `ModuleNotFoundError` or runtime errors related to Spark context initialization.
- gotcha Configuration Mismatch: The name passed to `scan.set_data_source_name` must exactly match the `data_source <name>:` header in the YAML given to `scan.add_configuration_yaml_str` (and the `data_source_name` argument of `scan.add_spark_session`, if supplied). Likewise, the table name in `checks for <table>:` in your SodaCL must match the name of the temporary view you registered on the SparkSession. Mismatches prevent checks from being executed.
- gotcha Forgetting to add the Spark Session to the Scan: Users might register their DataFrame as a temporary view but forget to link the active `SparkSession` to the `Scan` object using `scan.add_spark_session(spark)`. This leads to runtime errors or scans failing to execute, because the underlying Spark context is not available to the Soda scan.
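The name-matching gotcha can be caught before a scan ever runs. A minimal pre-flight sketch in plain Python (the names below are arbitrary examples, not Soda defaults):

```python
# The same names must line up across three places: the configuration YAML,
# the SodaCL checks, and the registered temporary view.
config_yaml = """
data_source spark_df_source:
  type: spark_df
"""
checks_yaml = """
checks for my_spark_df_table:
  - row_count > 0
"""
data_source_name = "spark_df_source"   # name Soda is told to scan
table_name = "my_spark_df_table"       # name of the registered temp view

# Cheap assertions that surface a mismatch early, before scan.execute():
assert f"data_source {data_source_name}:" in config_yaml
assert f"checks for {table_name}:" in checks_yaml
print("names are consistent")
```

Running this before wiring up the scan turns a silent "no checks executed" outcome into an immediate assertion error.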
Install
pip install soda-core-spark-df pyspark
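The PySpark gotcha above usually comes down to the installed `pyspark` wheel not matching the Spark runtime it drives. A small sketch of the compatibility rule (the version pairs are examples, not pins this package requires):

```python
def pyspark_matches_spark(pyspark_version: str, spark_version: str) -> bool:
    """True when the major.minor components agree, which is the usual
    requirement between the pyspark wheel and the Spark runtime."""
    return pyspark_version.split(".")[:2] == spark_version.split(".")[:2]

print(pyspark_matches_spark("3.5.1", "3.5.0"))  # True: same 3.5 line
print(pyspark_matches_spark("3.4.2", "3.5.0"))  # False: mismatched minor
```

When targeting an existing cluster, pin the `pyspark` version in your install command to the cluster's Spark major.minor line.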
Imports
- Scan
from soda.scan import Scan
- Registering a DataFrame (no extra import needed; Soda queries a Spark temporary view by name)
df.createOrReplaceTempView("my_table")  # "my_table" is an example view name
Quickstart
import sys
from pyspark.sql import SparkSession
from soda.scan import Scan
# 1. Prepare your Spark DataFrame
spark = SparkSession.builder.appName("SodaSparkTest").getOrCreate()
data = [("Alice", 1), ("Bob", 2), ("Charlie", None)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)
# 2. Register the DataFrame as a temporary view so Soda can query it by name
df.createOrReplaceTempView("my_spark_df_table")
# 3. Configure Soda Core and attach the active SparkSession
scan = Scan()
scan.set_scan_definition_name("soda_spark_quickstart")
scan.set_data_source_name("spark_df_source")
scan.add_configuration_yaml_str(
    """
data_source spark_df_source:
  type: spark_df
"""
)
scan.add_spark_session(spark, data_source_name="spark_df_source")
# 4. Define your checks (e.g., in a checks.yaml or programmatically)
scan.add_sodacl_yaml_str(
    """
checks for my_spark_df_table:
  - row_count > 0
  - missing_count(id) = 1
  - column_count = 2
"""
)
# 5. Execute the scan
scan.execute()
# 6. Print scan results
print(scan.get_logs_text())
if scan.has_check_fails():
    print("Scan finished with failures.")
    sys.exit(1)
elif scan.has_check_warns_or_fails():
    print("Scan finished with warnings.")
    sys.exit(0)
else:
    print("Scan finished successfully.")
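All three checks in the quickstart should pass against the sample DataFrame. The same arithmetic in plain Python, as a sanity check of why:

```python
data = [("Alice", 1), ("Bob", 2), ("Charlie", None)]
columns = ["name", "id"]

row_count = len(data)                                  # row_count > 0
missing_id = sum(1 for _, id_ in data if id_ is None)  # missing_count(id) = 1
column_count = len(columns)                            # column_count = 2

print(row_count, missing_id, column_count)  # 3 1 2
```

One row carries a `None` id, so `missing_count(id) = 1` holds exactly, and the scan exits successfully.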