Taskflow
Taskflow is a Python 3.10+ library for structured state management, task orchestration, and error handling in complex asynchronous workflows. It enables defining tasks and flows using decorators or classes, facilitating robust and scalable application development. The current version is 6.2.0, with an active development cadence that includes significant API changes in major releases.
Common errors
- TypeError: task() got an unexpected keyword argument 'my_input'
  cause Declaring task inputs as direct keyword arguments, the pre-v6 syntax, in Taskflow v6.x or newer.
  fix Declare inputs using the `inputs` dictionary parameter: `@task(inputs={'my_input': str})`.
- AttributeError: 'FlowConfig' object has no attribute 'executor' (or similar configuration errors)
  cause Accessing or configuring executors (e.g., `LocalThreadPoolExecutor`) or other flow/task settings using pre-v6 methods, or referencing removed components.
  fix `LocalThreadPoolExecutor` and `ProcessThreadPoolExecutor` were removed. Use `AsyncIOExecutor` (the default) or implement a custom executor. For general configuration, pass a `FlowConfig` instance.
- ModuleNotFoundError: No module named 'taskflow'
  cause The `taskflow` library is not installed in the current Python environment, or the environment is not activated.
  fix Install the library using pip: `pip install taskflow`. If you use a virtual environment, make sure it is activated first.
- TypeError: Flow.__init__() got an unexpected keyword argument 'max_concurrency'
  cause In Taskflow v6.x+, direct configuration arguments for `Flow` or `Task` constructors (such as `max_concurrency`) were removed.
  fix Pass configuration parameters via a `FlowConfig` object: `my_flow = Flow(config=FlowConfig(max_concurrency=5))`.
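When the `ModuleNotFoundError` above appears, it often helps to confirm which interpreter is running and whether that interpreter can see the package. The following is a minimal diagnostic sketch using only the standard library; the helper name `describe_install` is hypothetical and is not part of Taskflow.

```python
import importlib.util
import sys


def describe_install(module_name: str) -> str:
    """Report whether module_name is importable by the running interpreter.

    Hypothetical helper for illustration; not a Taskflow API.
    """
    try:
        spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError):
        # find_spec can raise for malformed names or broken parent packages.
        spec = None
    if spec is None:
        return (
            f"{module_name!r} is not importable from {sys.executable}; "
            f"try: {sys.executable} -m pip install {module_name}"
        )
    return f"{module_name!r} found at {spec.origin}"


if __name__ == "__main__":
    print(describe_install("taskflow"))
```

Running pip through `sys.executable -m pip` installs into the same interpreter that reported the error, which avoids the common case of a `pip` on the PATH that belongs to a different environment.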
Warnings
- breaking Task and Flow input/output declarations changed significantly in v6.0.0. Previously, inputs were declared as direct keyword arguments (e.g., `@task(name=str)`). Now, they must be dictionaries (e.g., `@task(inputs={"name": str})`).
- breaking The `LocalThreadPoolExecutor` and `ProcessThreadPoolExecutor` options for `FlowConfig` were removed in v6.0.0. The default executor is now `AsyncIOExecutor`.
- breaking Configuration parameters for `Flow` and `Task` are no longer passed directly to their constructors or decorators. All configuration is now done via `FlowConfig` or `TaskConfig` objects.
- deprecated The base classes `AbstractFlow` and `AbstractTask` were removed in v6.0.0. Code that subclassed them should now subclass `Flow` and `Task` directly.
- gotcha Taskflow v6.x introduced stricter type enforcement. Mismatched type annotations between task signatures, input/output declarations, and actual data types can lead to runtime `TypeError` exceptions.
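The stricter type enforcement described in the last warning can be pictured with a plain-Python stand-in. The decorator below, `enforce_types`, is hypothetical and is not Taskflow's implementation; it only illustrates how comparing annotations against actual argument values at call time produces a `TypeError`. It assumes annotations are plain classes such as `str` or `int`.

```python
import functools
import inspect
from typing import get_type_hints


def enforce_types(func):
    """Hypothetical stand-in: check simple class annotations at call time."""
    hints = get_type_hints(func)
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only plain classes such as str or int are checked;
            # any other kind of annotation is skipped.
            if type(expected) is type and not isinstance(value, expected):
                raise TypeError(
                    f"{func.__name__}() argument {name!r} expected "
                    f"{expected.__name__}, got {type(value).__name__}"
                )
        return func(*args, **kwargs)

    return wrapper


@enforce_types
def greet(name: str) -> str:
    return f"Hello, {name}!"
```

Calling `greet("World")` succeeds, while `greet(42)` raises a `TypeError` naming the offending argument. Keeping annotations, declared inputs, and the data actually passed in agreement avoids this class of failure.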
Install
- pip install taskflow
Imports
- task
from taskflow import task
- flow
from taskflow import flow
- Flow
from taskflow import Flow
- Task
from taskflow import Task
- FlowConfig
from taskflow import FlowConfig
Quickstart
import asyncio

from taskflow import task, flow


@task
async def greet_task(name: str) -> str:
    """A simple task that greets a given name."""
    print(f"Task received: {name}")
    return f"Hello, {name}!"


@flow
async def greeting_flow(input_name: str) -> str:
    """A flow that orchestrates the greeting task."""
    # Inputs/outputs are now dictionaries in v6+
    greeting_message = await greet_task(input_name)
    print(f"Flow processed: {greeting_message}")
    return f"Flow finished with: {greeting_message}"


async def main():
    name_to_greet = "World"
    final_output = await greeting_flow(name_to_greet)
    print(f"Final output from flow: {final_output}")


if __name__ == "__main__":
    asyncio.run(main())
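To run the same kind of work for several inputs at once while capping how many run concurrently, plain `asyncio` is enough. The sketch below does not use Taskflow: `greet` is a stand-in coroutine and `run_bounded` is a hypothetical helper, shown only to illustrate what a concurrency limit (such as the `max_concurrency` setting mentioned under Common errors) means in practice.

```python
import asyncio


async def greet(name: str) -> str:
    """Stand-in coroutine playing the role of a task (not a Taskflow task)."""
    await asyncio.sleep(0)  # yield control to the event loop, as real I/O would
    return f"Hello, {name}!"


async def run_bounded(names: list[str], limit: int) -> list[str]:
    """Run greet() for every name, with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(name: str) -> str:
        async with semaphore:
            return await greet(name)

    # gather() returns results in the same order as its arguments.
    return await asyncio.gather(*(guarded(n) for n in names))


if __name__ == "__main__":
    print(asyncio.run(run_bounded(["Ada", "Grace", "Linus"], limit=2)))
```

The semaphore is created inside the coroutine so that it belongs to the event loop started by `asyncio.run`.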