Column-wise type annotations for PySpark DataFrames

1.6.3 · active · verified Thu Apr 16

Typedspark provides column-wise type annotations for PySpark DataFrames, enhancing code readability, enabling static type-checking and linting, and improving auto-completion and refactoring capabilities. It helps define explicit schemas for Spark DataFrames, ensuring data integrity at a structural level. The library is currently at version 1.6.3 and maintains a regular release cadence, often driven by dependency updates.

Quickstart

This quickstart demonstrates how to define a schema using `typedspark.Schema`, create a `DataSet` from a PySpark DataFrame, and apply transformations with type annotations. It also shows how to create an empty `DataSet` for testing purposes.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType
from typedspark import Column, DataSet, Schema

# Initialize Spark Session (if not already present)
spark = SparkSession.builder.appName("TypedSparkQuickstart").getOrCreate()

class Person(Schema):
    id: Column[LongType]
    name: Column[StringType]
    age: Column[LongType]

def process_person_data(df: DataSet[Person]) -> DataSet[Person]:
    # Example transformation: add 1 to age.
    # `withColumn` needs the column name as a string (`Person.age.str`), and
    # returns a plain DataFrame, so we re-wrap it to validate against the schema.
    return DataSet[Person](df.withColumn(Person.age.str, Person.age + 1))

# Create a dummy DataFrame conforming to the Person schema
data = [
    (1, "Alice", 30),
    (2, "Bob", 24),
    (3, "Charlie", 35)
]
schema_spark = Person.get_structtype()
df_untyped = spark.createDataFrame(data, schema=schema_spark)

# Convert to a TypedSpark DataSet
df_typed = DataSet[Person](df_untyped)

# Process the data using the typed function
df_processed = process_person_data(df_typed)

# Show results
df_processed.show()

# You can also generate an empty DataSet for testing
from typedspark import create_empty_dataset

empty_person_dataset = create_empty_dataset(spark, Person)
empty_person_dataset.show()

spark.stop()
