Simpleflow (AWS SWF)
Simpleflow is a Python library for dataflow programming with Amazon Simple Workflow Service (SWF). It provides a Pythonic way to define and execute complex, distributed workflows by orchestrating activities and managing their states. The current version is 0.34.2, and it typically sees infrequent, but targeted, updates.
Warnings
- breaking Python 2 support has been dropped. Versions of simpleflow 0.30.0 and above require Python 3.7 or newer.
- gotcha Simpleflow relies on `boto3` for AWS interaction, which requires proper AWS credentials and region configuration. Issues with these can lead to `ClientError` or `NoCredentialsError`.
- gotcha AWS SWF domains and task lists must be pre-created and configured in your AWS account before simpleflow can use them. If they don't exist, workflow/activity workers will fail to poll tasks.
- gotcha Simpleflow activities return `Future` objects. Directly accessing `.result` on a `Future` before the associated activity completes will either block indefinitely (in some executors) or raise an exception (in others).
- gotcha Data passed between activities and workflows (inputs/outputs) must be JSON-serializable. Complex or custom Python objects will cause serialization errors unless custom encoders are used or data is converted to a compatible format.
Install
-
pip install simpleflow
Imports
- Workflow
from simpleflow.workflow import Workflow
- activity
from simpleflow import activity
- futures
from simpleflow import futures
- with_attributes
from simpleflow.activity import with_attributes
Quickstart
import os
import simpleflow
from simpleflow import workflow, activity, futures
from simpleflow.local.executor import Executor
# Define an activity
@activity.with_attributes(task_list='my_activity_task_list', version='1.0')
def say_hello(name):
"""An activity that returns a greeting."""
print(f"Activity 'say_hello' received: {name}")
return f"Hello, {name}!"
# Define a workflow
@workflow.with_attributes(task_list='my_workflow_task_list', version='1.0')
class GreetingWorkflow(workflow.Workflow):
"""A simple workflow that uses the say_hello activity."""
def __init__(self):
super(GreetingWorkflow, self).__init__()
# Bind the activity to the workflow instance
self.say_hello_activity = say_hello
def run(self, name):
print(f"Workflow 'GreetingWorkflow' started with input: {name}")
# Schedule the activity and get a Future object
hello_future = self.say_hello_activity(name)
# In a real SWF execution, futures.wait() would block until the activity completes.
# For the local executor, the result is often resolved synchronously.
# Accessing .result will retrieve the value when ready.
final_greeting = hello_future.result
print(f"Workflow received result from activity: {final_greeting}")
return final_greeting
# --- Quickstart Execution (using local executor for demonstration) ---
if __name__ == "__main__":
# The local executor allows running workflows without connecting to AWS SWF.
# It executes activities and workflows synchronously in the same process.
executor = Executor()
print("\n--- Executing GreetingWorkflow locally ---")
# Run the workflow. Arguments to the workflow's `run` method are passed as a tuple.
# The executor's `run` method returns the final result of the workflow.
workflow_input_name = "Simpleflow User"
final_output = executor.run(GreetingWorkflow, (workflow_input_name,))
print(f"\n--- Workflow Execution Complete ---")
print(f"Input name: '{workflow_input_name}'")
print(f"Final output: '{final_output}'")
assert final_output == f"Hello, {workflow_input_name}!"
print("Local execution successful!")