Pipecat AI Flows

1.0.0 · active · verified Fri Apr 17

Pipecat AI Flows provides a conversation flow management system for Pipecat AI applications. It lets developers define structured conversational experiences using dynamic nodes and functions, managing node transitions and LLM interactions. The library is currently at version 1.0.0 and is actively maintained, with frequent updates between major releases.

Common errors

Warnings

Install
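The package name below is inferred from the library name on this page; verify it against the project's PyPI listing before installing.

```shell
pip install pipecat-ai-flows
```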

Imports
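These are the imports the quickstart below relies on, using the module paths as they appear on this page (check them against the installed package version):

```python
from pipecat_ai_flows import FlowManager, NodeConfig, flows_direct_function
from pipecat_ai_flows.llm import LLMService
from pipecat.frames.frames import TextFrame, EndFrame
```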

Quickstart

This quickstart shows how to define a conversation flow with `FlowManager` and `NodeConfig`, and how to register direct functions with `@flows_direct_function`. Mock services stand in for the LLM and transport so the flow structure can be exercised without a full Pipecat AI pipeline. In a real agent, the `FlowManager`'s `llm_service` and `transport` would be actual Pipecat AI components wired into a `PipelineRunner`.

import asyncio
from pipecat_ai_flows import FlowManager, NodeConfig, flows_direct_function
from pipecat_ai_flows.llm import LLMService # Base class
from pipecat.frames.frames import TextFrame, EndFrame # Frame types used by the mocks below

# Minimal mock LLMService and Transport to make the example runnable
class MockLLM(LLMService):
    def __init__(self):
        super().__init__("mock_llm")
    async def process_input(self, input_frames):
        for frame in input_frames:
            if isinstance(frame, TextFrame):
                yield TextFrame(f"Mock LLM received: {frame.text}")
        yield EndFrame()

class MockTransport:
    async def send_frame(self, frame):
        if isinstance(frame, TextFrame):
            print(f"Transport received text: {frame.text}")
        elif isinstance(frame, EndFrame):
            print("Transport received EndFrame")
    async def receive_audio_frame(self): return None
    async def receive_text_frame(self): return None

@flows_direct_function(cancel_on_interruption=True)
async def greet_user(flow_manager: FlowManager, user_name: str = "there"):
    """Greets the user by their name."""
    await flow_manager.transport.send_frame(TextFrame(f"Hello, {user_name}!"))
    return "Greeting complete.", "start" # Transition back to start

async def main():
    print("Setting up Pipecat AI Flow Manager...")

    # Define nodes for the conversation flow
    start_node = NodeConfig(
        name="start",
        task_messages=[
            {"role": "developer", "content": "Ask the user for their name or just say hello."}
        ],
        functions=[greet_user], # Make `greet_user` available from this node
        next_node="ask_name_node", # Define a transition
    )

    ask_name_node = NodeConfig(
        name="ask_name_node",
        task_messages=[
            {"role": "developer", "content": "If the user hasn't provided a name, ask for it. Otherwise, acknowledge the name."}
        ],
        next_node=None # End of simple flow for this example
    )

    # Initialize the FlowManager with nodes and required services
    flow_manager = FlowManager(
        initial_node=start_node, # The starting point of the flow
        llm_service=MockLLM(), # In a real app, use pipecat_ai.services.openai.OpenAILLMService etc.
        transport=MockTransport(), # In a real app, use pipecat_ai.transports.daily.DailyService etc.
    )

    print(f"Flow Manager initialized. Current node: {flow_manager.current_node.name}")
    print("\nTo activate the flow and start a conversation, integrate this FlowManager with a Pipecat AI PipelineRunner.")
    print("For example: `pipeline = Pipeline(llm=flow_manager.llm_service, vad=..., stt=..., tts=..., transport=flow_manager.transport)`")
    print("Then: `await PipelineRunner().run(pipeline)`")

if __name__ == "__main__":
    asyncio.run(main())
