Kestra Python Client
Kestra is an orchestration and scheduling platform that allows users to create, run, schedule, and monitor complex pipelines. The Python client library provides programmatic access to the Kestra API for managing flows and executions, and lets Python script tasks send metrics, outputs, and logs back to Kestra. The library is currently at version 1.3.0 and is updated frequently.
Warnings
- breaking Starting from Kestra 0.23.0 (and confirmed in 0.24.0 and 1.x), a tenant context is required. The `defaultTenant` (null tenant) is no longer supported. For Kestra Open Source, a fixed tenant named 'main' is always used, and all API URIs now include the tenant, e.g., `/api/v1/...` became `/api/v1/main/...`.
- breaking With Kestra 0.24.0, basic authentication became required. Additionally, IAM and API endpoint changes were introduced.
- breaking For users of S3 or GCS as internal storage, Kestra now removes the leading root slash in all storage paths to fix double slashes. This impacts how internal storage objects are accessed.
- gotcha When running Python scripts within Kestra flows, installing packages with `beforeCommands` downloads and installs them on every run, which increases execution time.
- gotcha The Python client package on PyPI is `kestra`, but the classes for programmatic client interaction (e.g., `Configuration`, `KestraClient`) are imported from the `kestrapy` module. For utilities used inside Kestra Python script tasks (e.g., `Kestra.outputs`), the import is `from kestra import Kestra`.
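The tenant change in the first warning can be illustrated with a small path rewrite. This is a minimal sketch, not part of the client library: `tenant_scoped_path` is a hypothetical helper, and the example path is only an illustration of the `/api/v1/...` to `/api/v1/main/...` pattern described above.

```python
API_PREFIX = "/api/v1"


def tenant_scoped_path(path: str, tenant: str = "main") -> str:
    """Insert the tenant segment right after the /api/v1 prefix.

    Hypothetical helper for migrating hand-written URIs; it does not
    detect paths that already contain a tenant segment.
    """
    if path != API_PREFIX and not path.startswith(API_PREFIX + "/"):
        raise ValueError(f"not a Kestra API path: {path!r}")
    # Everything after the prefix, without the separating slash.
    rest = path[len(API_PREFIX):].lstrip("/")
    if rest:
        return f"{API_PREFIX}/{tenant}/{rest}"
    return f"{API_PREFIX}/{tenant}"


# Illustrative path only; 'main' is the fixed tenant for Kestra Open Source.
print(tenant_scoped_path("/api/v1/flows/my_namespace/my_flow"))
```

When using `KestraClient`, the tenant is passed as an argument instead, so a rewrite like this would only matter for code that builds request URIs by hand.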
Install
- pip install kestra
- pip install python-dotenv  # Optional, for managing environment variables
Imports
- Flow: `from kestra import Flow`
- Kestra: `from kestra import Kestra`
- Configuration, KestraClient: `from kestrapy import Configuration, KestraClient`
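Because the imports above come from two different top-level modules, it can help to check which of them is importable in the current environment before running a script. The following is a minimal sketch using only the standard library; `importable_modules` is a hypothetical helper name, and it makes no claim about which package provides which module.

```python
import importlib.util


def importable_modules(names=("kestra", "kestrapy")):
    """Map each top-level module name to whether it can be imported here."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


# Report the status of both modules without actually importing them.
for name, found in importable_modules().items():
    print(f"{name}: {'available' if found else 'missing'}")
```

If either module is reported as missing, the corresponding import lines will fail with `ModuleNotFoundError`.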
Quickstart
import os
from kestrapy import Configuration, KestraClient
# Configure client using environment variables or hardcoded values (not recommended for production)
configuration = Configuration(
    host=os.environ.get('KESTRA_HOST', 'http://localhost:8080'),
    username=os.environ.get('KESTRA_USERNAME', 'root@root.com'),  # or KESTRA_API_TOKEN
    password=os.environ.get('KESTRA_PASSWORD', 'Root!1234')
)
kestra_client = KestraClient(configuration)
tenant_id = os.environ.get('KESTRA_TENANT', 'main') # 'main' is default for OSS
namespace = "my_namespace"
flow_id = "my_flow"
flow_yaml = f'''
id: {flow_id}
namespace: {namespace}
tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "Hello from a Kestra Python SDK created flow!"
'''
try:
    # Create a flow
    created_flow = kestra_client.flows.create_flow(tenant=tenant_id, body=flow_yaml)
    print(f"Flow created: {created_flow.id}")
    # Execute the flow and wait for it to finish
    execution = kestra_client.executions.create_execution(
        tenant=tenant_id, flow_id=flow_id, namespace=namespace, wait=True
    )
    print(f"Execution finished with status: {execution.state.current}")
    # Example of using Kestra within a flow (not directly runnable in quickstart, but demonstrates usage)
    # from kestra import Kestra
    # Kestra.outputs({"message": "This is an output from within a flow!"})
except Exception as e:
    print(f"An error occurred: {e}")
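The quickstart falls back to hardcoded credentials when the environment variables are unset, which it notes is not recommended for production. One alternative is to fail fast when credentials are missing. The sketch below uses only the standard library; `load_kestra_settings` is a hypothetical helper, and it assumes the same variable names as the quickstart (`KESTRA_HOST`, `KESTRA_USERNAME`, `KESTRA_PASSWORD`, `KESTRA_TENANT`).

```python
import os
from typing import Mapping, Optional


def load_kestra_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read connection settings, refusing to run without credentials.

    Hypothetical helper: host and tenant keep the quickstart defaults,
    while username and password must be provided explicitly.
    """
    env = os.environ if env is None else env
    required = ("KESTRA_USERNAME", "KESTRA_PASSWORD")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError("missing required environment variables: " + ", ".join(missing))
    return {
        "host": env.get("KESTRA_HOST", "http://localhost:8080"),
        "username": env["KESTRA_USERNAME"],
        "password": env["KESTRA_PASSWORD"],
        "tenant": env.get("KESTRA_TENANT", "main"),  # 'main' is the OSS tenant
    }


# Demo with placeholder values instead of the real environment.
settings = load_kestra_settings({"KESTRA_USERNAME": "user@example.com", "KESTRA_PASSWORD": "change-me"})
print(settings["host"], settings["tenant"])
```

The returned `host`, `username`, and `password` values could then be passed to `Configuration(...)` as in the quickstart, and `tenant` used for the `tenant=` arguments.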