flytekitplugins-spark

Spark 3 plugin for flytekit, enabling PySpark jobs to run as tasks within Flyte workflows. The current version is 1.16.19; the plugin is part of the Flytekit ecosystem and is released regularly alongside flytekit.

pip install flytekitplugins-spark
error ModuleNotFoundError: No module named 'flytekitplugins.spark'
cause Plugin package not installed.
fix
Run 'pip install flytekitplugins-spark'.
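A quick sketch for failing fast when the plugin is missing from the environment that runs your workflows:

# Sketch: verify the plugin is importable before registering workflows.
try:
    import flytekitplugins.spark  # noqa: F401
except ModuleNotFoundError:
    raise SystemExit("Missing plugin; run: pip install flytekitplugins-spark")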
error AttributeError: module 'flytekitplugins.spark' has no attribute 'PySparkTask'
cause A deprecated import path was used, or the installed version does not expose PySparkTask.
fix
Use 'from flytekitplugins.spark import PySparkTask' (or upgrade flytekitplugins-spark).
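Because the exported names vary across plugin releases, a small sketch to list what your installed version actually exposes:

# Sketch: inspect the installed plugin's public symbols.
import flytekitplugins.spark as fks

print([name for name in dir(fks) if not name.startswith("_")])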
error Py4JJavaError: An error occurred while calling o123.showString.
cause Spark configuration missing required settings or cluster resources insufficient.
fix
Ensure spark_conf includes 'spark.executor.cores' and 'spark.executor.instances'. Verify the cluster is running and has sufficient resources.
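A hedged sketch of a spark_conf carrying the settings named above; the values are illustrative and should be sized for your cluster:

# Illustrative values only; tune these for your cluster.
spark_conf = {
    "spark.executor.cores": "1",      # cores per executor
    "spark.executor.instances": "2",  # number of executors
    "spark.executor.memory": "1g",    # shown for context; adjust as needed
    "spark.driver.memory": "1g",
}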
breaking In flytekit>=1.10, Spark task configuration requires explicit spark_conf; default configs may be removed.
fix Always provide an explicit spark_conf dictionary in the Spark() task config (see the example below).
gotcha SparkSession must be obtained inside the task at runtime, not at import time. Using getOrCreate() outside the task scope can cause serialization errors.
fix Always call SparkSession.builder.getOrCreate() inside the task function body.
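A minimal sketch of the contrast (inside_task_body is a hypothetical name):

from pyspark.sql import SparkSession

# WRONG (sketch): creating the session at import time can break
# serialization when Flyte pickles the task.
# spark = SparkSession.builder.getOrCreate()

def inside_task_body() -> int:
    # RIGHT: acquire the session at runtime, within the task function.
    spark = SparkSession.builder.getOrCreate()
    return spark.range(100).count()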
deprecated The old import path from 'flytekitplugins.spark.task' is deprecated in favor of top-level imports from 'flytekitplugins.spark'.
fix Use 'from flytekitplugins.spark import Spark, PySparkTask'.
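Old versus new, as a sketch (whether PySparkTask is exported depends on the installed version; see the AttributeError entry above):

# Deprecated path:
# from flytekitplugins.spark.task import Spark, PySparkTask

# Current top-level imports:
from flytekitplugins.spark import Spark, PySparkTask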

Define a Spark task using task_config=Spark(...), then build a workflow.

from flytekit import task, workflow
from flytekitplugins.spark import Spark

@task(task_config=Spark(
    spark_conf={
        "spark.executor.cores": "1",
        "spark.executor.instances": "1"
    }
))
def my_spark_task() -> int:
    # Acquire the SparkSession at runtime, inside the task body
    # (see the gotcha above).
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    df = spark.range(10)
    return df.count()

@workflow
def wf() -> int:
    return my_spark_task()

if __name__ == "__main__":
    print(wf())
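
To execute on a Flyte cluster instead of locally, a typical invocation (assuming the file above is saved as example.py; project and domain flags depend on your deployment) is:

pyflyte run --remote example.py wf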