Tempo - Timeseries Manipulation for Spark
Tempo is a Python library that builds upon PySpark to provide scalable abstractions and functions for timeseries data manipulation on Spark. It simplifies common operations like resampling, interpolation, and as-of joins for large-scale time series datasets. The project is actively maintained as part of Databricks Labs, with frequent patch releases, currently at version 0.1.30.
Warnings
- gotcha The `asofJoin()` optimization logic for small tables was updated in v0.1.24 to bypass certain checks when used with Delta Live Tables (DLT). This may change performance or behavior for DLT pipelines that relied on the previous size-based optimization strategy.
- breaking The behavior of `TSDF.extractStateIntervals()` was modified in v0.1.20 to perform state comparison per metric column, rather than across all metric columns combined. This changes the output and how intervals are extracted based on state changes.
- gotcha As a 'Databricks Labs' project and currently in `0.1.x` versions, `dbl-tempo` APIs may not be fully stable. Minor version updates can introduce breaking changes or significant behavioral shifts without strict adherence to semantic versioning until a `1.0` release.
Install
- pip install dbl-tempo
Imports
- TSDF
from tempo.tsdf import TSDF
- SparkSession
from pyspark.sql import SparkSession
Quickstart
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from tempo.tsdf import TSDF

# Configure Spark for local mode. In a Databricks environment,
# a SparkSession is usually pre-configured and available as 'spark'.
spark = None
try:
    spark = (
        SparkSession.builder
        .appName("TempoQuickstart")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", "2")
        .getOrCreate()
    )
    print(f"SparkSession created: {spark.sparkContext.appName}")

    # 1. Create a sample PySpark DataFrame with timestamp and ID columns
    data = [
        ("sensor_A", "2023-01-01 00:00:00", 10.0),
        ("sensor_A", "2023-01-01 00:01:00", 11.0),
        ("sensor_A", "2023-01-01 00:02:00", 12.0),
        ("sensor_B", "2023-01-01 00:00:00", 20.0),
        ("sensor_B", "2023-01-01 00:01:00", 21.0),
    ]
    schema = ["device_id", "timestamp_str", "value"]
    df = (
        spark.createDataFrame(data, schema=schema)
        .withColumn("timestamp", to_timestamp("timestamp_str"))
        .drop("timestamp_str")
    )

    # 2. Convert to a Tempo TSDF, partitioned by device
    tsdf = TSDF(df, ts_col="timestamp", partition_cols=["device_id"])
    print("\nOriginal TSDF:")
    tsdf.df.show()

    # 3. Resample to 5-minute intervals, aggregating with the mean
    resampled_tsdf = tsdf.resample(freq="5 minutes", func="mean")
    print("\nResampled TSDF (5-min intervals, mean agg):")
    resampled_tsdf.df.show()

    # 4. Fill gaps with forward fill. interpolate() resamples
    # internally, so it is called on the original TSDF.
    filled_tsdf = tsdf.interpolate(freq="5 minutes", func="mean", method="ffill")
    print("\nInterpolated TSDF (forward fill):")
    filled_tsdf.df.show()
except Exception as e:
    print(f"An error occurred during Tempo quickstart: {e}")
finally:
    if spark:
        spark.stop()
        print("SparkSession stopped.")