# Dataengine
Dataengine is a general-purpose Python package designed for streamlined data engineering tasks. It provides a unified API for working with various data processing backends such as Pandas, Polars, Spark, and Ray, along with integrations for common data formats (CSV, Parquet, JSON) and relational databases via SQLAlchemy and DuckDB. Currently at version 0.0.92, it is under active development with a focus on providing flexible and scalable data manipulation tools.
## Common errors

- `ModuleNotFoundError: No module named 'pyspark'`
  - **Cause:** attempting to use `de.SparkSession` or `de.SparkDataFrame` without installing the `pyspark` optional dependency.
  - **Fix:** install `dataengine` with the Spark extra: `pip install 'dataengine[spark]'`.
- `AttributeError: module 'dataengine' has no attribute 'OldClassName'`
  - **Cause:** an API class, function, or module name has changed between `dataengine` versions, which is common during pre-1.0 development.
  - **Fix:** check the `dataengine` GitHub repository for recent changes and update your code to use the new class/function names. Consider pinning your `dataengine` version if stability is critical for your project.
- `dataengine.errors.DataEngineError: Failed to connect to database...` (e.g., `psycopg` is not installed)
  - **Cause:** a missing database-specific driver or incorrect connection parameters when interacting with databases via `dataengine`'s SQL capabilities.
  - **Fix:** install the required database driver as an extra (e.g., `pip install 'dataengine[postgres]'` for PostgreSQL, `pip install 'dataengine[mysql]'` for MySQL). Additionally, verify your connection string, host, port, user, and password for correctness.
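Since `dataengine` handles relational databases via SQLAlchemy, connection strings follow SQLAlchemy's URL format. A stdlib-only sanity check of the URL parts can catch typos before any connection attempt (`check_db_url` is a hypothetical helper written for illustration, not part of `dataengine`):

```python
from urllib.parse import urlsplit

def check_db_url(url: str) -> dict:
    """Split a SQLAlchemy-style connection URL into its parts so each
    one can be eyeballed before handing the URL to dataengine."""
    parts = urlsplit(url)
    return {
        "driver": parts.scheme,              # e.g. postgresql+psycopg2
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }

# Placeholder credentials for demonstration only:
info = check_db_url("postgresql+psycopg2://alice:secret@db.example.com:5432/sales")
print(info)
```

If any field comes back `None` or looks wrong, fix the URL before debugging driver installation.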
## Warnings

- **Breaking change:** the API may change without notice in pre-1.0 releases.
- **Gotcha:** backend- and connector-specific functionality depends on optional dependencies that are not installed by default.
- **Gotcha:** the same operation can perform very differently across backends (Pandas vs. Polars vs. Spark vs. Ray).
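Given the pre-1.0 breaking-change risk noted above, pinning the exact version in your requirements file is a cheap safeguard (0.0.92 is the version current at the time of writing; adjust as needed):

```text
# requirements.txt
dataengine==0.0.92
```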
## Install

```shell
pip install dataengine
```

With optional extras:

```shell
pip install 'dataengine[spark]' 'dataengine[s3]' 'dataengine[postgres]'
```
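To confirm which extras are actually usable in the current environment, you can probe for their backing modules without importing them. The module names below are assumptions about what each extra provides (adjust to match your `dataengine` version):

```python
from importlib.util import find_spec

# Assumed mapping from dataengine extras to the modules they install.
EXTRAS = {"spark": "pyspark", "s3": "boto3", "postgres": "psycopg2"}

def missing_extras() -> list:
    """Return the extras whose backing module cannot be found."""
    return [extra for extra, module in EXTRAS.items() if find_spec(module) is None]

print("Missing extras:", missing_extras())
```

Running this before a job fails fast with a clear message instead of a mid-pipeline `ModuleNotFoundError`.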
## Imports

```python
from dataengine import SparkSession
from dataengine import PandasDataFrame
from dataengine import CSV
```
## Quickstart

```python
import os

import dataengine as de

# Create a temporary CSV file for the example
dummy_csv_content = "id,name,value\n1,Alice,100\n2,Bob,200\n3,Charlie,150"
temp_csv_file = "temp_quickstart.csv"
with open(temp_csv_file, "w") as f:
    f.write(dummy_csv_content)

try:
    # 1. Initialize a session (PandasSession is simplest for local execution).
    #    For Spark, you would use: session = de.SparkSession()  (requires pyspark)
    session = de.PandasSession()
    print(f"Initialized session type: {session.__class__.__name__}")

    # 2. Create a DataFrame directly from Python data
    data = {"item": ["apple", "banana", "orange"], "price": [1.0, 0.5, 0.75]}
    df_from_dict = de.PandasDataFrame(data=data)
    print("\nDataFrame created from dict:")
    print(df_from_dict.to_pandas())

    # 3. Read data from the temporary CSV file
    csv_reader = de.CSV(session)
    df_from_csv = csv_reader.read(temp_csv_file)
    print("\nDataFrame read from temporary CSV file:")
    print(df_from_csv.to_pandas())

    # 4. Perform a simple transformation (e.g., filter)
    filtered_df = df_from_csv.filter(df_from_csv["value"] > 100)
    print("\nFiltered DataFrame (value > 100):")
    print(filtered_df.to_pandas())

    # 5. Writing is analogous (actual write requires specific setup),
    #    for instance: filtered_df.write.parquet("output.parquet")
except Exception as e:
    print(f"An error occurred during quickstart: {e}")
finally:
    # Clean up the temporary file
    if os.path.exists(temp_csv_file):
        os.remove(temp_csv_file)

print("\nQuickstart complete. Explore de.SparkSession, de.PolarsDataFrame, de.Parquet etc. for more advanced usage.")
```
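For comparison, the filter step in the quickstart corresponds to this plain-Python logic over the same rows (stdlib only, no `dataengine` required):

```python
import csv
import io

# The same data the quickstart writes to temp_quickstart.csv.
raw = "id,name,value\n1,Alice,100\n2,Bob,200\n3,Charlie,150"

# Parse the CSV into dicts, then keep rows where value > 100,
# mirroring df_from_csv.filter(df_from_csv["value"] > 100).
rows = list(csv.DictReader(io.StringIO(raw)))
filtered = [row for row in rows if int(row["value"]) > 100]
print(filtered)  # → Bob (200) and Charlie (150)
```

The backend DataFrame does the same work, but lazily and at scale when running on Polars, Spark, or Ray.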