Adagio - DAG IO Framework

0.2.6 · active · verified Fri Apr 10

Adagio is a lightweight Python framework for building and running directed acyclic graphs (DAGs) of tasks, used mainly for data orchestration within Fugue projects. It provides abstractions for defining tasks and composing them into execution flows. The current version is 0.2.6. Releases are infrequent, tend to coincide with Fugue updates, and focus on stability and compatibility.

Quickstart

This quickstart demonstrates how to define two simple tasks, compose them into a DAG with dependencies, and execute the DAG using Adagio's `Flow`. It shows how to pass initial data and retrieve the final results.

from adagio.dag import DAG
from adagio.core import Flow, Task

# Define simple tasks as functions
def add_one(value: int) -> int:
    return value + 1

def multiply_by_two(value: int) -> int:
    return value * 2

# Wrap functions as Adagio Tasks, defining input and output names
task1 = Task(add_one, "add_task", input_names=["initial_value"], output_names=["result_add"])
task2 = Task(multiply_by_two, "mul_task", input_names=["result_add"], output_names=["final_result"])

# Build the DAG by adding tasks and defining edges (dependencies)
dag = DAG()
dag.add_task(task1)
dag.add_task(task2)
# Connect tasks: output 'result_add' from task1 becomes input 'result_add' for task2
dag.set_edge("add_task", "mul_task")

# Define initial data for the DAG execution
initial_data = {"initial_value": 5}

# Create a Flow and execute the DAG
flow = Flow(dag)
result = flow.run(initial_data)

# Access results
print(f"Initial value: {initial_data['initial_value']}")
print(f"After add_one: {result['result_add']}")
print(f"Final result: {result['final_result']}")
assert result['final_result'] == 12
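
To make the data flow in the quickstart concrete, the sketch below runs the same two steps in dependency order using only the Python standard library. It does not use Adagio and is not a description of how Adagio works internally; `run_steps`, `steps`, and `edges` are hypothetical names introduced for illustration, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List, Tuple

# Hypothetical step description: (function, input name, output name).
Step = Tuple[Callable[[int], int], str, str]


def run_steps(
    steps: Dict[str, Step],
    edges: Dict[str, List[str]],
    data: Dict[str, int],
) -> Dict[str, int]:
    """Run every step after the steps it depends on.

    `edges` maps a step name to the names of the steps it depends on.
    """
    values = dict(data)  # copy so the caller's dict is left unchanged
    # static_order() yields each step only after all of its dependencies.
    for name in TopologicalSorter(edges).static_order():
        func, input_name, output_name = steps[name]
        values[output_name] = func(values[input_name])
    return values


steps = {
    "add_task": (lambda v: v + 1, "initial_value", "result_add"),
    "mul_task": (lambda v: v * 2, "result_add", "final_result"),
}
# mul_task depends on add_task; add_task has no dependencies.
edges = {"add_task": [], "mul_task": ["add_task"]}

values = run_steps(steps, edges, {"initial_value": 5})
print(values["final_result"])
```

Passing the value between steps by name mirrors the quickstart, where `result_add` is both the output of the first task and the input of the second. If `edges` contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is the "acyclic" requirement of a DAG in practice.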
