Fugue: Abstraction Layer for Distributed Computing
Fugue is a Python library that provides a unified interface for defining data workflows, so the same code can run on Pandas, Spark, Dask, Ray, and other distributed computing engines without changes. It is designed to make data pipelines more portable and easier to test. The current version is 0.9.7, and patch releases are frequent.
Warnings
- breaking: Fugue SQL and its related functions and dependencies moved to the optional `[sql]` extra in version 0.9.0. Using these features without installing the extra raises `ImportError`.
- gotcha: Fugue has strict Python version requirements. From 0.9.0 onwards, it generally requires Python 3.10 or higher. Older Python versions lead to installation and runtime errors.
- gotcha: Several patches have addressed Pandas compatibility (for example, pinning Pandas<3 and later removing the pin). An incompatible combination of Pandas and Fugue versions can cause issues, especially with Pandas 2.x.
- deprecated: The `DataFrame` class was moved from `fugue.dataframe` to `fugue.collections.dataframe`. The old path might still work for some time through redirects, but it is considered deprecated.
- gotcha: Compatibility with distributed engines (Spark, Ray, Dask) is updated continuously. Older Fugue versions might not work with the latest versions of these engines, and vice versa. Check the Fugue release notes for compatibility fixes.
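Most of the warnings above come down to version mismatches, so it can help to print what is actually installed before debugging. The following is a minimal sketch using only the standard library; the `environment_report` helper and the package list are illustrative names, not part of Fugue.

```python
import sys
from importlib import metadata

# Packages named in the warnings above; the list is illustrative, not exhaustive.
PACKAGES = ("fugue", "pandas", "pyspark", "dask", "ray")


def environment_report(packages=PACKAGES):
    """Return the Python version and the installed version of each package.

    Packages that are not installed map to None.
    """
    report = {"python": ".".join(str(part) for part in sys.version_info[:3])}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = None
    return report


if __name__ == "__main__":
    for name, version in environment_report().items():
        print(f"{name}: {version or 'not installed'}")
```

Comparing this output against the Fugue release notes is usually quicker than reading a traceback from a failed engine import.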
Install
- pip install fugue
- pip install 'fugue[spark]'
- pip install 'fugue[dask]'
- pip install 'fugue[ray]'
- pip install 'fugue[sql]'
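After installing, one way to see which extras are usable is to check whether a module each extra should provide can be found. This is a sketch under assumptions: `available_extras` is a hypothetical helper, the backend module names are the engines' own top-level packages, and `fugue_sql_antlr` as the marker for the `[sql]` extra is a guess about that extra's dependencies.

```python
from importlib.util import find_spec

# Map each extra to a top-level module it is expected to pull in.
# "fugue_sql_antlr" for the sql extra is an assumption, not confirmed here.
EXTRA_MODULES = {
    "spark": "pyspark",
    "dask": "dask",
    "ray": "ray",
    "sql": "fugue_sql_antlr",
}


def available_extras(extra_modules=EXTRA_MODULES):
    """Return {extra: bool} without importing the modules themselves."""
    return {
        extra: find_spec(module) is not None
        for extra, module in extra_modules.items()
    }


if __name__ == "__main__":
    for extra, found in available_extras().items():
        print(f"fugue[{extra}]: {'available' if found else 'missing'}")
```

`find_spec` only locates the module, so the check stays fast and does not start Spark or Ray as a side effect.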
Imports
- FugueWorkflow
from fugue import FugueWorkflow
- transform
from fugue.api import transform
- DataFrame
from fugue.collections.dataframe import DataFrame
Quickstart
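import pandas as pd
from fugue import FugueWorkflow

# Plain pandas function; Fugue applies it on whichever engine runs the workflow.
def map_to_string(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(value_str=df["value"].astype(str))

with FugueWorkflow() as dag:
    # Build the input from a pandas DataFrame so the schema is inferred.
    df = dag.df(pd.DataFrame({"id": [1, 2], "value": [10, 20]}))
    # "*" keeps the input columns; value_str is the added string column.
    result = df.transform(map_to_string, schema="*,value_str:str")
    result.show()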
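Because the transformation is an ordinary pandas function, it can be unit tested without any engine and then handed to Fugue unchanged. The sketch below assumes the top-level `fugue.transform` function; `run_with_fugue` is a hypothetical wrapper, and the engine names in the comments only work when the matching extra is installed.

```python
import pandas as pd


def map_to_string(df: pd.DataFrame) -> pd.DataFrame:
    # Same transformation as the Quickstart: add a string copy of "value".
    return df.assign(value_str=df["value"].astype(str))


def run_with_fugue(df: pd.DataFrame, engine=None) -> pd.DataFrame:
    # Imported lazily so map_to_string stays testable without Fugue installed.
    from fugue import transform

    # engine=None runs locally on pandas. Passing "dask" or "spark" is
    # expected to need the matching extra. as_local=True asks for a local
    # result rather than an engine-specific dataframe.
    return transform(
        df,
        map_to_string,
        schema="*,value_str:str",
        engine=engine,
        as_local=True,
    )


if __name__ == "__main__":
    sample = pd.DataFrame({"id": [1, 2], "value": [10, 20]})
    print(map_to_string(sample))   # plain pandas, no Fugue involved
    print(run_with_fugue(sample))  # same function, run through Fugue
```

Keeping the logic in `map_to_string` and the engine choice in a thin wrapper means tests exercise the function directly, while only the `engine` argument changes between local and distributed runs.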