pbspark: Protobuf PySpark Conversion
pbspark is a Python package for converting between protobuf messages and PySpark DataFrames. It maps protobuf field types to Spark SQL types and performs serialization and deserialization inside PySpark UDFs. As of version 0.9.0, the project is actively maintained, with minor releases every few months.
Common errors
- `PicklingError: Can't pickle <class 'your_proto_module.YourMessage'>: attribute lookup on your_proto_module failed`
  - Cause: Spark workers cannot find or load the definition of your protobuf message class when Spark pickles it for distribution. This usually means the module path of the `protoc`-generated Python file does not match the path you import it under.
  - Fix: Run `protoc` so the generated `_pb2.py` files land in a package whose import path matches how you import them, make sure that package is discoverable by Python on the driver and on every worker, and use the same import path for `YourMessage` in your `from_protobuf`/`to_protobuf` calls.
- `RecursionError: maximum recursion depth exceeded while calling a Python object`
  - Cause: `pbspark` recurses through message fields to infer a Spark schema. A self-referencing or circularly nested protobuf message has no finite schema, so inference exceeds Python's default recursion limit.
  - Fix: For recursive message definitions, implement custom conversion logic that explicitly limits the inference depth, or manually define the Spark schema to break the recursion. Alternatively, consider flattening the protobuf structure if possible.
- `TypeError: StructType can not accept a non-struct type None in a nullable field`
  - Cause: A nullable field that the inferred schema expects to be a nested struct arrives as `None` or another incompatible type during conversion. This typically surfaces when protobuf definitions change or data quality issues exist.
  - Fix: Ensure your messages consistently adhere to their schema, especially nested structures, and handle absent optional fields explicitly. Pre-process the data, or register custom serialization/deserialization logic on `MessageConverter` if the default inference is too strict for your data.
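One way to break the recursion described in the `RecursionError` entry is to stop schema inference at the recursive field by encoding it to a plain string. The sketch below is illustrative: the `Tree` message and `encode_tree` helper are assumptions, modeled with a plain class so the example stands alone; with a real generated message you would register the encoder on `MessageConverter` as shown in the trailing comment.

```python
import json

# Hypothetical recursive message, in .proto terms:
#   message Tree { string label = 1; repeated Tree children = 2; }
# Modeled here as a plain class so the sketch is self-contained.
class Tree:
    def __init__(self, label, children=None):
        self.label = label
        self.children = children or []

def encode_tree(msg, max_depth=10):
    """Encode a recursive message to a JSON string with bounded depth."""
    def to_dict(node, depth):
        if depth <= 0:
            return None  # truncate rather than recurse forever
        return {
            "label": node.label,
            "children": [to_dict(c, depth - 1) for c in node.children],
        }
    return json.dumps(to_dict(msg, max_depth))

# With pbspark's custom-serde support (assumed usage), registering the
# encoder makes the field infer as a plain string column instead of
# recursing through the schema:
#   from pyspark.sql.types import StringType
#   mc = MessageConverter()
#   mc.register_serializer(Tree, encode_tree, StringType())

print(encode_tree(Tree("root", [Tree("leaf")])))
```

The truncation-to-`None` choice is one policy; raising an error at the depth limit is equally valid if silent truncation would hide bad data.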
Warnings
- breaking: PySpark's compatibility requirements for Python, Pandas, NumPy, and PyArrow change frequently across Spark versions, and `pbspark` relies heavily on PySpark UDFs and DataFrame structures. Ensure your PySpark version is compatible with the other Python libraries in your environment; mismatches can lead to unexpected errors or silent data corruption.
- gotcha: Self-referencing or circular protobuf messages can trigger `RecursionError` during schema inference, because Spark schemas cannot express arbitrary nesting depth.
- gotcha: Incorrect module paths for `protoc`-generated Python files lead to `PicklingError` when message types are shipped across a distributed Spark environment: `pickle` stores classes by qualified name, so the class definition must be importable under the same module path on every worker as when it was pickled.
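A concrete sketch of the module-path discipline the pickling gotcha calls for. The `proto/` layout and `example` package name are assumptions; `--proto_path` and `--python_out` are standard `protoc` flags:

```shell
# Assumed layout:
#   proto/example/example.proto   (declares `package example;`)
# Generate example/example_pb2.py at the project root so the generated
# module's import path matches the proto package:
protoc --proto_path=proto --python_out=. proto/example/example.proto

# Import with exactly the path the file was generated under; the same
# path must resolve on every Spark worker (e.g. ship the package via
# spark-submit --py-files):
#   from example.example_pb2 import SimpleMessage
```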
Install
```shell
pip install pbspark
```
Imports
- `from pbspark import from_protobuf`
- `from pbspark import to_protobuf`
- `from pbspark import MessageConverter`
- `from pyspark.sql.session import SparkSession`
- `from pyspark.sql.functions import struct`
- `from example.example_pb2 import SimpleMessage`
Quickstart
This example assumes `example/example_pb2.py` has been generated by `protoc` from the `.proto` file shown in the comment; `mc.from_protobuf` needs a real generated message class (with its descriptor), so a hand-rolled stand-in will not work.

```python
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import struct
from pbspark import MessageConverter

# SimpleMessage is generated by protoc from a .proto file like:
#
#   syntax = "proto3";
#   package example;
#
#   message SimpleMessage {
#     string name = 1;
#     int64 quantity = 2;
#     float measure = 3;
#   }
from example.example_pb2 import SimpleMessage

spark = SparkSession.builder.appName("PbsparkQuickstart").getOrCreate()

example_message = SimpleMessage(name="hello", quantity=5, measure=12.3)
data = [{"value": example_message.SerializeToString()}]
df_encoded = spark.createDataFrame(data)

mc = MessageConverter()

# Decode the encoded protobuf messages into a Spark StructType column
df_decoded = df_encoded.select(mc.from_protobuf(df_encoded.value, SimpleMessage).alias("value"))
df_decoded.printSchema()
df_decoded.show(truncate=False)

# Expand the struct into individual columns
df_expanded = df_decoded.select("value.*")
df_expanded.printSchema()
df_expanded.show(truncate=False)

# Convert back to an encoded protobuf binary column
df_re_encoded = df_expanded.select(
    mc.to_protobuf(struct(df_expanded.name, df_expanded.quantity, df_expanded.measure), SimpleMessage).alias("value")
)
df_re_encoded.printSchema()
df_re_encoded.show(truncate=False)

spark.stop()
```