Kestra Python Client

1.3.0 · active · verified Mon Apr 13

Kestra is an infinitely scalable orchestration and scheduling platform that allows users to create, run, schedule, and monitor complex pipelines. The Python client library facilitates programmatic interaction with the Kestra API for managing flows, executions, and sending metrics, outputs, and logs from within Python script tasks. The library is currently at version 1.3.0 and maintains a regular release cadence with frequent updates.

Warnings

Install

Imports

Quickstart

This quickstart demonstrates how to programmatically create and execute a Kestra flow using the Kestra Python SDK. It configures the client using environment variables for host, username, and password (or API token), then defines a simple flow in YAML and submits it to the Kestra instance. It then triggers an execution and waits for its completion. For in-flow interactions, the `kestra.Kestra` class is used within Python script tasks.

import os
from kestrapy import Configuration, KestraClient

# Configure client using environment variables or hardcoded values (not recommended for production)
configuration = Configuration(
    host=os.environ.get('KESTRA_HOST', 'http://localhost:8080'),
    username=os.environ.get('KESTRA_USERNAME', 'root@root.com'), # or KESTRA_API_TOKEN
    password=os.environ.get('KESTRA_PASSWORD', 'Root!1234')
)
kestra_client = KestraClient(configuration)

tenant_id = os.environ.get('KESTRA_TENANT', 'main') # 'main' is default for OSS
namespace = "my_namespace"
flow_id = "my_flow"

flow_yaml = f'''
id: {flow_id}
namespace: {namespace}
tasks:
  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "Hello from a Kestra Python SDK created flow!"
'''

try:
    # Create a flow
    created_flow = kestra_client.flows.create_flow(tenant=tenant_id, body=flow_yaml)
    print(f"Flow created: {created_flow.id}")

    # Execute the flow
    execution = kestra_client.executions.create_execution(tenant=tenant_id, flow_id=flow_id, namespace=namespace, wait=True)
    print(f"Execution finished with status: {execution.state.current}")
    
    # Example of using Kestra within a flow (not directly runnable in quickstart, but demonstrates usage)
    # from kestra import Kestra
    # Kestra.outputs({"message": "This is an output from within a flow!"})

except Exception as e:
    print(f"An error occurred: {e}")

view raw JSON →