Databricks Data Quality eXtended (DQX)
Data Quality eXtended (DQX) is a Python library for defining, executing, and monitoring data quality checks. It leverages Apache Spark and is designed to integrate seamlessly with the Databricks ecosystem, including Delta Lake, DLT, and Unity Catalog. The library is actively maintained with frequent releases; as of v0.13.0 it adds an enhanced data quality dashboard and AI-assisted rule generation.
Warnings
- gotcha DQX requires an active SparkSession. When running outside a Databricks environment (e.g., locally), you must explicitly install `pyspark` and create a `SparkSession` instance before initializing `DQEngine`.
- breaking Starting from v0.7.1, the `apply_checks` method enforces strict type validation for rules. Rules must be passed as a list of `DQRule` objects. Passing dictionaries or other types directly will raise a `TypeError`.
- breaking The Data Quality Dashboard has been significantly enhanced and restructured in v0.13.0. Existing custom dashboard integrations or deployment scripts might require updates to align with the new three-tab structure and underlying APIs.
- gotcha The `DQGenerator` class adds AI-assisted rule generation (v0.12.0) and ODCS Data Contract rule generation (v0.11.0) through new APIs. To generate rules automatically, adopt these classes and methods instead of creating rules by hand.
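The strict type validation described above can be illustrated with a minimal, hypothetical sketch. This is not DQX source code: the `validate_rules` helper, the reduced `DQRule` stand-in, and the error messages are all illustrative of the kind of check `apply_checks` performs since v0.7.1.

```python
# Hypothetical sketch of strict rule-type validation (illustrative only;
# not the library's actual implementation).

class DQRule:
    """Stand-in for DQX's DQRule, reduced to a name for this sketch."""
    def __init__(self, name: str):
        self.name = name

def validate_rules(rules) -> list:
    """Accept only a list of DQRule instances; reject dicts and other types."""
    if not isinstance(rules, list):
        raise TypeError(f"rules must be a list, got {type(rules).__name__}")
    for r in rules:
        if not isinstance(r, DQRule):
            raise TypeError(f"each rule must be a DQRule, got {type(r).__name__}")
    return rules

# A DQRule list passes; a plain dict raises TypeError:
validate_rules([DQRule("value_not_null")])
try:
    validate_rules([{"name": "value_not_null"}])
except TypeError as e:
    print(e)  # each rule must be a DQRule, got dict
```

The practical takeaway: convert any dict-based rule definitions to `DQRule` objects before calling `apply_checks`.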
Install
- pip install databricks-labs-dqx pyspark
Imports
- DQEngine
from databricks.labs.dqx.engine import DQEngine
- DQRule
from databricks.labs.dqx.rule import DQRule
- DQGenerator
from databricks.labs.dqx.profiler.generator import DQGenerator
(The package installs under the `databricks.labs.dqx` namespace; verify exact module paths against your installed release, as they have shifted between versions.)
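Because DQX needs `pyspark` when running outside Databricks (see Warnings), a quick preflight check avoids a late ImportError. A small helper for this, hypothetical and not part of DQX, using only the standard library:

```python
import importlib.util

def modules_available(*names: str) -> bool:
    """Return True if every named module is importable (without importing it)."""
    return all(importlib.util.find_spec(n) is not None for n in names)

# Before initializing DQEngine locally, confirm the prerequisites exist:
if not modules_available("pyspark", "databricks"):
    print("Install prerequisites first: pip install databricks-labs-dqx pyspark")
```

`find_spec` only inspects the import machinery, so the check is cheap and has no side effects.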
Quickstart
from pyspark.sql import SparkSession
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule

# Initialize a local SparkSession (on Databricks, a `spark` session already exists)
spark = SparkSession.builder.appName("DQXQuickstart") \
    .master("local[*]") \
    .getOrCreate()

# Create a sample DataFrame
data = [("A", 1, "2023-01-01"), ("B", 2, "2023-01-02"), ("C", None, "2023-01-03"), ("D", 4, "2023-01-01")]
columns = ["id", "value", "event_date"]
df = spark.createDataFrame(data, columns)

# Define data quality rules as a list of DQRule objects (required since v0.7.1).
# NOTE: the DQRule constructor arguments shown here are illustrative; verify
# the exact signature against the installed DQX release.
rules = [
    DQRule("value_not_null", "value IS NOT NULL", "value column should not be null"),
    DQRule("id_is_unique", "COUNT(DISTINCT id) = COUNT(id)", "id column should be unique", dq_check_type="Aggregated"),
    DQRule("event_date_freshness", "event_date >= '2023-01-01'", "event_date should be recent"),
]

# Initialize DQEngine (constructor arguments may differ by version and
# environment, e.g. when running on Databricks rather than locally)
dq_engine = DQEngine(spark_session=spark)

# Apply the checks (apply_checks enforces a list of DQRule objects since v0.7.1)
results = dq_engine.apply_checks(df, checks=rules)

# Print results (`display()` is notebook-only; use `show()` in a local script)
print("Data Quality Check Results:")
results.show(truncate=False)

# Stop the SparkSession
spark.stop()
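After applying checks, DQX flags failing rows via result columns rather than dropping them, and a common follow-up is to quarantine those rows for inspection. A framework-free sketch of that split pattern, where the `_errors` result-column name is an assumption to verify against your DQX version:

```python
# Quarantine pattern sketch: rows whose `_errors` field is empty are valid;
# the rest are set aside for inspection. (`_errors` as the result-column
# name is an assumption; check the installed DQX release.)

checked_rows = [
    {"id": "A", "value": 1, "_errors": None},
    {"id": "C", "value": None, "_errors": {"value_not_null": "value column should not be null"}},
]

valid = [r for r in checked_rows if not r["_errors"]]
quarantined = [r for r in checked_rows if r["_errors"]]

print(len(valid), len(quarantined))  # prints: 1 1
```

With real DataFrames the same split is a pair of filters on the result column; keeping the quarantined rows (instead of discarding them) is what makes failures auditable.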