Dagster Pandera Integration
dagster-pandera provides an integration layer for using Pandera data validation within Dagster data pipelines. Its core API, `pandera_schema_to_dagster_type`, converts a Pandera schema into a Dagster type, so DataFrames are validated when an asset carrying that type is materialized, ensuring data quality and correctness before data is consumed by downstream assets. The current version is 0.29.0, released in sync with Dagster core, which has a frequent release cadence, often multiple patch releases per week.
Common errors
- ModuleNotFoundError: No module named 'dagster_pandera'
  cause: The `dagster-pandera` package is not installed in the Python environment where Dagster is running.
  fix: Install the package: `pip install dagster-pandera`
- pandera.errors.SchemaError: Error while validating dataframe:
  cause: The DataFrame produced by the asset does not conform to the `pandera.DataFrameSchema` attached to it via `pandera_schema_to_dagster_type`. The error message is followed by details about which column or check failed.
  fix: Inspect the detailed error message to identify which part of the DataFrame violates the schema, then adjust your data generation logic or refine your `pandera.DataFrameSchema` definition.
- TypeError: object of type 'NoneType' has no len()
  cause: This error often occurs during Pandera validation when the input to the schema is unexpectedly `None` instead of a DataFrame, or when a DataFrame operation inside a check receives `None`.
  fix: Ensure the asset function always returns a `pandas.DataFrame` object, not `None`, especially in error or edge cases. Add checks in your asset's code to handle potential `None` or empty-data scenarios gracefully before returning.
Warnings
- breaking Version Mismatch with Dagster Core: `dagster-pandera`'s version is tightly coupled with `dagster` core's minor and patch versions. Upgrading `dagster` without also upgrading `dagster-pandera` (or vice-versa) can lead to unexpected behavior, `ImportError`s, or runtime crashes.
- gotcha Schema Strictness: By default, `pandera.DataFrameSchema` might not enforce that *only* the specified columns are present (i.e., it might allow extra columns). If you need to strictly enforce the column set, you must explicitly set `strict=True` in your schema definition.
- gotcha Performance with Large Datasets: Validating very large DataFrames (e.g., millions of rows) with complex Pandera schemas can be CPU and memory intensive, potentially increasing asset run times. Consider the impact on your pipeline performance.
Install
-
pip install dagster-pandera
Imports
- pandera_schema_to_dagster_type
from dagster_pandera import pandera_schema_to_dagster_type
Quickstart
import pandas as pd
import pandera as pa
from dagster import AssetSelection, Definitions, ScheduleDefinition, asset, define_asset_job
from dagster_pandera import pandera_schema_to_dagster_type
# 1. Define a Pandera DataFrameSchema
my_dataframe_schema = pa.DataFrameSchema(
    columns={
        "id": pa.Column(int, pa.Check.ge(0)),
        "name": pa.Column(str, pa.Check.str_length(min_value=1)),
        "value": pa.Column(float)
    },
    strict=True,  # Ensure no extra columns are present
    index=pa.Index(int, name="record_index")
)
# 2. Convert the schema to a Dagster type and attach it to an asset
@asset(dagster_type=pandera_schema_to_dagster_type(my_dataframe_schema))
def validated_data() -> pd.DataFrame:
    """
    An asset that produces a DataFrame and validates it against my_dataframe_schema.
    If validation fails, the asset materialization will error.
    """
    # Simulate data production
    data = {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "value": [10.1, 20.2, 30.3]
    }
    # Name the index so it matches pa.Index(..., name="record_index")
    df = pd.DataFrame(data, index=pd.Index([100, 101, 102], name="record_index"))
    return df
@asset
def downstream_asset(validated_data: pd.DataFrame):
    """
    This asset consumes the validated data, guaranteed to conform to the schema.
    """
    print(f"Downstream asset received validated data with {len(validated_data)} rows.")
    # Further processing with the validated DataFrame
    return validated_data["value"].sum()
# 3. Define an asset job and wire assets, job, and schedule into Definitions
my_job = define_asset_job(
    name="my_validation_pipeline",
    selection=AssetSelection.assets(validated_data, downstream_asset)
)
defs = Definitions(
    assets=[validated_data, downstream_asset],
    jobs=[my_job],
    schedules=[
        ScheduleDefinition(
            job=my_job,
            cron_schedule="0 0 * * *",  # Run daily at midnight
            name="daily_validation_schedule"
        )
    ]
)