LlamaIndex Workflows
LlamaIndex Workflows is an event-driven, async-first, step-based framework for controlling the execution flow of AI applications, especially agents. Developers build complex, multi-step processes by orchestrating components such as Large Language Models (LLMs) and external APIs, with state maintained across steps. The library is at version 2.17.3 at the time of writing and is updated frequently. [1, 7, 25, 26]
Warnings
- breaking LlamaIndex Workflows became a standalone package (v1.0), deprecating the older in-tree workflow-like implementations that shipped inside `llama-index` itself (pre-0.11). Users migrating from `Query Pipelines` or the older internal `AgentRunner`/`AgentWorker` classes in `llama_index.core.agent` must refactor their code to the new `workflows` package structure. [1, 6, 14, 17, 22]
- gotcha LlamaIndex Workflows are designed as 'async-first'. This means all steps and the `run` method are asynchronous. Users running workflows in non-async environments (e.g., top-level scripts) need to wrap their execution with `asyncio.run()` or similar. In Jupyter/Colab, this is often handled, but direct script execution requires explicit async handling. [5, 9, 10]
- gotcha Error handling is critical for robust workflows. External API calls (like to LLMs), data validation, and network issues can cause `WorkflowRuntimeError`, `WorkflowTimeoutError`, or other exceptions. Workflows do not automatically snapshot state for recovery without explicit integration. [5, 16, 19]
- deprecated The `Query Pipelines` feature in `llama-index` was deprecated in favor of Workflows in LlamaIndex version 0.11. Existing code using `Query Pipelines` should be migrated. [6, 22]
- gotcha Complex workflows, especially those involving multiple LLM calls or external services, can experience high latency. Default configurations might not be optimized for performance. [18]
Install
- pip install llama-index-workflows
- pip install 'llama-index-workflows[openai]' # Install with OpenAI dependencies
Imports
- Workflow
from workflows import Workflow
- step
from workflows import step
- Context
from workflows import Context
- Event
from workflows.events import Event
- StartEvent
from workflows.events import StartEvent
- StopEvent
from workflows.events import StopEvent
Quickstart
import asyncio
import os
from pydantic import Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent
from llama_index.llms.openai import OpenAI  # Ensure llama-index-llms-openai is installed

# Define custom event types that carry data between steps
class JokeEvent(Event):
    joke: str = Field(description="The generated joke.")

class CritiqueEvent(Event):
    joke: str = Field(description="The original joke.")
    critique: str = Field(description="The critique of the joke.")

# Steps are async methods on a Workflow subclass; the event type
# annotations determine how the steps are chained together.
class JokeFlow(Workflow):
    def __init__(self, llm: OpenAI, **kwargs):
        super().__init__(**kwargs)
        self.llm = llm

    @step
    async def generate_joke(self, ctx: Context, ev: StartEvent) -> JokeEvent:
        # Keyword arguments passed to run() appear as StartEvent attributes
        print(f"[Step] Generating joke about: {ev.topic}")
        response = await self.llm.acomplete(f"Tell me a short joke about {ev.topic}.")
        return JokeEvent(joke=response.text)

    @step
    async def critique_joke(self, ctx: Context, ev: JokeEvent) -> CritiqueEvent:
        print(f"[Step] Critiquing joke: {ev.joke}")
        response = await self.llm.acomplete(
            f"Critique this joke and suggest an improvement: '{ev.joke}'"
        )
        return CritiqueEvent(joke=ev.joke, critique=response.text)

    @step
    async def refine_joke(self, ctx: Context, ev: CritiqueEvent) -> StopEvent:
        print(f"[Step] Refining joke based on critique: {ev.critique}")
        response = await self.llm.acomplete(
            f"Original joke: '{ev.joke}'\nCritique: '{ev.critique}'\n"
            "Refine the joke based on the critique to make it funnier or clearer."
        )
        # Returning a StopEvent ends the workflow; run() resolves to its result
        return StopEvent(result=response.text)

async def main():
    # Initialize an LLM (requires OPENAI_API_KEY environment variable)
    llm = OpenAI(model="gpt-4o-mini", api_key=os.environ.get("OPENAI_API_KEY", ""))
    workflow = JokeFlow(llm=llm, timeout=120)
    print("Starting workflow to generate and refine a joke...")
    final_joke = await workflow.run(topic="dogs")
    print(f"\n[Final Result] {final_joke}")

if __name__ == "__main__":
    # LLM calls will fail without a valid OPENAI_API_KEY
    if not os.environ.get("OPENAI_API_KEY"):
        print("WARNING: OPENAI_API_KEY environment variable not set. LLM calls will fail.")
    asyncio.run(main())