SparkMeasure Python API
sparkMeasure's Python API wraps the core Scala library and is designed for performance troubleshooting of Apache Spark workloads. It simplifies the collection and analysis of Spark metrics, making it suitable for interactive analysis, testing, and production monitoring, and it eases metric collection and analysis for developers and data engineers alike. Releases follow a roughly quarterly to half-yearly cadence; the current stable version is 0.27.0.
Warnings
- breaking The configuration properties for the Kafka sink have changed in version 0.27. Kafka Producer properties must now be passed via Spark config using the prefix `spark.sparkmeasure.kafka.<kafkaProperty>=<value>`.
- gotcha Spark Connect integration is partial. While Flight Recorder mode can capture application-wide metrics, per-client (Spark Connect client-side) metrics are not fully reported. Direct access to SparkContext and its listener interface is required for full integration.
- gotcha Metrics collected by sparkMeasure are buffered in the driver's memory. For very large workloads or extensive metric collection, this can become a bottleneck or lead to out-of-memory errors on the driver.
- gotcha Collecting metrics using `TaskMetrics` (at the granularity of each task completion) incurs additional overhead compared to `StageMetrics` (aggregated by stage). Use `TaskMetrics` only when fine-grained data (e.g., for skew analysis) is strictly necessary.
- gotcha sparkMeasure primarily collects metrics for *successfully* executed tasks. Resources consumed by failed tasks are generally not included in the reports.
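Given the 0.27 breaking change above, a spark-submit invocation passing Kafka producer properties with the new prefix might look like the sketch below. Only the `spark.sparkmeasure.kafka.<kafkaProperty>` prefix is documented here; the `KafkaSink` listener class name is an assumption based on sparkMeasure's flight-recorder Kafka sink, and `bootstrap.servers` / `acks` are standard Kafka producer properties used as examples.

```shell
# Sketch: Kafka sink configuration for sparkMeasure 0.27+.
# Producer properties go through Spark config with the
# spark.sparkmeasure.kafka.<kafkaProperty> prefix.
spark-submit \
  --packages ch.cern.sparkmeasure:spark-measure_2.13:0.27 \
  --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink \
  --conf spark.sparkmeasure.kafka.bootstrap.servers=kafka-host:9092 \
  --conf spark.sparkmeasure.kafka.acks=all \
  my_app.py
```

Any other Kafka producer setting follows the same pattern: append the producer property name to the prefix.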
Install
- pip install sparkmeasure
- pyspark --packages ch.cern.sparkmeasure:spark-measure_2.13:0.27
Imports
- StageMetrics
from sparkmeasure import StageMetrics
- TaskMetrics
from sparkmeasure import TaskMetrics
Quickstart
from pyspark.sql import SparkSession
from sparkmeasure import StageMetrics
# Configure SparkSession to include the spark-measure JAR
spark = (SparkSession.builder
.appName("SparkMeasure Quickstart")
.master("local[*]")
.config("spark.jars.packages", "ch.cern.sparkmeasure:spark-measure_2.13:0.27")
.getOrCreate())
# Initialize StageMetrics
stagemetrics = StageMetrics(spark)
# Run and measure a Spark job
print("Running a simple Spark SQL query and measuring performance...")
stagemetrics.runandmeasure(globals(), 'spark.sql("SELECT count(*) FROM range(1000) CROSS JOIN range(1000)").show()')
print("\nPrinting performance report:")
stagemetrics.print_report()
spark.stop()