{"library":"pbspark","title":"pbspark: Protobuf PySpark Conversion","description":"pbspark is a Python package that converts between protobuf messages and PySpark DataFrames using PySpark UDFs. It maps protobuf types to Spark SQL types and handles serialization and deserialization. The current version is 0.9.0, with minor releases every few months.","language":"python","status":"active","last_verified":"Thu Apr 16","install":{"commands":["pip install pbspark"],"cli":null},"imports":["from pbspark import from_protobuf","from pbspark import to_protobuf","from pbspark import MessageConverter","from pyspark.sql.session import SparkSession","from pyspark.sql.functions import struct","from example.example_pb2 import SimpleMessage"],"auth":{"required":false,"env_vars":[]},"quickstart":{"code":"from pyspark.sql.session import SparkSession\nfrom pyspark.sql.functions import struct\nfrom pbspark import MessageConverter\n\n# SimpleMessage is generated by protoc from a .proto file like:\n# syntax = \"proto3\";\n# package example;\n# message SimpleMessage {\n#   string name = 1;\n#   int64 quantity = 2;\n#   float measure = 3;\n# }\n# pbspark builds the Spark schema from the generated message class,\n# so a hand-written stand-in class will not work here.\nfrom example.example_pb2 import SimpleMessage\n\nspark = SparkSession.builder.appName(\"PbsparkQuickstart\").getOrCreate()\n\nexample_message = SimpleMessage(name=\"hello\", quantity=5, measure=12.3)\ndata = [{\n    \"value\": example_message.SerializeToString()\n}]\ndf_encoded = spark.createDataFrame(data)\n\nmc = MessageConverter()\n\n# Decode encoded protobuf messages to Spark StructType\ndf_decoded = df_encoded.select(mc.from_protobuf(df_encoded.value, SimpleMessage).alias(\"value\"))\ndf_decoded.printSchema()\ndf_decoded.show(truncate=False)\n\n# Expand the struct into individual columns\ndf_expanded = df_decoded.select(\"value.*\")\ndf_expanded.printSchema()\ndf_expanded.show(truncate=False)\n\n# Convert back to encoded protobuf message\ndf_re_encoded = df_expanded.select(\n    mc.to_protobuf(struct(df_expanded.name, df_expanded.quantity, df_expanded.measure), SimpleMessage).alias(\"value\")\n)\ndf_re_encoded.printSchema()\ndf_re_encoded.show(truncate=False)\n\nspark.stop()\n","lang":"python","description":"This quickstart initializes a SparkSession, serializes a sample protobuf message, and uses `pbspark.MessageConverter` to decode the encoded protobuf column of a PySpark DataFrame into a Spark StructType, expand it into individual columns, and encode it back. It requires a protoc-generated `example.example_pb2` module and shows the `from_protobuf` and `to_protobuf` methods for column-level conversion.","tag":null,"tag_description":null,"last_tested":null,"results":[]},"compatibility":null}