Prophecy Python Libraries

2.1.17 · active · verified Fri Apr 17

Prophecy Python Libraries (`prophecy-libs`) provides helper functions and utilities for Python code generated by the Prophecy data engineering platform. It facilitates the execution, configuration, and integration of Prophecy-generated data pipelines with Apache Spark. The library is actively maintained with frequent releases, typically accompanying platform updates.

Install
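The library is published on PyPI under the package name given above (`prophecy-libs`). A minimal local setup might look like the following; pin both packages to the versions required by your Prophecy platform release, as the unpinned command below is only a sketch:

```shell
# Install PySpark and the Prophecy helper library from PyPI.
# In practice, pin versions to match your Prophecy release,
# e.g. `pip install pyspark==3.3.0 prophecy-libs==<version>`.
pip install pyspark prophecy-libs
```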

Quickstart

This quickstart demonstrates the essential setup of a SparkSession and how to register Prophecy's User-Defined Functions (UDFs). It highlights the core interaction pattern, though typical usage is within code generated and orchestrated by the Prophecy platform.

import os
from pyspark.sql import SparkSession
from prophecy.udf import UDFs

# This quickstart demonstrates how to initialize a SparkSession
# and register Prophecy's User-Defined Functions (UDFs).
# In a real Prophecy pipeline, this setup is usually handled automatically
# by the generated pipeline entry point.

# Ensure PySpark is installed and available in your environment.
# E.g., `pip install pyspark==3.3.0` (or appropriate version based on prophecy-libs requirements)

def run_quickstart():
    # Use a local directory as the Spark warehouse for local testing.
    warehouse_dir = os.path.join(os.getcwd(), "spark-warehouse")
    os.makedirs(warehouse_dir, exist_ok=True)

    spark = SparkSession.builder \
        .appName("ProphecyLibQuickstart") \
        .config("spark.sql.warehouse.dir", warehouse_dir) \
        .master("local[*]") \
        .getOrCreate()

    try:
        print("SparkSession initialized.")

        # Register Prophecy UDFs
        UDFs.register_all_udfs(spark)
        print("Prophecy UDFs registered successfully.")

        # Example query. Note: `concat` is a built-in Spark SQL function, so
        # this query succeeds even without Prophecy UDFs being registered.
        # The UDF names actually available after registration depend on your
        # Prophecy project's definitions; treat this as a placeholder for
        # calling one of them.
        df = spark.createDataFrame([("hello", "world")], ["col1", "col2"])
        try:
            df.createOrReplaceTempView("my_table")
            result = spark.sql("SELECT concat(col1, ' ', col2) AS greeting FROM my_table")
            print("\nExample SQL query (substitute a registered Prophecy UDF for `concat`):")
            result.show()
        except Exception as e:
            print(f"Could not demonstrate UDF usage: {e}")

    except Exception as e:
        print(f"An error occurred during quickstart: {e}")
    finally:
        spark.stop()
        print("SparkSession stopped.")

if __name__ == "__main__":
    run_quickstart()
