# Prophecy Python Libraries

The Prophecy Python Libraries package (`prophecy-libs`) provides helper functions and utilities for Python code generated by the Prophecy data engineering platform. It supports the execution, configuration, and integration of Prophecy-generated data pipelines with Apache Spark. The library is actively maintained with frequent releases, typically accompanying platform updates.
## Common errors
- **`ModuleNotFoundError: No module named 'pyspark'`**
  - Cause: `pyspark` is a core dependency, but it is not installed automatically with `prophecy-libs` because it is typically managed externally in Spark environments.
  - Fix: Install `pyspark` explicitly with a compatible version: `pip install pyspark==3.3.0` (adjust the version to match the `prophecy-libs` requirements).
- **`TypeError: register_all_udfs() missing 1 required positional argument: 'spark'`**
  - Cause: Methods that interact with Spark, such as UDF registration, require an active `SparkSession` object as an argument.
  - Fix: Initialize a `SparkSession` and pass it to the method: `spark = SparkSession.builder.appName(...).getOrCreate(); UDFs.register_all_udfs(spark)`.
- **`AttributeError: 'ProphecyConfiguration' object has no attribute 'get_config'`** (or similar config access issues)
  - Cause: Misunderstanding how configurations are accessed or initialized. `ConfigStore` is typically populated by the Prophecy runtime; accessing attributes that do not exist, or accessing the store before it is initialized, will fail.
  - Fix: In generated code, configurations are usually accessed via `ConfigStore.get_config().my_setting`. For local testing, ensure `ConfigStore.init(...)` has been called, or mock the configuration object.
- **`py4j.protocol.Py4JJavaError: An error occurred while calling o0.parquet`** (or other data source errors for S3, GCS, ADLS)
  - Cause: The Spark environment is not configured for the specific data source, connector JARs are missing, or permissions are insufficient.
  - Fix: Ensure your Spark session includes the necessary JARs (e.g., `spark-hadoop-cloud` for cloud storage) and has appropriate credentials/permissions to access the data source. For local PySpark, pass connectors to `spark-submit` with `--packages`.
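For the last error, a local submit might look like the sketch below. The package coordinate and credentials provider shown here are illustrative for S3A access; the exact coordinates depend on your Spark and Hadoop versions, and `my_pipeline.py` is a placeholder for your pipeline entry point.

```shell
# Illustrative only: supply cloud-storage connector JARs at submit time.
# Match the hadoop-aws version to the Hadoop version bundled with your Spark.
spark-submit \
  --packages org.apache.hadoop:hadoop-aws:3.3.4 \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
  my_pipeline.py
```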
## Warnings
- **Gotcha:** `prophecy-libs` is primarily a helper library for code generated by the Prophecy data engineering platform. While usable standalone, its full context and intended behavior are realized within a Prophecy-generated project, where configurations and Spark sessions are often managed automatically by the platform.
- **Breaking:** Strict dependency on PySpark versions. Prophecy pipelines are built on Spark, and the library has specific PySpark version compatibility requirements (e.g., `pyspark>=3.3.0,<4.0.0` for v2.x.x). Using an incompatible PySpark version will lead to runtime errors.
- **Gotcha:** Configuration values (via `ConfigStore`) are typically injected at runtime by the Prophecy platform, especially when deploying to environments like Databricks. Manually setting configurations using `ConfigStore.init()` in local tests might be overwritten or behave differently in deployed pipelines.
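The PySpark version constraint can be checked up front, before Spark starts. This is a standard-library-only sketch that treats the `pyspark>=3.3.0,<4.0.0` range from the warning above as the requirement; the helper names are illustrative, not part of `prophecy-libs`.

```python
# Illustrative pre-flight check for the pyspark>=3.3.0,<4.0.0 constraint.
REQUIRED_MIN = (3, 3, 0)        # inclusive lower bound
REQUIRED_MAX_EXCL = (4, 0, 0)   # exclusive upper bound

def parse_version(v: str) -> tuple:
    """Turn a version string like '3.3.0' into (3, 3, 0),
    ignoring any non-numeric suffix such as '3.5.1.dev0'."""
    parts = []
    for piece in v.split(".")[:3]:
        digits = "".join(ch for ch in piece if ch.isdigit())
        parts.append(int(digits) if digits else 0)
    return tuple(parts)

def pyspark_is_compatible(installed: str) -> bool:
    """Return True if the installed version falls in the supported range."""
    v = parse_version(installed)
    return REQUIRED_MIN <= v < REQUIRED_MAX_EXCL

# In practice you would feed this pyspark.__version__; here we check
# representative version strings against the documented range:
print(pyspark_is_compatible("3.3.0"))  # in range
print(pyspark_is_compatible("4.0.0"))  # out of range
```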
## Install

```shell
pip install prophecy-libs
```
## Imports

- `ConfigStore`: `from prophecy.config import ConfigStore`
- UDFs: `from prophecy.udf import UserDefinedFunctions`, `from prophecy.udf import UDFs`
- `ProphecyApp`: `from prophecy.main import ProphecyApp`
## Quickstart
```python
import os

from pyspark.sql import SparkSession
from prophecy.udf import UDFs

# This quickstart demonstrates how to initialize a SparkSession
# and register Prophecy's User-Defined Functions (UDFs).
# In a real Prophecy pipeline, this setup is usually handled automatically
# by the generated pipeline entry point.
# Ensure PySpark is installed and available in your environment, e.g.
# `pip install pyspark==3.3.0` (or the version required by prophecy-libs).


def run_quickstart():
    # Use a local directory as the Spark warehouse for local testing.
    warehouse_dir = os.path.join(os.getcwd(), "spark-warehouse")
    os.makedirs(warehouse_dir, exist_ok=True)

    spark = (
        SparkSession.builder
        .appName("ProphecyLibQuickstart")
        .config("spark.sql.warehouse.dir", warehouse_dir)
        .master("local[*]")
        .getOrCreate()
    )
    try:
        print("SparkSession initialized.")

        # Register Prophecy UDFs.
        UDFs.register_all_udfs(spark)
        print("Prophecy UDFs registered successfully.")

        # Example: using a simple UDF (assuming 'concat' or similar exists
        # after registration). Actual UDFs depend on the Prophecy project's
        # definitions; this is a placeholder to show usage.
        df = spark.createDataFrame([("hello", "world")], ["col1", "col2"])
        try:
            df.createOrReplaceTempView("my_table")
            result = spark.sql(
                "SELECT concat(col1, ' ', col2) AS greeting FROM my_table"
            )
            print("\nExample UDF usage (if 'concat' is available via UDFs):")
            result.show()
        except Exception as e:
            print(f"Could not demonstrate UDF usage (e.g., concat): {e}")
    except Exception as e:
        print(f"An error occurred during quickstart: {e}")
    finally:
        spark.stop()
        print("SparkSession stopped.")


if __name__ == "__main__":
    run_quickstart()
```