Tempo - Timeseries Manipulation for Spark

0.1.30 · active · verified Fri Apr 10

Tempo is a Python library that builds on PySpark to provide scalable abstractions and functions for timeseries manipulation on Spark. It simplifies common operations such as resampling, interpolation, and as-of joins on large-scale time series datasets. The project is actively maintained as part of Databricks Labs, with frequent patch releases; the current version is 0.1.30.
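Before reaching for Spark, the semantics of an as-of join are worth a moment of intuition: for each left-hand row, the join attaches the most recent right-hand row at or before that row's timestamp. A minimal plain-Python sketch of that rule (the function name here is illustrative, not part of Tempo's API):

```python
from bisect import bisect_right

def asof_join(left, right):
    """For each (ts, value) row in `left`, attach the value of the most
    recent `right` row whose timestamp is <= ts (None if none exists).
    Both inputs are lists of (ts, value) tuples sorted by ts."""
    right_ts = [ts for ts, _ in right]
    out = []
    for ts, val in left:
        i = bisect_right(right_ts, ts) - 1  # index of last right ts <= ts
        out.append((ts, val, right[i][1] if i >= 0 else None))
    return out

trades = [(1, "buy"), (5, "sell")]
quotes = [(0, 9.9), (4, 10.1), (6, 10.3)]
print(asof_join(trades, quotes))
# [(1, 'buy', 9.9), (5, 'sell', 10.1)]
```

Tempo performs the same matching per partition key, distributed across Spark executors rather than with an in-memory binary search.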

Warnings

Install

Tempo is published on PyPI as dbl-tempo:

pip install dbl-tempo

Imports

from tempo.tsdf import TSDF

Quickstart

This quickstart demonstrates how to initialize a SparkSession, create a sample Spark DataFrame, convert it into a Tempo TSDF, and perform common timeseries operations like resampling and forward-filling missing values. It is designed to run in a local PySpark environment or within a Databricks notebook.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from tempo.tsdf import TSDF

# Configure Spark for local mode. In a Databricks environment,
# SparkSession is usually pre-configured and available as 'spark'.
spark = None
try:
    spark = SparkSession.builder \
        .appName("TempoQuickstart") \
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", "2") \
        .getOrCreate()

    print(f"SparkSession created: {spark.sparkContext.appName}")

    # 1. Create a sample PySpark DataFrame with timestamp and ID columns
    data = [
        ("sensor_A", "2023-01-01 00:00:00", 10.0),
        ("sensor_A", "2023-01-01 00:01:00", 11.0),
        ("sensor_A", "2023-01-01 00:02:00", 12.0),
        ("sensor_B", "2023-01-01 00:00:00", 20.0),
        ("sensor_B", "2023-01-01 00:01:00", 21.0),
    ]
    schema = ["device_id", "timestamp_str", "value"]
    df = spark.createDataFrame(data, schema=schema).withColumn(
        "timestamp", to_timestamp("timestamp_str")
    ).drop("timestamp_str")

    # 2. Convert to Tempo TSDF
    tsdf = TSDF(df, ts_col="timestamp", partition_cols=["device_id"])
    print("\nOriginal TSDF head:")
    tsdf.df.show()

    # 3. Perform a basic Tempo operation: resample to 5-minute intervals,
    # aggregating each bucket with its mean value
    resampled_tsdf = tsdf.resample(freq="5 minutes", func="mean")
    print("\nResampled TSDF (5-min intervals, mean agg) head:")
    resampled_tsdf.df.show()

    # 4. Another operation: fill missing values by forward fill.
    # Tempo exposes gap filling through interpolate(); method="ffill"
    # carries the last observed value forward within each device_id
    # partition after resampling to the given frequency.
    filled_tsdf = tsdf.interpolate(freq="5 minutes", func="mean", method="ffill")
    print("\nFilled TSDF (forward fill) head:")
    filled_tsdf.df.show()

except Exception as e:
    print(f"An error occurred during Tempo quickstart: {e}")
finally:
    if spark:
        spark.stop()
        print("SparkSession stopped.")
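For intuition, the resample and forward-fill steps above reduce to simple per-bucket and per-row logic. A plain-Python sketch of those semantics (helper names are illustrative, not Tempo's API):

```python
def resample_mean(rows, bucket_secs):
    """Group (epoch_sec, value) rows into fixed-width time buckets and
    average each bucket. Returns {bucket_start: mean}, sorted by bucket."""
    buckets = {}
    for ts, v in rows:
        b = ts - ts % bucket_secs          # floor ts to its bucket start
        buckets.setdefault(b, []).append(v)
    return {b: sum(vs) / len(vs) for b, vs in sorted(buckets.items())}

def ffill(series):
    """Replace None with the last non-None value seen so far."""
    out, last = [], None
    for v in series:
        last = v if v is not None else last
        out.append(last)
    return out

rows = [(0, 10.0), (60, 11.0), (120, 12.0), (630, 20.0)]
print(resample_mean(rows, 300))   # {0: 11.0, 600: 20.0}
print(ffill([11.0, None, 20.0]))  # [11.0, 11.0, 20.0]
```

Tempo applies the same logic per partition key as distributed Spark window and aggregation operations, so it scales past what a single machine can hold in memory.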
