Sparkdantic
Sparkdantic is a Python library that bridges Pydantic models with PySpark schemas. It lets developers define data structures once in Pydantic, then automatically generate the equivalent `pyspark.sql.types.StructType` schemas for use with Spark DataFrames, simplifying data validation and schema management across Python applications and Spark environments. The current version is 2.8.0, and the project releases frequently to track Pydantic and PySpark compatibility.
Warnings
- breaking: Older versions of sparkdantic (< 2.0.0) do not support Pydantic V2. If you are upgrading your Pydantic dependency to V2, ensure you also upgrade sparkdantic to version 2.0.0 or higher.
- gotcha: Pydantic `Enum` types are automatically mapped to `StringType` in Spark schemas. If you require more specific type handling or validation for enums within Spark, you might need to implement custom logic after schema generation.
- gotcha: Pydantic `uuid.UUID` types are automatically converted to `StringType` in the generated Spark schema. Spark has no native UUID type, so a string representation is the default and most common approach.
Install
-
pip install sparkdantic pyspark
Imports
- create_spark_schema
from sparkdantic import create_spark_schema
- BaseModel
from pydantic import BaseModel
Quickstart
from typing import Optional, List
from pydantic import BaseModel
from sparkdantic import create_spark_schema
from pyspark.sql import SparkSession
# Define your Pydantic model
class Product(BaseModel):
    product_id: int
    name: str
    price: float
    description: Optional[str] = None
    tags: List[str] = []
# Generate the Spark schema from the Pydantic model
spark_schema = create_spark_schema(Product)
# Print the generated Spark schema (useful for verification)
print("Generated Spark Schema:")
print(spark_schema)
# Optionally, use it with a Spark DataFrame
spark = SparkSession.builder.appName("SparkdanticExample").getOrCreate()
# Create an empty DataFrame with the defined schema
df = spark.createDataFrame([], schema=spark_schema)
print("\nDataFrame created with generated schema:")
df.printSchema()
spark.stop()