Hamilton

1.89.0 · active · verified Fri Apr 17

Hamilton (current version 1.89.0) is a Python micro-framework for defining dataflows as plain functions, which keeps data pipelines modular, testable, and maintainable. It represents data transformations as a directed acyclic graph (DAG): each node is a Python function, and each edge is a dependency declared by a function parameter whose name matches another node. This makes it easy to assemble complex dataframes from small, independently testable pieces. It has an active release cadence with frequent updates.
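To make the "functions as nodes, parameters as edges" idea concrete, here is a minimal standard-library sketch of a resolver that walks function signatures. It is a toy written for illustration, not Hamilton's implementation; the names `resolve`, `raw_amounts`, `total`, and `average` are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict


def resolve(name: str, nodes: Dict[str, Callable[..., Any]], cache: Dict[str, Any]) -> Any:
    """Compute node `name`, first computing every node its parameters refer to."""
    if name not in cache:
        fn = nodes[name]
        # Each parameter name is treated as the name of an upstream node.
        kwargs = {p: resolve(p, nodes, cache) for p in inspect.signature(fn).parameters}
        cache[name] = fn(**kwargs)
    return cache[name]


def raw_amounts() -> list:
    return [10.0, 15.0, 5.0]


def total(raw_amounts: list) -> float:
    return sum(raw_amounts)


def average(total: float, raw_amounts: list) -> float:
    return total / len(raw_amounts)


# Register the functions under their own names, then ask for one output.
nodes = {fn.__name__: fn for fn in (raw_amounts, total, average)}
print(resolve("average", nodes, {}))  # 10.0
```

Requesting `average` pulls in `total` and `raw_amounts` automatically, and the cache ensures each node runs once. Hamilton additionally validates type annotations and builds the whole graph up front, which this sketch does not attempt.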

Common errors

Warnings

Install

Imports

Quickstart

This quickstart defines a simple dataflow: initial transactions are aggregated into daily spend, and then an average daily spend over a specific period is calculated. It demonstrates function-based node definition, the use of a function modifier (`@fm.config.when`), and executing the `Driver` to obtain a specific output.

import sys

import pandas as pd
from hamilton import base, driver
from hamilton import function_modifiers as fm

# Define functions representing nodes in the DAG
def initial_transactions() -> pd.DataFrame:
    """Simulate initial transaction data."""
    return pd.DataFrame({
        'user_id': [1, 1, 2, 2, 3],
        'amount': [10.0, 15.0, 5.0, 20.0, 30.0],
        'date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01', '2024-01-03', '2024-01-02'])
    })

def daily_spend(initial_transactions: pd.DataFrame) -> pd.DataFrame:
    """Calculate daily spend per user."""
    return initial_transactions.groupby(['user_id', 'date'])['amount'].sum().reset_index()

# Included only when the driver config has period == '30_day'
@fm.config.when(period='30_day')
def avg_spend__30_day(daily_spend: pd.DataFrame) -> pd.DataFrame:
    """Calculate average daily spend over a configured 30-day period."""
    # In a real scenario, this would filter for the last 30 days
    return daily_spend.groupby('user_id')['amount'].mean().reset_index().rename(columns={'amount': 'avg_30_day_spend'})

# Create the driver from the functions in this script.
# The Driver needs at least one module to build the DAG from;
# DefaultAdapter makes execute() return a dict keyed by node name.
dr = driver.Driver({'period': '30_day'}, sys.modules[__name__], adapter=base.DefaultAdapter())

# config.when strips the '__30_day' suffix, so the node is named 'avg_spend'
result = dr.execute(final_vars=['avg_spend'])

print(result['avg_spend'])
