Apache Airflow Provider for Great Expectations

1.0.0 verified Thu Apr 16 auth: no python

The `airflow-provider-great-expectations` package provides Apache Airflow operators for running Great Expectations (GX) data validations directly in your DAGs. It can validate in-memory DataFrames, validate data from external sources through BatchDefinitions, or run Checkpoints that trigger actions. The current version is 1.0.0, released in January 2026; new features and maintenance updates arrive periodically.

pip install airflow-provider-great-expectations  # requires Python >3.9,<3.14
error ModuleNotFoundError: No module named 'great_expectations.checkpoint.types.checkpoint_result'
cause An older version of `airflow-provider-great-expectations` (e.g., pre-0.2.9) is being used with `great_expectations` version 1.0.0 or higher. The `great_expectations.checkpoint.types.checkpoint_result` module that those provider versions import no longer exists in `great_expectations` 1.0.0 and later.
fix
Upgrade airflow-provider-great-expectations to version 0.2.9 or later, or preferably to 1.0.0, which is designed for compatibility with newer Great Expectations versions (e.g., great-expectations>=1.7.0).
error Great Expectations validation failed but Airflow DAG continued to run.
cause Using an `airflow-provider-great-expectations` version older than `1.0.0a5`. In those versions, validation failures could be logged without raising an Airflow exception, so the DAG kept running.
fix
Upgrade to airflow-provider-great-expectations version 1.0.0a5 or newer. These versions are designed to fail the Airflow task upon a Great Expectations validation failure.
error TypeError: 'NoneType' object is not callable (or similar errors related to `configure_dataframe`/`configure_expectations`)
cause The functions provided to `configure_dataframe` or `configure_expectations` parameters in operators like `GXValidateDataFrameOperator` are either not callable, or they return `None` or an unexpected type.
fix
Ensure that configure_dataframe returns a pandas.DataFrame or pyspark.sql.DataFrame, and configure_expectations returns a great_expectations.core.ExpectationSuite or great_expectations.expectations.Expectation. Verify the callable functions are correctly defined and return the expected objects.
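One way to surface this problem before a DAG is deployed is to call each function once and check what it returns. The sketch below is a plain-Python guard and is not part of the provider; `call_and_check` and the two stand-in functions are hypothetical names used only for illustration, and the stand-ins return a list where a real function would return a DataFrame.

```python
from typing import Any


def call_and_check(name: str, fn: Any, *args: Any) -> Any:
    """Call a configure_* function and fail early with a readable message.

    Hypothetical helper for illustration; the provider does not ship it.
    """
    if not callable(fn):
        raise TypeError(f"{name} must be a callable, got {type(fn).__name__}")
    result = fn(*args)
    if result is None:
        raise TypeError(f"{name} returned None; is a return statement missing?")
    return result


# Stand-ins for real configure_dataframe functions.
def good_configure_dataframe():
    return [{"col_a": 1}]  # a real function would return a DataFrame


def bad_configure_dataframe():
    rows = [{"col_a": 1}]  # builds the data but never returns it


checked = call_and_check("configure_dataframe", good_configure_dataframe)
```

Passing `bad_configure_dataframe`, or passing `None` in place of a function, raises a `TypeError` that names the offending parameter.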
breaking Version 1.0.0 (and its alpha releases) introduced new specialized operators (`GXValidateDataFrameOperator`, `GXValidateBatchOperator`, `GXValidateCheckpointOperator`) which replace the legacy `GreatExpectationsOperator`. Existing DAGs using `GreatExpectationsOperator` must be migrated.
fix Rewrite your DAGs to use the new `GXValidate*` operators, choosing the one most appropriate for your data context and validation needs. Consult the official migration guide.
breaking As of version 1.0.0a5, Great Expectations validation failures within the provider's operators will now explicitly raise an AirflowException, causing the DAG task to fail. Previous versions might have allowed the DAG to continue without halting.
fix Ensure your downstream tasks are prepared for potential failures. Review your DAG's error handling and retry mechanisms. This change makes validation failures more visible and prevents downstream tasks from processing bad data.
breaking Support for Python versions prior to 3.8 was dropped in version 0.3.0. Additionally, version 1.0.0+ requires Python 3.10+ (specifically `<3.14, >3.9` as per PyPI metadata).
fix Upgrade your Python environment to 3.10, 3.11, 3.12, or 3.13 to ensure compatibility with the latest provider versions.
gotcha Older versions of `airflow-provider-great-expectations` (e.g., pre-0.2.9) are not compatible with `great_expectations` version 1.0.0 and above due to API changes (e.g., the `great_expectations.checkpoint.types.checkpoint_result` module no longer exists). The current provider `1.0.0` requires `great-expectations>=1.7.0`.
fix Always align your provider version with the recommended `great-expectations` version. For `airflow-provider-great-expectations==1.0.0`, ensure `great-expectations>=1.7.0` is installed. Upgrade both libraries if encountering `ModuleNotFoundError` related to GX components.
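The version constraints listed above can be checked with a short script before installing or upgrading. This is a minimal sketch using only the standard library; the helper names (`release_tuple`, `python_supported`, `pairing_problem`, `installed_version`) are hypothetical, and the version parsing is deliberately simplified: pre-release suffixes such as `a5` are ignored, so `1.0.0a5` is treated as `1.0.0`.

```python
import re
import sys
from importlib.metadata import PackageNotFoundError, version


def release_tuple(version_string: str) -> tuple:
    """Return the numeric release padded to three parts, e.g. '1.0.0a5' -> (1, 0, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version_string)
    if match is None:
        raise ValueError(f"unrecognised version: {version_string!r}")
    parts = tuple(int(part) for part in match.group().split("."))
    return parts + (0,) * (3 - len(parts))


def python_supported(version_info=sys.version_info) -> bool:
    """Provider 1.0.0 declares Python >3.9,<3.14, i.e. 3.10 through 3.13."""
    return (3, 10) <= tuple(version_info[:2]) < (3, 14)


def pairing_problem(provider: str, gx: str):
    """Describe a known-bad provider/GX pairing, or return None if none applies."""
    p, g = release_tuple(provider), release_tuple(gx)
    if p < (0, 2, 9) and g >= (1, 0, 0):
        return "provider older than 0.2.9 is not compatible with great_expectations >= 1.0.0"
    if p >= (1, 0, 0) and g < (1, 7, 0):
        return "provider 1.0.0 requires great-expectations >= 1.7.0"
    return None


def installed_version(distribution: str):
    """Installed version of a distribution, or None if it is not installed."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("Python supported:", python_supported())
    provider = installed_version("airflow-provider-great-expectations")
    gx = installed_version("great-expectations")
    if provider and gx:
        print("Pairing problem:", pairing_problem(provider, gx))
    else:
        print("Provider or great-expectations is not installed.")
```

Only the two pairings described on this page are encoded; any other combination returns `None`, which means "no known problem" rather than "verified compatible".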
pip install "airflow-provider-great-expectations[snowflake]" # Example for Snowflake

This quickstart demonstrates how to use the `GXValidateDataFrameOperator` to validate a Pandas DataFrame in an Airflow DAG. The `configure_dataframe` parameter takes a callable that returns the DataFrame, and `configure_expectations` takes a callable that returns an `ExpectationSuite` (or a single `Expectation`) to apply the validation.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator

import pandas as pd
import great_expectations.expectations as gxe  # Expectation classes (GX 1.x API)
from great_expectations import ExpectationSuite


def _get_dataframe():
    # Simulate loading data into a Pandas DataFrame
    data = {
        'col_a': [1, 2, 3, 4, 5],
        'col_b': ['a', 'b', 'c', 'd', 'e']
    }
    return pd.DataFrame(data)


def _get_expectations_suite(context):
    # 'context' is the AbstractDataContext passed by the operator.
    # GX 1.x defines expectations as classes rather than ExpectationConfiguration objects.
    suite = ExpectationSuite(
        name='my_expectation_suite',
        expectations=[
            gxe.ExpectColumnToExist(column='col_a'),
            gxe.ExpectColumnValuesToBeOfType(column='col_a', type_='int64'),
        ],
    )
    # Register the suite with the context, or update it if it already exists.
    return context.suites.add_or_update(suite)


with DAG(
    dag_id="great_expectations_dataframe_validation_dag",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["great_expectations", "data_quality"],
) as dag:
    validate_dataframe_task = GXValidateDataFrameOperator(
        task_id="validate_my_dataframe",
        configure_dataframe=_get_dataframe,
        configure_expectations=_get_expectations_suite,
    )

    # Example of a downstream task that would run if validation passes
    success_task = PythonOperator(
        task_id="data_quality_passed",
        python_callable=lambda: print("Data quality checks passed!"),
    )

    validate_dataframe_task >> success_task