Hamilton
Hamilton (current version 1.89.0) is a Python micro-framework for defining dataflows as functions, enabling modular, testable, and maintainable data pipelines. It represents data transformations as a directed acyclic graph (DAG) in which nodes are Python functions and edges are the dependencies between them, which makes it easier to assemble complex dataframes. New versions are released frequently.
Common errors
- hamilton.graph.GraphException: Nodes ['some_output_name'] were not found in the graph.
  cause The requested output `some_output_name` does not exist in the DAG. This could be due to a typo, the function not being defined, or the function not being accessible to the `Driver`.
  fix Check the spelling of `some_output_name`. Ensure the function `def some_output_name(...)` is defined in an imported module or directly in the script, and that the module is passed to the `Driver`.
- TypeError: Missing required parameter for function 'my_transform': 'some_dependency_name'
  cause The function `my_transform` requires an input named `some_dependency_name`, but no function named `some_dependency_name` exists in the graph, nor was it provided as an initial input to the `Driver`.
  fix Define a function `def some_dependency_name(...)` to provide the required input, or pass `some_dependency_name` as an initial input to the `Driver`'s constructor or `execute` method (e.g., `driver.execute(inputs={'some_dependency_name': ...}, ...)`).
- ModuleNotFoundError: No module named 'pygraphviz' or OSError: failed to execute ['dot', '-V'], exit code 1, stderr: b'sh: dot: command not found\n'
  cause You are trying to visualize the DAG (e.g., `driver.visualize_execution()`) but the necessary visualization tools (the Python visualization package and/or the `graphviz` system tool) are not installed or not on the system PATH.
  fix Install the Python visualization dependencies via `pip install "sf-hamilton[visualization]"`. For the underlying `graphviz` command-line tool, install it via your system's package manager (e.g., `sudo apt-get install graphviz` on Debian/Ubuntu, `brew install graphviz` on macOS) and ensure `dot` is on your system's PATH.
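Before calling a visualization method, it can help to check whether the prerequisites are discoverable at all. The following is a minimal sketch using only the standard library; `visualization_prereqs` is a hypothetical helper, not part of Hamilton, and the package names it probes (`graphviz`, `pygraphviz`) are assumptions about which bindings your environment uses.

```python
import importlib.util
import shutil


def visualization_prereqs() -> dict:
    """Report whether Python Graphviz bindings and the `dot` binary are discoverable."""
    return {
        # find_spec returns None when a top-level package cannot be found
        "graphviz": importlib.util.find_spec("graphviz") is not None,
        "pygraphviz": importlib.util.find_spec("pygraphviz") is not None,
        # shutil.which returns None when `dot` is not on PATH
        "dot_on_path": shutil.which("dot") is not None,
    }


if __name__ == "__main__":
    for name, found in visualization_prereqs().items():
        print(f"{name}: {'found' if found else 'missing'}")
```

If `dot_on_path` is reported as missing, the `OSError` above is the likely outcome regardless of which Python package is installed.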
Warnings
- breaking Major Refactor in Version 1.0.0. If upgrading from versions prior to 1.0.0, expect significant breaking changes, including how configuration is handled and the removal of `base_functions`. The `function_modifiers.extract_fields` replaced older patterns.
- gotcha Function Parameter Naming is Crucial. Hamilton resolves dependencies by matching function parameter names to other function names (or configured variable names). A typo in a parameter name will lead to a 'missing required parameter' error, as the DAG cannot be correctly constructed.
- gotcha Outputs Must Be Explicit. `driver.execute()` computes only the nodes listed in `final_outputs` plus their upstream dependencies (`driver.materialize()` takes its targets from the materializers and `additional_vars` rather than `final_outputs`). A function that is defined but is neither a requested output nor a dependency of one is never executed.
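The two gotchas above can be illustrated with a toy resolver. This is a simplified sketch of name-based dependency resolution, not Hamilton's implementation: `resolve` and the example functions are hypothetical, and the sketch omits cycle detection, type checking, and configuration.

```python
import inspect


def resolve(functions, requested, inputs=None):
    """Compute `requested` nodes by matching parameter names to function names."""
    nodes = {fn.__name__: fn for fn in functions}
    cache = dict(inputs or {})  # externally supplied inputs count as computed
    executed = []               # order in which functions were actually called

    def compute(name):
        if name in cache:
            return cache[name]
        if name not in nodes:
            raise KeyError(f"no function or input named {name!r}")
        fn = nodes[name]
        # Each parameter name is looked up as another node (or an input).
        kwargs = {p: compute(p) for p in inspect.signature(fn).parameters}
        cache[name] = fn(**kwargs)
        executed.append(name)
        return cache[name]

    return {name: compute(name) for name in requested}, executed


def raw_amounts():
    return [10.0, 15.0, 5.0]


def total(raw_amounts):
    return sum(raw_amounts)


def unused_report(total):
    return f"total={total}"


def broken(raw_amount):  # typo: no node is named `raw_amount`
    return raw_amount


all_functions = [raw_amounts, total, unused_report, broken]
results, executed = resolve(all_functions, ["total"])
print(results)   # {'total': 30.0}
print(executed)  # ['raw_amounts', 'total']
```

`unused_report` is never called because nothing requested it, and requesting `broken` raises an error only when that node is actually resolved, which mirrors the "missing required parameter" failure described above.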
Install
- pip install sf-hamilton
- pip install "sf-hamilton[pandas, visualization]"
Imports
- Driver
from hamilton import driver
- function_modifiers
from hamilton import function_modifiers as fm
Quickstart
import sys

import pandas as pd

from hamilton import base, driver
from hamilton import function_modifiers as fm


# Define functions representing nodes in the DAG
def initial_transactions() -> pd.DataFrame:
    """Simulate initial transaction data."""
    return pd.DataFrame({
        'user_id': [1, 1, 2, 2, 3],
        'amount': [10.0, 15.0, 5.0, 20.0, 30.0],
        'date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01', '2024-01-03', '2024-01-02']),
    })


def daily_spend(initial_transactions: pd.DataFrame) -> pd.DataFrame:
    """Calculate daily spend per user."""
    return initial_transactions.groupby(['user_id', 'date'])['amount'].sum().reset_index()


@fm.config.when(period='30_day')
def avg_spend__30_day(daily_spend: pd.DataFrame) -> pd.DataFrame:
    """Calculate average daily spend over a configured 30-day period."""
    # In a real scenario, this would filter for the last 30 days
    return daily_spend.groupby('user_id')['amount'].mean().reset_index().rename(columns={'amount': 'avg_30_day_spend'})


# Create and run the driver. The driver must be given the module that holds the
# functions; here that is the running script itself.
dr = (
    driver.Builder()
    .with_modules(sys.modules[__name__])
    .with_config({'period': '30_day'})
    .with_adapters(base.DictResult())  # return results as a dict keyed by node name
    .build()
)
# With @config.when, the `__30_day` suffix is dropped: the node is named `avg_spend`.
result = dr.execute(final_outputs=['avg_spend'])
print(result['avg_spend'])
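Because each node is an ordinary Python function, it can be unit-tested by calling it directly, without constructing a `Driver`. The sketch below uses plain lists and dicts instead of pandas to stay dependency-free; `total_spend_per_user` is a hypothetical function written in the same style and is not part of the quickstart above.

```python
from collections import defaultdict


def total_spend_per_user(transactions: list[tuple[int, float]]) -> dict[int, float]:
    """Sum amounts per user from (user_id, amount) pairs."""
    totals: dict[int, float] = defaultdict(float)
    for user_id, amount in transactions:
        totals[user_id] += amount
    return dict(totals)


def test_total_spend_per_user():
    # Call the node function directly with hand-built input.
    rows = [(1, 10.0), (1, 15.0), (2, 5.0)]
    assert total_spend_per_user(rows) == {1: 25.0, 2: 5.0}


test_total_spend_per_user()
```

The same pattern applies to the pandas-based functions in the quickstart: build a small input DataFrame, call the function, and assert on the returned frame.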