{"id":7813,"library":"typedspark","title":"Column-wise type annotations for pyspark DataFrames","description":"Typedspark provides column-wise type annotations for PySpark DataFrames, enhancing code readability, enabling static type-checking and linting, and improving auto-completion and refactoring capabilities. It helps define explicit schemas for Spark DataFrames, ensuring data integrity at a structural level. The library is currently at version 1.6.3 and maintains a regular release cadence, often driven by dependency updates.","status":"active","version":"1.6.3","language":"en","source_language":"en","source_url":"https://github.com/kaiko-ai/typedspark","tags":["pyspark","spark","typing","type-checking","data-quality","etl","schema-validation"],"install":[{"cmd":"pip install typedspark","lang":"bash","label":"Basic Installation"},{"cmd":"pip install \"typedspark[pyspark]\"","lang":"bash","label":"Installation with PySpark (if not pre-installed)"}],"dependencies":[{"reason":"Core dependency for PySpark DataFrame functionality; optional during installation for environments with pre-installed PySpark (e.g., Databricks, EMR).","package":"pyspark","optional":true}],"imports":[{"symbol":"Column","correct":"from typedspark import Column"},{"symbol":"DataSet","correct":"from typedspark import DataSet"},{"symbol":"Schema","correct":"from typedspark import Schema"},{"symbol":"LongType","correct":"from pyspark.sql.types import LongType"},{"symbol":"StringType","correct":"from pyspark.sql.types import StringType"}],"quickstart":{"code":"import pyspark.sql.functions as F\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.types import LongType, StringType\nfrom typedspark import Column, DataSet, Schema\n\n# Initialize Spark Session (if not already present)\nspark = SparkSession.builder.appName(\"TypedSparkQuickstart\").getOrCreate()\n\nclass Person(Schema):\n    id: Column[LongType]\n    name: Column[StringType]\n    age: Column[LongType]\n\ndef process_person_data(df: 
DataSet[Person]) -> DataSet[Person]:\n    # Example transformation: add 1 to age\n    return df.withColumn(Person.age, F.col(Person.age) + 1)\n\n# Create a dummy DataFrame conforming to the Person schema\ndata = [\n    (1, \"Alice\", 30),\n    (2, \"Bob\", 24),\n    (3, \"Charlie\", 35)\n]\nschema_spark = Person.get_structtype()\ndf_untyped = spark.createDataFrame(data, schema=schema_spark)\n\n# Convert to a TypedSpark DataSet\ndf_typed = DataSet[Person](df_untyped)\n\n# Process the data using the typed function\ndf_processed = process_person_data(df_typed)\n\n# Show results\ndf_processed.show()\n\n# You can also generate an empty DataSet for testing\nempty_person_dataset = Person.create_empty_dataset(spark)\nempty_person_dataset.show()\n\nspark.stop()","lang":"python","description":"This quickstart demonstrates how to define a schema using `typedspark.Schema`, create a `DataSet` from a PySpark DataFrame, and apply transformations with type annotations. It also shows how to create an empty `DataSet` for testing purposes."},"warnings":[{"fix":"Refer to the `typedspark` documentation or GitHub README for the officially supported PySpark versions for your `typedspark` release. Pin your `pyspark` dependency accordingly.","message":"Typedspark's compatibility is tested with specific PySpark versions (e.g., 3.5.7 and 4.1.0 for v1.6.3). Using untested or significantly different PySpark versions may lead to unexpected behavior or incompatibilities.","severity":"gotcha","affected_versions":"<1.6.3 (for newer PySpark versions)"},{"fix":"Upgrade to `typedspark` version 1.6.2 or newer to ensure correct `Column` comparison behavior in threaded contexts. If upgrading is not possible, ensure Spark operations are not performed in parallel threads where `typedspark.Column` instances are compared.","message":"Prior to version 1.6.2, `Column` comparison in multi-threaded environments could lead to issues. 
This was fixed by explicitly using `SparkSession.active()` for thread-safe operations.","severity":"breaking","affected_versions":"<1.6.2"},{"fix":"Implement runtime validation mechanisms if strict schema enforcement is required at various pipeline stages. `typedspark` performs schema validation when a DataFrame is cast via `DataSet[YourSchema](df)`, but this must be invoked explicitly in your data pipeline after untyped operations or external data reads.","message":"While `typedspark` enables static type-checking, runtime schema mismatches can still occur if the underlying PySpark DataFrame's schema changes unexpectedly after a `DataSet` is created (e.g., due to an external data source modification or an untyped transformation).","severity":"gotcha","affected_versions":"All"}],"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z","problems":[{"fix":"Review the DataFrame transformations preceding the error to ensure all required columns are present and correctly named according to your `typedspark.Schema`. Re-cast with `DataSet[YourSchema](df)` immediately after loading data or after untyped operations to catch schema discrepancies early.","cause":"This Spark error typically indicates that a column expected by a `typedspark.Schema` or a subsequent operation is missing from the actual PySpark DataFrame at runtime. This can happen if an intermediate, untyped transformation drops a required column, or if the initial DataFrame doesn't conform to the `DataSet`'s schema.","error":"AnalysisException: Cannot resolve '`column_name`' given input columns: [`other_column`, ...]"},{"fix":"Optimize Spark configurations (e.g., `spark.executor.memory`, `spark.driver.memory`, `spark.memory.fraction`). Re-evaluate data processing logic for potential inefficiencies (e.g., excessive shuffling, `collect()` calls on large datasets, wide transformations). 
Consider repartitioning the DataFrame if data skew is an issue.","cause":"While not a direct `typedspark` error, this is a common PySpark issue often encountered when processing large DataFrames that exceed allocated executor memory. `typedspark` works with PySpark DataFrames, and if the data volume is high, OOM errors can occur due to inefficient Spark operations or insufficient resource allocation.","error":"java.lang.OutOfMemoryError: Java heap space"},{"fix":"Ensure all objects referenced within Spark transformations are serializable. This often means making custom classes implement `java.io.Serializable` (if working with Java/Scala) or ensuring Python closures capture only picklable objects (PySpark serializes closures with cloudpickle). Avoid complex objects in UDFs without careful consideration.","cause":"This error occurs in PySpark when objects (like custom classes, functions, or closures) passed to Spark transformations are not serializable. This is a general Spark problem, but can happen when `typedspark` users define complex logic within functions that are then used in Spark transformations without ensuring serializability.","error":"Py4JJavaError: An error occurred while calling o*.join. Caused by: org.apache.spark.SparkException: Task not serializable: java.io.NotSerializableException"}]}