Apache Airflow Provider for Great Expectations

1.0.0 · active · verified Thu Apr 16

The `airflow-provider-great-expectations` package provides Apache Airflow operators for running Great Expectations (GX) data validations directly in your DAGs. It supports validating in-memory DataFrames, data from external sources using BatchDefinitions, or triggering actions with Checkpoints. The current version is 1.0.0, released in January 2026, and it receives new features and maintenance updates periodically.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to use the `GXValidateDataFrameOperator` to validate a Pandas DataFrame in an Airflow DAG. The `configure_dataframe` parameter takes a callable that returns the DataFrame, and `configure_expectations` takes a callable that returns an `ExpectationSuite` (or a single `Expectation`) to apply during validation.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator

import pandas as pd # Import pandas here as per best practice, not top-level if heavy
from great_expectations.core import ExpectationSuite, ExpectationConfiguration # For defining expectations


def _get_dataframe():
    # Simulate loading data into a Pandas DataFrame
    data = {
        'col_a': [1, 2, 3, 4, 5],
        'col_b': ['a', 'b', 'c', 'd', 'e']
    }
    return pd.DataFrame(data)


def _get_expectations_suite(context):
    """Build, register, and return the expectation suite for the DataFrame.

    Args:
        context: GX data context supplied by the operator (per the original
            comment, an ``AbstractDataContext``).

    Returns:
        The registered ``ExpectationSuite`` with two expectations on
        ``col_a``: the column exists and its dtype is ``int64``.
    """
    # Register the suite on the context (replacing any existing suite with
    # the same name) so the validation run can resolve it by name.
    suite = context.suites.add_or_update(ExpectationSuite(name='my_expectation_suite'))
    # NOTE(review): ExpectationConfiguration with `expectation_type` is the
    # legacy (pre-1.0) GX API; GX Core 1.x generally uses expectation classes
    # (e.g. gxe.ExpectColumnToExist(column=...)). Confirm this runs against
    # the GX version the 1.0.0 provider pins.
    suite.add_expectation(ExpectationConfiguration(
        expectation_type='expect_column_to_exist',
        kwargs={'column': 'col_a'}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type='expect_column_values_to_be_of_type',
        kwargs={'column': 'col_a', 'type': 'int64'}
    ))
    return suite


with DAG(
    dag_id="great_expectations_dataframe_validation_dag",
    schedule=None,  # trigger manually; no recurring schedule
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["great_expectations", "data_quality"],
) as dag:
    # Validate the in-memory DataFrame with the expectation suite above.
    gx_validation = GXValidateDataFrameOperator(
        task_id="validate_my_dataframe",
        configure_dataframe=_get_dataframe,
        configure_expectations=_get_expectations_suite,
    )

    # Downstream task that only runs once the validation task succeeds.
    on_success = PythonOperator(
        task_id="data_quality_passed",
        python_callable=lambda: print("Data quality checks passed!"),
    )

    gx_validation >> on_success

view raw JSON →