Koheesio
Koheesio is a unified, composable, and scalable steps-based framework for data processing and ETL tasks built on top of Apache Spark. It simplifies the creation and orchestration of data pipelines by providing a structured way to define and execute steps. The current version is 0.10.6, and it maintains an active release cadence with frequent updates and bug fixes.
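The "composable steps" idea can be sketched in plain Python. This is a conceptual illustration only, not Koheesio's actual API: the `Step` dataclass and `run_pipeline` helper below are hypothetical names invented for the sketch.

```python
# Conceptual sketch (NOT Koheesio's API): a "step" is a named unit of work,
# and a pipeline composes steps by feeding each step's output to the next.
from dataclasses import dataclass
from typing import Callable

@dataclass
class Step:
    name: str
    fn: Callable[[dict], dict]

def run_pipeline(steps: list[Step], data: dict) -> dict:
    # Apply each step in order, threading the data through.
    for step in steps:
        data = step.fn(data)
    return data

steps = [
    Step("add_one", lambda d: {**d, "x": d["x"] + 1}),
    Step("double", lambda d: {**d, "x": d["x"] * 2}),
]
result = run_pipeline(steps, {"x": 1})  # {'x': 4}
```

In Koheesio itself, steps carry validated inputs and run on Spark DataFrames, but the composition principle is the same.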
Warnings
- breaking Koheesio has strict `pyspark` version requirements (`>=3.3.0,<3.6.0`). Using Spark versions outside this range, especially attempting 'Spark Connect' with unsupported versions, will lead to errors.
- gotcha Specific functionalities like Snowflake, Databricks, or Delta Lake integration require installing Koheesio with their respective optional dependencies (e.g., `pip install 'koheesio[snowflake]'`).
- gotcha Koheesio requires Python versions `>=3.9` and `<3.13`. Ensure your Python environment meets these requirements to avoid compatibility issues.
- breaking Koheesio now explicitly requires Pydantic V2 (`>=2.0.0`). If you are migrating from an older Koheesio version that used Pydantic V1, you may need to update your custom `Step` definitions due to Pydantic's breaking API changes between major versions.
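The Pydantic V1-to-V2 changes most likely to affect custom `Step` definitions are the validator decorator and the serialization methods. A minimal sketch of the V2 syntax, using a hypothetical `AddColumnConfig` model (not part of Koheesio):

```python
# Pydantic V2 syntax sketch; `AddColumnConfig` is a hypothetical example model.
from pydantic import BaseModel, field_validator  # V2: @validator -> @field_validator

class AddColumnConfig(BaseModel):
    value: str

    # In V2 the validator must be declared with @field_validator and be a classmethod.
    @field_validator("value")
    @classmethod
    def value_must_not_be_empty(cls, v: str) -> str:
        if not v.strip():
            raise ValueError("value must not be empty")
        return v

cfg = AddColumnConfig(value="koheesio-rocks")
# V1's .dict() is .model_dump() in V2
dumped = cfg.model_dump()  # {'value': 'koheesio-rocks'}
```

Other common migration points: the inner `class Config:` becomes `model_config = ConfigDict(...)`, and `.json()` becomes `.model_dump_json()`.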
Install
- pip install koheesio
- pip install 'koheesio[snowflake]'  # for Snowflake features
- pip install 'koheesio[all]'  # for all optional dependencies
Imports
- KoheesioSparkSession
from koheesio.spark import KoheesioSparkSession
- Step
from koheesio.steps import Step
- Pipeline
from koheesio.pipelines import Pipeline
- CsvReader
from koheesio.steps.readers import CsvReader
Quickstart
from koheesio.spark import KoheesioSparkSession
from koheesio.steps import Step
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
# 1. Define your Spark Koheesio Session
spark_session_name = "koheesio-example"
spark = KoheesioSparkSession.get_or_create(spark_session_name=spark_session_name)
# 2. Define a simple Step
class AddColumnStep(Step):
    """Adds a new column to the DataFrame."""

    value: str

    def execute(self, df: DataFrame) -> DataFrame:
        self.log.info(f"Adding column 'new_column' with value '{self.value}'")
        return df.withColumn("new_column", lit(self.value))
# 3. Create a dummy DataFrame
df_input = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "col1"])
df_input.show()
# 4. Run your step
df_output = AddColumnStep(value="koheesio-rocks").execute(df_input)
df_output.show()
# 5. Stop the Spark session (optional in many environments)
# spark.stop()