{"id":9170,"library":"pbspark","title":"pbspark: Protobuf PySpark Conversion","description":"pbspark is a Python package that converts between protobuf messages and PySpark DataFrames, leveraging PySpark UDFs for efficient data processing. It maps protobuf types to Spark SQL types and handles serialization and deserialization. The current version is 0.9.0, with minor releases every few months, reflecting active maintenance.","status":"active","version":"0.9.0","language":"en","source_language":"en","source_url":"https://github.com/crflynn/pbspark","tags":["pyspark","protobuf","dataframes","conversion","etl"],"install":[{"cmd":"pip install pbspark","lang":"bash","label":"Install latest version"}],"dependencies":[{"reason":"Core functionality for DataFrame operations and UDFs.","package":"pyspark"},{"reason":"Required for defining and handling Protocol Buffer messages.","package":"protobuf"},{"reason":"pbspark requires Python >=3.7,<4.0.","package":"python"}],"imports":[{"symbol":"from_protobuf","correct":"from pbspark import from_protobuf"},{"symbol":"to_protobuf","correct":"from pbspark import to_protobuf"},{"symbol":"MessageConverter","correct":"from pbspark import MessageConverter"},{"symbol":"SparkSession","correct":"from pyspark.sql.session import SparkSession"},{"note":"Used for converting DataFrame columns back to protobuf.","symbol":"struct","correct":"from pyspark.sql.functions import struct"},{"note":"Example import for a user-defined protobuf message. The path will vary based on the user's proto compilation.","symbol":"SimpleMessage","correct":"from example.example_pb2 import SimpleMessage"}],
"quickstart":{"code":"from pyspark.sql.session import SparkSession\nfrom pyspark.sql.functions import struct\nfrom pbspark import MessageConverter\n\n# example_pb2 is generated by protoc from a .proto file like:\n# syntax = \"proto3\";\n# package example;\n# message SimpleMessage {\n#   string name = 1;\n#   int64 quantity = 2;\n#   float measure = 3;\n# }\n# Run protoc from the root of your .proto tree so the generated module\n# path matches this import path (required for pickling on Spark workers).\nfrom example.example_pb2 import SimpleMessage\n\nspark = SparkSession.builder.appName(\"PbsparkQuickstart\").getOrCreate()\n\nexample_message = SimpleMessage(name=\"hello\", quantity=5, measure=12.3)\ndata = [{\"value\": example_message.SerializeToString()}]\ndf_encoded = spark.createDataFrame(data)\n\nmc = MessageConverter()\n\n# Decode the encoded protobuf column into a Spark StructType column\ndf_decoded = df_encoded.select(mc.from_protobuf(df_encoded.value, SimpleMessage).alias(\"value\"))\ndf_decoded.printSchema()\ndf_decoded.show(truncate=False)\n\n# Expand the struct into individual columns\ndf_expanded = df_decoded.select(\"value.*\")\ndf_expanded.printSchema()\ndf_expanded.show(truncate=False)\n\n# Re-encode the columns into an encoded protobuf column\ndf_re_encoded = df_expanded.select(\n    mc.to_protobuf(struct(df_expanded.name, df_expanded.quantity, df_expanded.measure), SimpleMessage).alias(\"value\")\n)\ndf_re_encoded.printSchema()\ndf_re_encoded.show(truncate=False)\n\nspark.stop()\n","lang":"python","description":"This quickstart initializes a SparkSession, serializes a sample protobuf message, and uses `pbspark.MessageConverter` to convert an encoded protobuf column in a PySpark DataFrame into an expanded Spark StructType and back again, showcasing the `from_protobuf` and `to_protobuf` methods for column-level conversions. It assumes `example_pb2` has been generated by protoc from the .proto definition shown in the comments."},
"warnings":[{"fix":"Consult the official Apache Spark migration guides for your specific PySpark version (e.g., Spark 4.0 dropped Python 3.8 support). Align all library versions (PySpark, Pandas, NumPy, PyArrow) accordingly.","message":"PySpark's compatibility with Python, Pandas, NumPy, and PyArrow changes frequently across Spark versions. `pbspark` relies heavily on PySpark's UDFs and DataFrame structures, so ensure your environment's PySpark version is compatible with the other Python libraries used; incompatibility can lead to unexpected errors or silent data corruption.","severity":"breaking","affected_versions":"All versions of pbspark (due to underlying PySpark dependencies)"},{"fix":"For protobuf messages with self-referencing structures, create custom conversion functions that explicitly limit the structural depth during conversion to avoid recursion errors.","message":"When working with self-referencing or circular protobuf messages, `pbspark` can raise `RecursionError` because Spark schemas do not support arbitrary depth, which leads to infinite recursion during schema inference.","severity":"gotcha","affected_versions":"All versions"},{"fix":"Ensure that the fully qualified module name of your `protoc`-generated Python file is identical to the module path from which the message type is imported. It is recommended to run `protoc` from the root of your `.proto` file tree to ensure correct module paths.","message":"Incorrect module paths for protoc-generated Python files can lead to `PicklingError` when transmitting protobuf message types in a distributed Spark environment: the `pickle` module requires the class definition to be importable from the same module path as when the object was pickled.","severity":"gotcha","affected_versions":"All versions"}],
"env_vars":null,"last_verified":"2026-04-16T00:00:00.000Z","next_check":"2026-07-15T00:00:00.000Z",
"problems":[{"fix":"Verify that your `protoc` command generates Python files with module paths consistent with how you import them. Ensure the `protoc`-generated `_pb2.py` files are in a package discoverable by Python, and that the `from_protobuf`/`to_protobuf` calls use the correct import path for `YourMessage`.","cause":"The Python interpreter cannot find or correctly load the definition of your protobuf message class when Spark serializes it for distribution across workers. This often happens when the `protoc`-generated Python file's module path does not match the import path.","error":"PicklingError: Can't pickle <class 'your_proto_module.YourMessage'>: attribute lookup on your_proto_module failed"},{"fix":"For protobuf messages with recursive definitions, implement custom conversion logic that explicitly limits the depth of schema inference, or manually define the Spark schema to break the recursion. Alternatively, consider flattening the protobuf structure if possible.","cause":"This error occurs when `pbspark` attempts to infer a Spark schema from a protobuf message that contains self-referencing or deeply nested circular structures, exceeding Python's default recursion limit.","error":"RecursionError: maximum recursion depth exceeded while calling a Python object"},{"fix":"Ensure that your protobuf messages consistently adhere to their schema, especially for nested structures. If a field is optional, handle its potential absence explicitly. You may need to pre-process data or use `MessageConverter` with custom serialization/deserialization logic if the default inference is too strict or encounters unexpected data patterns.","cause":"This can occur when `pbspark` infers a schema or converts data in which a nullable field expected to be a nested struct is encountered as `None` or another incompatible type. It is particularly relevant when protobuf definitions change or data quality issues exist.","error":"TypeError: StructType can not accept a non-struct type None in a nullable field"}]}