repartipy: PySpark DataFrame Partition Size Helper
repartipy is a Python library designed to assist with managing PySpark DataFrame partition sizes. It provides a function to repartition a DataFrame based on a target partition size in megabytes, aiming to optimize storage and processing efficiency. As of version 0.1.8, it's a relatively stable and focused utility, with updates likely driven by PySpark compatibility or feature requests rather than a fixed cadence.
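The core idea behind size-based repartitioning is simple arithmetic: estimate the DataFrame's total size, divide by the target partition size, and round up. The sketch below illustrates that calculation in plain Python; the function name and exact rounding are illustrative assumptions, not repartipy's actual internals.

```python
import math

def estimate_partition_count(total_size_mb: float, target_size_mb: float) -> int:
    """Illustrative sketch of how a size-based repartitioner might pick a
    partition count. NOT repartipy's actual implementation."""
    # Divide the estimated total size by the target size and round up,
    # with a floor of one partition for very small data.
    return max(1, math.ceil(total_size_mb / target_size_mb))

print(estimate_partition_count(100, 10))  # -> 10
print(estimate_partition_count(3, 10))    # -> 1
```

Rounding up rather than down is what keeps each resulting partition at or below the target size on average.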
Common errors
- `ModuleNotFoundError: No module named 'pyspark'`
  - Cause: PySpark, a core dependency of repartipy, is not installed in your Python environment or is not accessible.
  - Fix: Install PySpark with `pip install pyspark`, or ensure your environment already provides it (e.g., a Databricks/EMR cluster).
- `Exception: No active SparkSession found. Please provide a SparkSession instance or ensure one is active.`
  - Cause: `repartition_by_size` could not find an active SparkSession, either because one was never created or because it has been stopped.
  - Fix: Pass an active `SparkSession` instance directly via the `spark` argument (e.g., `repartition_by_size(df, ..., spark=my_spark_session)`), or ensure `SparkSession.builder.getOrCreate()` has been called before invoking the function.
- `TypeError: repartition_by_size() missing 1 required positional argument: 'df'`
  - Cause: `repartition_by_size` was called without the required DataFrame argument or with incorrect arguments.
  - Fix: Pass a valid PySpark DataFrame as the first argument, e.g., `repartition_by_size(my_dataframe, target_size_mb=10)`.
Warnings
- Gotcha: `repartition_by_size` returns a new DataFrame; it does not modify the input in place. Always assign the result to a variable or rebind the original name.
- Gotcha: Repartitioning, especially toward a specific size, shuffles data across the cluster, which can be resource-intensive. Use `repartition_by_size` judiciously, especially for very large DataFrames or frequent operations, to avoid performance bottlenecks.
- Gotcha: The `target_partition_size_mb` is an *aim*, not a guarantee. Actual partition sizes vary with data skew, compression, and the underlying storage mechanism, so do not rely on exact partition sizes.
Install
- `pip install repartipy`
Imports
- `repartition_by_size`: `from repartipy import repartition_by_size`
Quickstart
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from repartipy import repartition_by_size
# Create a SparkSession
spark = SparkSession.builder \
    .appName("repartipy_quickstart") \
    .master("local[*]") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()

try:
    # Build a dummy DataFrame for demonstration; adjust the range and
    # string size below to control the actual data volume
    data = [(i, f"value_{i}") for i in range(100000)]
    df = spark.createDataFrame(data, ["id", "value"])

    # Add a large column to increase row size for a more realistic scenario
    df = df.withColumn("large_string", lit("x" * 500))

    print(f"Initial DataFrame row count: {df.count()}")
    print(f"Initial partition count: {df.rdd.getNumPartitions()}")

    # Repartition the DataFrame, aiming for ~10MB partitions
    target_size_mb = 10
    repartitioned_df = repartition_by_size(df, target_size_mb, spark=spark)

    print(f"Repartitioned DataFrame row count: {repartitioned_df.count()}")
    print(f"New partition count (target ~{target_size_mb}MB per partition): {repartitioned_df.rdd.getNumPartitions()}")

    # Perform an action to materialize the repartitioned result, e.g.:
    # repartitioned_df.write.mode("overwrite").parquet("/tmp/repartipy_output")
finally:
    spark.stop()