Column-wise type annotations for PySpark DataFrames
typedspark provides column-wise type annotations for PySpark DataFrames, enabling static type-checking and linting and improving readability, auto-completion, and refactoring. It lets you define explicit schemas for Spark DataFrames, enforcing data integrity at the structural level. The library is currently at version 1.6.3 and maintains a regular release cadence, often driven by dependency updates.
Common errors
- `AnalysisException`: Cannot resolve '`column_name`' given input columns: [`other_column`, ...]
  - cause: This Spark error typically means that a column expected by a `typedspark.Schema` (or a subsequent operation) is missing from the actual PySpark DataFrame at runtime. This can happen if an intermediate, untyped transformation drops a required column, or if the initial DataFrame does not conform to the `DataSet`'s schema.
  - fix: Review the transformations preceding the error to ensure all required columns are present and named according to your `typedspark.Schema`. Use `DataSet.validate()` immediately after loading data or after untyped operations to catch schema discrepancies early.
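The fix above can be illustrated without Spark: the plain-Python check below mimics the kind of column validation a schema-aware wrapper performs (`validate_columns` is a hypothetical helper for illustration, not part of typedspark's API).

```python
def validate_columns(required: set, actual: set) -> None:
    """Raise early if columns required by a schema are missing,
    instead of letting Spark fail later with an AnalysisException."""
    missing = required - actual
    if missing:
        raise ValueError(
            f"Cannot resolve columns {sorted(missing)}; "
            f"input columns: {sorted(actual)}"
        )

required = {"id", "name", "age"}

# A transformation that silently dropped "age" is caught immediately:
try:
    validate_columns(required, {"id", "name"})
except ValueError as exc:
    print(exc)

# A DataFrame with at least the required columns passes:
validate_columns(required, {"id", "name", "age", "extra"})
```

Running this kind of check right after every untyped operation keeps the failure close to its cause, rather than deep inside a later Spark plan.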
- `java.lang.OutOfMemoryError: Java heap space`
  - cause: Not a `typedspark` error per se, but a common PySpark issue when processing DataFrames that exceed allocated executor memory. Because `typedspark` operates on ordinary PySpark DataFrames, high data volume combined with inefficient Spark operations or insufficient resource allocation can trigger OOM errors.
  - fix: Tune Spark configuration (e.g., `spark.executor.memory`, `spark.driver.memory`, `spark.memory.fraction`). Re-evaluate the processing logic for inefficiencies (excessive shuffling, `collect()` calls on large datasets, wide transformations). Consider repartitioning the DataFrame if data skew is an issue.
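As a starting point for the memory settings above, values like the following can go in `spark-defaults.conf` (the numbers are illustrative only; tune them to your cluster and workload):

```
spark.driver.memory           4g
spark.executor.memory         8g
spark.memory.fraction         0.6
spark.sql.shuffle.partitions  200
```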
- `Py4JJavaError: An error occurred while calling o*.join. Caused by: org.apache.spark.SparkException: Task not serializable: java.io.NotSerializableException`
  - cause: Raised when objects (custom classes, functions, closures) passed to Spark transformations are not serializable. This is a general Spark problem, but it can surface when `typedspark` users define complex logic in functions used inside Spark transformations without ensuring serializability.
  - fix: Ensure all objects referenced within Spark transformations are serializable: make custom classes implement `java.io.Serializable` (on the Java/Scala side), and ensure Python closures capture only primitive types or serializable objects. Avoid referencing complex objects in UDFs without careful consideration.
Warnings
- gotcha: typedspark's compatibility is tested against specific PySpark versions (e.g., 3.5.7 and 4.1.0 for v1.6.3). Using untested or significantly different PySpark versions may lead to unexpected behavior or incompatibilities.
- breaking: Prior to version 1.6.2, `Column` comparison in multi-threaded environments could lead to issues. This was fixed by explicitly using `SparkSession.active()` for thread-safe operations.
- gotcha: While `typedspark` provides compile-time type-checking, runtime schema mismatches can still occur if the underlying PySpark DataFrame's schema changes unexpectedly after a `DataSet` is created (e.g., due to an external data source modification or an untyped transformation).
Install
- `pip install typedspark`
- `pip install "typedspark[pyspark]"`
Imports
- `Column`: `from typedspark import Column`
- `DataSet`: `from typedspark import DataSet`
- `Schema`: `from typedspark import Schema`
- `LongType`: `from pyspark.sql.types import LongType`
- `StringType`: `from pyspark.sql.types import StringType`
Quickstart
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType
from typedspark import (
    Column,
    DataSet,
    Schema,
    create_empty_dataset,
    transform_to_schema,
)

# Initialize a Spark session (if one is not already active)
spark = SparkSession.builder.appName("TypedSparkQuickstart").getOrCreate()


class Person(Schema):
    id: Column[LongType]
    name: Column[StringType]
    age: Column[LongType]


def process_person_data(df: DataSet[Person]) -> DataSet[Person]:
    # Example transformation: add 1 to age while keeping the Person schema
    return transform_to_schema(df, Person, {Person.age: Person.age + 1})


# Create a DataFrame conforming to the Person schema
data = [
    (1, "Alice", 30),
    (2, "Bob", 24),
    (3, "Charlie", 35),
]
df_untyped = spark.createDataFrame(data, schema=Person.get_structtype())

# Convert to a typedspark DataSet; this validates the schema at runtime
df_typed = DataSet[Person](df_untyped)

# Process the data using the typed function
df_processed = process_person_data(df_typed)
df_processed.show()

# You can also generate an empty DataSet for testing
empty_person_dataset = create_empty_dataset(spark, Person)
empty_person_dataset.show()

spark.stop()
```