repartipy: PySpark DataFrame Partition Size Helper

0.1.8 · active · verified Fri Apr 17

repartipy is a Python library designed to assist with managing PySpark DataFrame partition sizes. It provides a function to repartition a DataFrame based on a target partition size in megabytes, aiming to optimize storage and processing efficiency. As of version 0.1.8, it's a relatively stable and focused utility, with updates likely driven by PySpark compatibility or feature requests rather than a fixed cadence.
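The core arithmetic behind any size-based repartitioning is simple: divide the estimated DataFrame size by the target partition size and round up. A minimal sketch of that calculation, assuming a byte-count interface (the `desired_partition_count` helper below is illustrative, not part of repartipy's public API):

```python
import math

def desired_partition_count(df_size_in_bytes: int,
                            target_partition_size_in_bytes: int) -> int:
    """Number of partitions needed so each holds roughly the target size.

    Hypothetical helper showing the arithmetic a size-based repartitioner
    performs; not a repartipy function.
    """
    return max(1, math.ceil(df_size_in_bytes / target_partition_size_in_bytes))

# A ~100 MB DataFrame split into ~10 MB partitions needs 10 partitions.
print(desired_partition_count(100 * 1024**2, 10 * 1024**2))  # -> 10
```

Rounding up (rather than down) keeps every partition at or below the target size, and the `max(1, ...)` guard avoids requesting zero partitions for tiny DataFrames.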


Install

pip install repartipy

Imports

from repartipy import repartition_by_size

Quickstart

This quickstart demonstrates how to initialize a SparkSession, create a sample DataFrame, and then use `repartition_by_size` to optimize its partitions. The example targets 10 MB partitions and prints the partition count before and after. Remember that repartitioning is lazy: it produces a new DataFrame, and an action (such as `count()`, `collect()`, or a write) is what triggers the actual shuffle.

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from repartipy import repartition_by_size

# Create a SparkSession
spark = SparkSession.builder \
    .appName("repartipy_quickstart") \
    .master("local[*]") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()

try:
    # Create a dummy DataFrame for demonstration: 100,000 rows with a
    # ~500-byte string column is roughly 50 MB of raw data
    # (adjust the range and string size to control the actual data size)
    data = [(i, f"value_{i}") for i in range(100000)]
    df = spark.createDataFrame(data, ["id", "value"])
    # Add a large column to increase row size for a more realistic scenario
    df = df.withColumn("large_string", lit("x" * 500))

    print(f"Initial DataFrame row count: {df.count()}")
    print(f"Initial partition count: {df.rdd.getNumPartitions()}")

    # Repartition the DataFrame to aim for 10MB partitions
    target_size_mb = 10
    repartitioned_df = repartition_by_size(df, target_size_mb, spark=spark)

    print(f"Repartitioned DataFrame row count: {repartitioned_df.count()}")
    print(f"New partition count (target ~{target_size_mb}MB per partition): {repartitioned_df.rdd.getNumPartitions()}")

    # The count() calls above are actions and already triggered Spark jobs;
    # persist the repartitioned layout with a write action, e.g.:
    # repartitioned_df.write.mode("overwrite").parquet("/tmp/repartipy_output")

finally:
    spark.stop()
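To sanity-check the result, write the repartitioned DataFrame out and inspect the sizes of the part-files Spark produces. A hedged sketch, assuming the output went to a local filesystem path (the `partition_file_sizes_mb` helper is illustrative, not part of repartipy):

```python
from pathlib import Path

def partition_file_sizes_mb(output_dir: str) -> list:
    """Return the size in MB of each part-file in a Spark output directory.

    Illustrative helper (not a repartipy function); assumes the DataFrame
    was written to a local path, e.g. with
    repartitioned_df.write.parquet(output_dir).
    """
    mb = 1024 * 1024
    return sorted(
        p.stat().st_size / mb for p in Path(output_dir).glob("part-*")
    )
```

If the repartitioning hit its target, most values should cluster near the requested partition size, with the last partition typically smaller.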
