pbspark: Protobuf PySpark Conversion

0.9.0 · active · verified Thu Apr 16

pbspark is a Python package for converting between protobuf messages and PySpark DataFrames. It maps protobuf field types to Spark SQL types and performs serialization and deserialization through PySpark UDFs. The current version is 0.9.0, with minor releases every few months, reflecting active maintenance.

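As a rough mental model, scalar protobuf fields expand to Spark SQL types along these lines (an illustrative sketch of the common scalar mappings, not pbspark's internal table):

```python
# Illustrative sketch: common protobuf scalar types and the Spark SQL type
# names they typically expand to. For intuition only; this is an assumption,
# not pbspark's internal implementation.
PROTO_TO_SPARK = {
    "string": "string",
    "bytes": "binary",
    "bool": "boolean",
    "int32": "integer",
    "int64": "long",
    "float": "float",
    "double": "double",
}

def spark_type_name(proto_type: str) -> str:
    """Look up the Spark SQL type name for a protobuf scalar type."""
    return PROTO_TO_SPARK[proto_type]
```

Under this mapping, the `SimpleMessage` used in the quickstart below (string, int64, float) would decode to a struct of string, long, and float columns.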
Install

pip install pbspark

Imports

from pbspark import MessageConverter

Quickstart

This quickstart demonstrates how to initialize a SparkSession, create a sample protobuf message, serialize it, and use `pbspark.MessageConverter` to convert an encoded protobuf column in a PySpark DataFrame into an expanded Spark StructType, and back again. The column-level conversions are handled by the `from_protobuf` and `to_protobuf` methods.

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import struct
from pbspark import MessageConverter

# Assume example_pb2 is generated from a .proto file like:
# syntax = "proto3";
# package example;
# message SimpleMessage {
#   string name = 1;
#   int64 quantity = 2;
#   float measure = 3;
# }
# `MessageConverter` reads the message class's descriptor to build the Spark
# schema and to parse the encoded bytes, so a real protoc-generated class is
# required; a hand-written mock cannot be decoded. Generate example_pb2.py with:
#   protoc --python_out=. example/example.proto
from example.example_pb2 import SimpleMessage

spark = SparkSession.builder.appName("PbsparkQuickstart").getOrCreate()

example_message = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{
    "value": example_message.SerializeToString()
}]
df_encoded = spark.createDataFrame(data)

mc = MessageConverter()

# Decode encoded protobuf messages to Spark StructType
df_decoded = df_encoded.select(mc.from_protobuf(df_encoded.value, SimpleMessage).alias("value"))
df_decoded.printSchema()
df_decoded.show(truncate=False)

# Expand the struct into individual columns
df_expanded = df_decoded.select("value.*")
df_expanded.printSchema()
df_expanded.show(truncate=False)

# Convert back to encoded protobuf message
df_re_encoded = df_expanded.select(
    mc.to_protobuf(struct(df_expanded.name, df_expanded.quantity, df_expanded.measure), SimpleMessage).alias("value")
)
df_re_encoded.printSchema()
df_re_encoded.show(truncate=False)

spark.stop()
