Adagio - DAG IO Framework
Adagio is a lightweight Python framework for building and managing directed acyclic graphs (DAGs), used mainly for data orchestration within Fugue projects. It provides abstractions for defining tasks and composing them into execution flows. The current version is 0.2.6. Releases are infrequent, often coincide with Fugue updates, and focus on stability and compatibility.
Warnings
- gotcha Adagio is primarily designed as an internal component for the Fugue project. While it can be used standalone for DAG management, its API and feature set might be more opinionated and less extensive than other general-purpose DAG libraries.
- breaking Adagio is still at a 0.x.x version, so its API is subject to change in minor releases (e.g., 0.1.x to 0.2.x). While efforts are made to minimize disruption, API methods or behavior might be altered without a major version increment.
- gotcha Connecting tasks requires careful mapping of output names to input names using the `Task` constructor (`input_names` and `output_names` parameters) and `DAG.set_edge()`. A mismatch leaves the downstream task without an input it expects, which surfaces as a runtime error that can be hard to debug.
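The name-mismatch gotcha above can be caught before execution with a small check. The following is a minimal sketch in plain Python that does not call Adagio at all; `find_missing_inputs` is a hypothetical helper, and the name lists are assumed to be the same ones you pass as `output_names` and `input_names`.

```python
from typing import List


def find_missing_inputs(
    upstream_outputs: List[str], downstream_inputs: List[str]
) -> List[str]:
    """Return downstream input names that no upstream output provides."""
    provided = set(upstream_outputs)
    return [name for name in downstream_inputs if name not in provided]


# Illustrative names only: the first pair lines up, the second has a typo.
print(find_missing_inputs(["result_add"], ["result_add"]))  # prints []
print(find_missing_inputs(["result_add"], ["result_sum"]))  # prints ['result_sum']
```

Running a check like this while building the graph turns a late "missing input" failure into an early, readable message.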
Install
pip install adagio
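Because minor releases in the 0.x series may change the API, it can help to confirm that the installed version is in the same minor series as the one this page describes (0.2.6). The sketch below is one way to do that; `parse_version` and `same_minor_series` are hypothetical helpers, and the parser assumes purely numeric dotted versions.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn '0.2.6' into (0, 2, 6); assumes purely numeric dotted parts."""
    return tuple(int(part) for part in version.split("."))


def same_minor_series(installed: str, tested: str = "0.2.6") -> bool:
    """True when both versions share the same major and minor numbers."""
    return parse_version(installed)[:2] == parse_version(tested)[:2]


print(same_minor_series("0.2.9"))  # prints True
print(same_minor_series("0.3.0"))  # prints False
```

In a real project the installed version string could be read with the standard library's `importlib.metadata.version("adagio")` and passed to `same_minor_series`.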
Imports
- Flow
from adagio.core import Flow
- DAG
from adagio.dag import DAG
- Task
from adagio.core import Task
Quickstart
from adagio.dag import DAG
from adagio.core import Flow, Task
from typing import Any, Dict
# Define simple tasks as functions
def add_one(value: int) -> int:
    return value + 1

def multiply_by_two(value: int) -> int:
    return value * 2
# Wrap functions as Adagio Tasks, defining input and output names
task1 = Task(add_one, "add_task", input_names=["initial_value"], output_names=["result_add"])
task2 = Task(multiply_by_two, "mul_task", input_names=["result_add"], output_names=["final_result"])
# Build the DAG by adding tasks and defining edges (dependencies)
dag = DAG()
dag.add_task(task1)
dag.add_task(task2)
# Connect tasks: output 'result_add' from task1 becomes input 'result_add' for task2
dag.set_edge("add_task", "mul_task")
# Define initial data for the DAG execution
initial_data = {"initial_value": 5}
# Create a Flow and execute the DAG
flow = Flow(dag)
result = flow.run(initial_data)
# Access results
print(f"Initial value: {initial_data['initial_value']}")
print(f"After add_one: {result['result_add']}")
print(f"Final result: {result['final_result']}")
assert result['final_result'] == 12
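To make the data flow in the quickstart easier to follow, here is a library-independent sketch of the same idea: tasks run in dependency order and exchange values by name. It is not Adagio's implementation. `run_graph` and the task table layout are assumptions made for illustration, each task is assumed to produce a single output, and the standard-library `graphlib` module requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set, Tuple

# Hypothetical task table entry: (function, input names, output names).
TaskDef = Tuple[Callable[..., Any], List[str], List[str]]


def run_graph(
    tasks: Dict[str, TaskDef],
    edges: List[Tuple[str, str]],
    initial: Dict[str, Any],
) -> Dict[str, Any]:
    """Run tasks in dependency order, passing values between them by name."""
    # Map each task to the set of tasks that must run before it.
    deps: Dict[str, Set[str]] = {name: set() for name in tasks}
    for upstream, downstream in edges:
        deps[downstream].add(upstream)

    values = dict(initial)
    for name in TopologicalSorter(deps).static_order():
        func, input_names, output_names = tasks[name]
        # A KeyError here corresponds to the "missing input" failure mode.
        args = [values[key] for key in input_names]
        # This sketch supports single-output tasks only.
        values[output_names[0]] = func(*args)
    return values


tasks = {
    "add_task": (lambda v: v + 1, ["initial_value"], ["result_add"]),
    "mul_task": (lambda v: v * 2, ["result_add"], ["final_result"]),
}
result = run_graph(tasks, [("add_task", "mul_task")], {"initial_value": 5})
print(result["final_result"])  # prints 12
```

The sketch keeps every intermediate value in one dictionary, which is why both `result_add` and `final_result` are available at the end, as in the quickstart output.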