Dagster Pandera Integration

0.29.0 · active · verified Thu Apr 16

dagster-pandera integrates Pandera data validation into Dagster data pipelines. It converts Pandera schemas into Dagster types that can be attached to assets, so a DataFrame is validated before downstream assets consume it. The current version is 0.29.0. The package is released in sync with Dagster core, which has a frequent release cadence, often with multiple patch releases per week.

Common errors

Warnings

Install

Imports

Quickstart

This quickstart defines a `pandera.DataFrameSchema`, converts it to a Dagster type with `pandera_schema_to_dagster_type`, and attaches that type to an asset through the `dagster_type` argument of `@asset`. If the DataFrame returned by `validated_data` does not conform to `my_dataframe_schema`, the output type check fails and the asset materialization errors, so invalid data never reaches `downstream_asset`. The example also defines a simple asset job and a daily schedule.

import pandas as pd
import pandera as pa
from dagster import Definitions, ScheduleDefinition, asset, define_asset_job
from dagster_pandera import pandera_schema_to_dagster_type

# 1. Define a Pandera DataFrameSchema
my_dataframe_schema = pa.DataFrameSchema(
    columns={
        "id": pa.Column(int, pa.Check.ge(0)),
        "name": pa.Column(str, pa.Check.str_length(min_value=1)),
        "value": pa.Column(float),
    },
    strict=True,  # Ensure no extra columns are present
    index=pa.Index(int, name="record_index"),
)

# 2. Define a Dagster asset whose output type is derived from the schema
@asset(dagster_type=pandera_schema_to_dagster_type(my_dataframe_schema))
def validated_data() -> pd.DataFrame:
    """
    An asset that produces a DataFrame and validates it against my_dataframe_schema.
    If validation fails, the asset materialization will error.
    """
    # Simulate data production
    data = {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "value": [10.1, 20.2, 30.3],
    }
    # The schema expects an integer index named "record_index", so name it to match
    df = pd.DataFrame(data, index=pd.Index([100, 101, 102], name="record_index"))
    return df

@asset
def downstream_asset(validated_data: pd.DataFrame):
    """
    This asset consumes the validated data, which has already passed the schema check.
    """
    print(f"Downstream asset received validated data with {len(validated_data)} rows.")
    # Further processing with the validated DataFrame
    return validated_data["value"].sum()

# 3. Define an asset job, a schedule, and the Definitions object that exposes them
my_job = define_asset_job(
    name="my_validation_pipeline",
    selection=[validated_data, downstream_asset],
)

defs = Definitions(
    assets=[validated_data, downstream_asset],
    jobs=[my_job],
    schedules=[
        ScheduleDefinition(
            job=my_job,
            cron_schedule="0 0 * * *",  # Run daily at midnight
            name="daily_validation_schedule",
        )
    ],
)
